#!/usr/bin/env python
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
#
# THIS IMPORT MUST COME FIRST
#
# import mainUtils FIRST to get python version check
from gppylib.mainUtils import *

import os, sys
import signal
import time
from optparse import Option, OptionGroup, OptionParser, OptionValueError, SUPPRESS_USAGE
import socket

try:
    from gppylib.gpparseopts import OptParser, OptChecker
    from gppylib.gplog import *
    from gppylib.db import dbconn
    from gppylib.db import catalog
    from gppylib.gparray import *
    from gppylib import gphostcache
    from gppylib import userinput
    from gppylib import pgconf
    from gppylib.commands import unix
    from gppylib.commands import gp
    from gppylib.commands.gp import SEGMENT_STOP_TIMEOUT_DEFAULT, SegmentStop, GpSegStopCmd
    from gppylib.commands import base
    from gppylib.commands import pg
    from gppylib.commands import dca
    from gppylib.utils import TableLogger
    from gppylib.gp_era import GpEraFile
    from gppylib.operations.unix import CleanSharedMem
    from gppylib.operations.utils import ParallelOperation, RemoteOperation
    from gppylib.operations.rebalanceSegments import ReconfigDetectionSQLQueryCommand
except ImportError, e:
    sys.exit('ERROR: Cannot import modules.  Please check that you have sourced greenplum_path.sh.  Detail: ' + str(e))

# Default for GpStop's `parallel` argument, which caps the size of the worker pool.
DEFAULT_NUM_WORKERS = 64
# Module-wide logger shared by everything in this file.
logger = get_default_logger()

# Return codes of `pg_ctl status` that GpStop._is_master_stopped() tells apart;
# any other return code is treated as an error there.
PG_CTL_STATUS_RUNNING = 0
PG_CTL_STATUS_STOPPED = 3
PG_CTL_STATUS_SLEEP_INTERVAL = 1.0  # number of seconds to sleep before issuing next pg_ctl status


# ---------------------------------------------------------------
class SegStopStatus:
    """Outcome of an attempt to shut down a single segment database.

    The attributes mirror the constructor arguments: the segment object
    (`db`), whether it stopped, a free-form reason, the command that
    produced the result and whether the stop attempt timed out.
    """

    def __init__(self, db, stopped=False, reason=None, failedCmd=None, timedOut=False):
        self.db, self.stopped, self.reason = db, stopped, reason
        self.failedCmd, self.timedOut = failedCmd, timedOut

    def __str__(self):
        """Render a one-line summary; failures include host, datadir and reason."""
        seg = self.db
        if not self.stopped:
            return "DBID:%d  FAILED  host:'%s' datadir:'%s' with reason:'%s'" % (
                seg.dbid, seg.hostname, seg.datadir, self.reason)
        return "DBID:%d  STOPPED" % seg.dbid


def print_progress(pool, interval=10):
    """
    Block until a WorkerPool has finished all assigned jobs, logging progress.

    A percentage line is logged immediately, then again after every
    `pool.join(interval)` wait (default ten seconds) until the number of
    completed jobs reaches the number assigned; the last line logged is
    therefore the 100% marker (or 0% when nothing was assigned).
    """
    def report():
        # pool.completed is updated by worker threads, so take one snapshot
        # and use it for both the log line and the completion test.
        done = pool.completed
        fraction = float(done) / pool.assigned if pool.assigned else 0
        pool.logger.info('%0.2f%% of jobs completed' % (fraction * 100))
        return done >= pool.assigned

    while True:
        if report():
            break
        pool.join(interval)


# ---------------------------------------------------------------
class GpStop:
    ######
    def __init__(self, mode, master_datadir=None,
                 parallel=DEFAULT_NUM_WORKERS, quiet=False, masteronly=False, sighup=False,
                 interactive=False, stopstandby=False, restart=False,
                 timeout=SEGMENT_STOP_TIMEOUT_DEFAULT, logfileDirectory=False, onlyThisHost=None):
        """
        Record the shutdown options; no work is done until run() is called.

        mode             -- shutdown mode handed to the stop commands
                            ('smart', 'fast' and 'immediate' are used in this file)
        master_datadir   -- master data directory; looked up in _prepare() when None
        parallel         -- upper bound on the number of pool workers
        quiet            -- stored as given
        masteronly       -- stop only the master
        sighup           -- make run() call _sighup_cluster() instead of stopping
        interactive      -- ask for confirmation before shutting down
        stopstandby      -- also stop the standby master
        restart          -- start the system again once it has been stopped
        timeout          -- timeout passed to the stop commands
        logfileDirectory -- forwarded to the per-host segment stop command
        onlyThisHost     -- restrict the shutdown to the segments on this host
        """
        # Options supplied by the caller.
        self.mode = mode
        self.master_datadir = master_datadir
        self.parallel = parallel
        self.timeout = timeout
        self.logfileDirectory = logfileDirectory
        self.onlyThisHost = onlyThisHost
        self.quiet = quiet
        self.masteronly = masteronly
        self.sighup = sighup
        self.interactive = interactive
        self.stopstandby = stopstandby
        self.restart = restart

        # Runtime bookkeeping.
        self.pool = None
        self.pid = 0
        self.hadFailures = False

        # Filled in while run() executes.
        self.gphome = self.port = self.dburl = self.conn = None
        self.gparray = self.hostcache = self.gpversion = None

        logger.debug("Setting level of parallelism to: %d" % self.parallel)


    #####
    def _find_any_primary_and_mirror_on_same_host(self, segs):
        map_content_id_to_segment = {}
        for seg in segs:
            seg_content_id = seg.getSegmentContentId()
            if seg_content_id in map_content_id_to_segment:
                if seg.isSegmentPrimary(True):
                    return seg, map_content_id_to_segment[seg_content_id]
                else:
                    return map_content_id_to_segment[seg_content_id], seg
            else:
                map_content_id_to_segment[seg_content_id] = seg

        return None, None

    def _filter_segments_for_single_host_stop(self):
        host_to_segments_mapping = self.gparray.getSegmentsByHostName(self.gparray.getSegDbList())
        segments_on_host = host_to_segments_mapping[self.onlyThisHost]
        up_segments_on_host = [seg for seg in segments_on_host if seg.isSegmentUp()]
        matching_primary, matching_mirror = self._find_any_primary_and_mirror_on_same_host(up_segments_on_host)
        if matching_primary or matching_mirror:
            raise Exception("Segment host '%s' has both of corresponding primary "
                            "'%s' and mirror '%s'. Aborting." % (matching_primary.getSegmentHostName(),
                                                                 matching_primary, matching_mirror))

        for seg in up_segments_on_host:
            if seg.isSegmentModeSynchronized():
                continue
            raise Exception("Segment '%s' not synchronized. Aborting." % seg)

        return up_segments_on_host


    def run(self):
        """
        Run and return the exitCode for the program

        After _prepare() one of three paths is taken:
          * masteronly: optionally confirm with the user, then stop only the master.
          * sighup:     delegate to _sighup_cluster() and return its result.
          * otherwise:  stop the master and standby (skipped when onlyThisHost is
                        set), the selected segments, leftover gpmmon/gpsmon
                        processes and shared memory; then optionally restart.

        Returns 2 when self.hadFailures is set after a cluster shutdown, the value
        of _sighup_cluster() on the sighup path, and 0 otherwise.
        Raises UserAbortedException when the user declines an interactive prompt,
        and Exception when a host-specific stop is requested for a master/standby
        host or for a cluster without mirrors.
        """
        self._prepare()

        if self.masteronly:
            if self.interactive:
                if not userinput.ask_yesno(None, "\nContinue with master-only shutdown", 'N'):
                    raise UserAbortedException()
            try:
                # Disable Ctrl-C
                signal.signal(signal.SIGINT, signal.SIG_IGN)

                self._stop_master()
            finally:
                # Halt the worker pool (if any), then reenable Ctrl-C
                self.cleanup()
                signal.signal(signal.SIGINT, signal.default_int_handler)
        else:
            if self.sighup:
                return self._sighup_cluster()
            else:

                # Work out which segments to stop: one host's worth, or all of them.
                segs = []
                if self.onlyThisHost:
                    if self.onlyThisHost in self.gparray.get_master_host_names():
                        raise Exception("Specified host '%s' has the master or standby master on it. This node can only be stopped as part of a full-cluster gpstop, without '--host'." %
                                        self.onlyThisHost)
                    if not self.gparray.hasMirrors:
                        raise Exception("Cannot perform host-specific gpstop on a cluster without segment mirroring.")
                    segs = self._filter_segments_for_single_host_stop()
                else:
                    segs = self.gparray.getSegDbList()

                if self.interactive:
                    self._summarize_actions(segs)
                    if not userinput.ask_yesno(None, "\nContinue with Greenplum instance shutdown", 'N'):
                        raise UserAbortedException()

                try:
                    # Disable Ctrl-C
                    signal.signal(signal.SIGINT, signal.SIG_IGN)

                    # The master and standby are left running for a host-specific stop.
                    if self.onlyThisHost is None:
                        self._stop_master()
                        self._stop_standby()
                    # _stop_segments() creates self.pool and self.hostcache, which the
                    # cleanup steps below rely on.
                    self._stop_segments(segs)
                    self._stop_gpmmon()
                    self._stop_gpsmon()
                    self._remove_shared_memory()  # this creates a new logfile - why?
                finally:
                    # Halt the worker pool (if any), then reenable Ctrl-C
                    self.cleanup()
                    signal.signal(signal.SIGINT, signal.default_int_handler)

                if self.onlyThisHost:
                    logger.info("Recognizing new cluster state...")
                    try:
                        # currently responsible for triggering an update to gp_segment_configuration
                        # because dbconn.connect() internally calls commit()
                        self.conn = dbconn.connect(self.dburl)

                        # backup in case connect() does not do a commit
                        ReconfigDetectionSQLQueryCommand(self.conn).run()
                    except Exception as e:
                        # Failure is tolerated here: only the attempt matters.
                        logger.debug('query trying to start a transaction failed: %s' % str(e))
                        logger.debug('expected: the purpose was that by attempting a transaction, gp_segment_configuration would be updated')
                    finally:
                        if self.conn:
                            self.conn.close()

                if self.restart:
                    logger.info("Restarting System...")
                    gp.NewGpStart.local('restarting system', verbose=logging_is_verbose(),
                                        nostandby=not self.stopstandby, masterDirectory=self.master_datadir)
                else:
                    if dca.is_dca_appliance():
                        logger.info("Unregistering with DCA")
                        dca.DcaGpdbStopped.local()
                        logger.info("Unregistered with DCA")

                if self.hadFailures:
                    # MPP-15208
                    return 2
        return 0

    ######
    def _stop_gpmmon(self):
        """
        Kill any gpmmon process that is still running.

        A `ps | grep | awk` pipeline first lists the pids of processes whose
        command line matches 'gpmmon -D'. If that probe cannot be run, a
        warning is logged and nothing else is attempted. If it prints at
        least one pid, the same pipeline is rerun piped into `kill -9`.
        Failures are logged, never raised.
        """
        logger.info('Cleaning up leftover gpmmon process')
        cmd = Command('check if gpmmon process is running', cmdStr="ps -ef | grep '[g]pmmon -D' | awk '{print $2}'")
        try:
            cmd.run(validateAfter=True)
        except Exception as e:
            logger.warning('Unable to determine if gpmmon process is running (%s)' % str(e))
            logger.warning('Not attempting to stop gpmmon process')
            # Stop here: the probe's results may be missing or meaningless, so
            # they must not be inspected (nor reported as "no process found").
            return

        result = cmd.get_results()
        if result.rc != 0 or result.stdout.strip() == '':
            logger.info('No leftover gpmmon process found')
            return

        cmd = Command('stop the gpmmon process',
                      cmdStr="ps -ef | grep '[g]pmmon -D' | awk '{print $2}' | xargs kill -9")
        try:
            cmd.run(validateAfter=True)
        except Exception as e:
            logger.error('Error while stopping gpmmon process (%s)' % str(e))

    ######
    def _stop_gpsmon(self):
        """
        Kill leftover gpsmon processes on every host of the cluster.

        Works in two rounds through self.pool: first each host (segment hosts
        from the host cache, plus master and standby master) is probed for
        processes matching 'gpsmon -m'; then `kill -9` is issued on the hosts
        where the probe printed at least one pid. Hosts whose kill command
        returns non-zero are reported with a warning.
        """
        logger.info('Cleaning up leftover gpsmon processes')

        targets = set([entry.hostname for entry in self.hostcache.get_hosts()])
        targets.add(self.gparray.master.getSegmentHostName())
        if self.gparray.standbyMaster:
            targets.add(self.gparray.standbyMaster.getSegmentHostName())

        # Round 1: find out where gpsmon is still alive.
        for target in targets:
            self.pool.addCommand(Command('Check if gpsmon is running',
                                         cmdStr="ps -ef | grep '[g]psmon -m' | awk '{print \$2}'",
                                         ctxt=REMOTE, remoteHost=target))
        self.pool.join()

        hosts_with_gpsmon = set([])
        found_everywhere = True
        for probe in self.pool.getCompletedItems():
            outcome = probe.get_results()
            if outcome.rc != 0 or outcome.stdout.strip() == '':
                found_everywhere = False
                continue
            hosts_with_gpsmon.add(probe.remoteHost)

        if not found_everywhere:
            logger.info(
                'No leftover gpsmon processes on some hosts. not attempting forceful termination on these hosts')

        # Round 2: terminate gpsmon only where it was found.
        for target in hosts_with_gpsmon:
            self.pool.addCommand(Command('kill all the gpsmon processes',
                                         cmdStr="ps -ef | grep '[g]psmon -m' | awk '{print \$2}' | xargs kill -9",
                                         ctxt=REMOTE, remoteHost=target))
        self.pool.join()

        for killer in self.pool.getCompletedItems():
            outcome = killer.get_results()
            if outcome.rc != 0:
                logger.warning('Failed to stop gpsmon process on host %s: (%s)' % (killer.remoteHost, outcome.stderr))

    ######
    def _remove_shared_memory(self):
        """
        Remove shared memory left behind on all hosts of the cluster.

        If segment processes fail to stop within the allotted time limit they
        are terminated forcibly, which can leave shared memory and semaphores
        behind. CleanSharedMem uses the ipcrm command, and ipcrm only removes a
        shared memory segment once every process attached to it has exited, so
        running it against all segments is safe. Semaphores cannot be handled
        the same way because there is no telling which are still in use; their
        cleanup is left to the user.

        One remote CleanSharedMem operation is built per cached host, plus one
        for the master and one for the standby master (if configured). They
        run in parallel; individual failures are logged as warnings.
        """
        logger.info('Cleaning up leftover shared memory')

        operations = [RemoteOperation(CleanSharedMem(host.dbs), host.hostname)
                      for host in set(self.hostcache.get_hosts())]

        master = self.gparray.master
        operations.append(RemoteOperation(CleanSharedMem([master]), master.getSegmentHostName()))

        standby = self.gparray.standbyMaster
        if standby:
            operations.append(RemoteOperation(CleanSharedMem([standby]), standby.getSegmentHostName()))

        ParallelOperation(operations).run()

        # get_ret() raises for an operation that failed; report it and carry on.
        for finished in operations:
            try:
                finished.get_ret()
            except Exception as e:
                logger.warning('Unable to clean shared memory (%s)' % str(e))

    ######
    def cleanup(self):
        if self.pool:
            self.pool.haltWork()

            ######

    def _prepare(self):
        """
        Collect everything run() needs before anything is stopped.

        Resolves GPHOME and the master data directory, checks the current
        user's permissions, reads the master port, verifies the master is
        running, loads the segment configuration, validates the --host
        argument when one was given, and records the software version.
        """
        logger.info("Gathering information and validating the environment...")

        self.gphome = gp.get_gphome()
        if self.master_datadir is None:
            self.master_datadir = gp.get_masterdatadir()

        current_user = unix.getUserName()
        self.user = current_user
        gp.check_permissions(current_user)

        # Order matters: the port read here is used by the two steps after it.
        self._read_postgresqlconf()
        self._check_db_running()
        self._build_gparray()

        if self.onlyThisHost:
            self._is_hostname_valid()
        self._check_version()

    ######
    def _is_hostname_valid(self):
        """
        Exit with status 1 unless self.onlyThisHost is a host of the cluster.

        The known hosts are the keys of the host-name mapping built from
        every database in the array; they are logged when the check fails.
        """
        by_host = self.gparray.getSegmentsByHostName(self.gparray.getDbList())
        host_names = by_host.keys()
        if self.onlyThisHost in host_names:
            return

        logger.error("host '%s' is not found in gp_segment_configuration" % self.onlyThisHost)
        logger.error("hosts in cluster config: %s" % host_names)
        raise SystemExit(1)

    ######
    def _check_version(self):
        """Look up the local Greenplum software version, store it and log it."""
        version = gp.GpVersion.local('local GP software version check', self.gphome)
        self.gpversion = version
        logger.info("Greenplum Version: '%s'" % version)

    ######
    def _read_postgresqlconf(self):
        """Read the master's port from its postgresql.conf into self.port."""
        logger.debug("Obtaining master's port from master data directory")
        conf_path = self.master_datadir + "/postgresql.conf"
        settings = pgconf.readfile(conf_path)
        self.port = settings.int('port')
        logger.debug("Read from postgresql.conf port=%s" % self.port)


        ######

    def _check_db_running(self):
        """
        Verify that the master looks like it is running; clean up if it is not.

        Raises ExceptionNoStackTraceNeeded when the master data directory has
        no postmaster.pid. When the pid file exists but the recorded process
        is gone, the stale /tmp socket lock file and the pid file are removed
        and self.mode is forced to 'fast'.
        """
        pidfile = "%s/postmaster.pid" % self.master_datadir
        if not os.path.exists(pidfile):
            raise ExceptionNoStackTraceNeeded(
                'postmaster.pid file does not exist.  is Greenplum instance already stopped?')

        self.pid = gp.read_postmaster_pidfile(self.master_datadir)
        if unix.check_pid(self.pid):
            return

        logger.warning("Have a postmaster.pid file but no Master segment process running")
        logger.info("Clearing postmaster.pid file and /tmp lock files")

        lockfile = "/tmp/.s.PGSQL.%s" % self.port
        logger.info("Clearing Master instance lock files")
        # The lock file may already be gone (the master is not running);
        # removing it unconditionally would abort the cleanup with OSError.
        if os.path.exists(lockfile):
            os.remove(lockfile)

        logger.info("Clearing Master instance pid file")
        os.remove(pidfile)

        logger.info("Setting recovery parameters")
        self.mode = 'fast'
        logger.info("Commencing forced shutdown")

    ######
    def _build_gparray(self):
        """
        Load the cluster configuration from the master's catalog.

        Builds a URL for the 'template1' database on the master port and
        initialises self.gparray from it over a utility-mode connection.
        """
        for message in ("Obtaining Greenplum Master catalog information",
                        "Obtaining Segment details from master..."):
            logger.info(message)

        master_url = dbconn.DbURL(port=self.port, dbname='template1')
        self.dburl = master_url
        self.gparray = GpArray.initFromCatalog(master_url, utility=True)

    class _SigIntHandler(object):
        def __init__(self):
            self.interrupted = False

        def _handler(self, signum, stack_frame):
            self.interrupted = True

        def enable(self):
            self.interrupted = False
            signal.signal(signal.SIGINT, self._handler)

        def disable(self):
            signal.signal(signal.SIGINT, signal.SIG_IGN)

        def __enter__(self):
            self._prev_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

            # Try to prevent easy mistakes by ensuring that SIGINT was
            # ignored before we entered the manager.
            if self._prev_handler != signal.SIG_IGN:
                signal.signal(signal.SIGINT, self._prev_handler)
                raise Exception("SIGINT disposition must be SIG_IGN before entering SigIntHandler")
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            # Put back the previous handler.
            signal.signal(signal.SIGINT, self._prev_handler)
            return False


    def _is_master_stopped(self):
        """
        Ask `pg_ctl status` whether the master is down.

        Returns True for the "stopped" return code and False for the
        "running" one; any other return code raises Exception. The command's
        standard output is discarded.
        """
        status_cmd = ['pg_ctl', 'status', '-D', self.master_datadir]
        with open(os.devnull, 'w') as sink:
            ret = subprocess.call(status_cmd, stdout=sink)

        if ret == PG_CTL_STATUS_STOPPED:
            return True
        if ret == PG_CTL_STATUS_RUNNING:
            return False
        raise Exception('pg_ctl status failed with return code %d' % ret)



    # Send a no-wait stop to the master, and then poll for it to stop.  If the user gets impatient or
    # the operation times out, show the connections and allow the user to upgrade the stop to fast or
    # immediate mode.
    def _stop_master_smart(self):
        """
        Request a smart-mode stop of the master and wait for it to go down.

        The user connections present before the stop are captured first so
        they can be shown later. `pg_ctl stop -W -m smart` is then issued and
        the master is polled with _is_master_stopped() once every
        PG_CTL_STATUS_SLEEP_INTERVAL seconds. When the user presses Ctrl-C
        during a sleep, or self.timeout seconds have passed, the user is asked
        whether to keep waiting or to switch mode.

        Returns False after setting self.mode to 'fast' or 'immediate' at the
        user's request; otherwise the loop ends once the master has stopped
        and nothing is returned.
        Raises Exception when `pg_ctl stop` returns a non-zero code.
        """
        logger.info("Stopping master segment and waiting for user connections to finish ...")

        # Snapshot the connections now: once the stop has been requested the
        # master can no longer be queried for them.
        self.conn = dbconn.connect(self.dburl, utility=True)
        user_connections_header, user_connections = catalog.getUserConnectionInfo(self.conn)
        self.conn.close()

        ret = subprocess.call(['pg_ctl', 'stop', '-W', '-m', 'smart', '-D', self.master_datadir])
        if ret:
            raise Exception('pg_ctl stop smart failed')

        time_waited = 0
        # Set once the user chooses to keep waiting; disables the timeout prompt.
        wait_forever = False
        with GpStop._SigIntHandler() as handler:
            while not self._is_master_stopped():
                # time.sleep() can be safely interrupted (https://docs.python.org/2/library/time.html#time.sleep)
                # Ctrl-C is only recorded during the sleep; it is ignored the rest of the time.
                handler.enable()
                time.sleep(PG_CTL_STATUS_SLEEP_INTERVAL)
                time_waited = time_waited + PG_CTL_STATUS_SLEEP_INTERVAL
                handler.disable()

                if handler.interrupted or (time_waited >= self.timeout and not wait_forever):
                    handler.interrupted = False

                    warn_text = """Smart mode failed to shutdown the master.\n"""
                    if len(user_connections):
                        warn_text += """  There were {} user connections at the start of the shutdown. Note: These connections may be outdated.
  The following connections were found before shutting down:
  {}\n""".format(len(user_connections), user_connections_header)
                        # NOTE(review): entries are appended without a line break
                        # between them - confirm this is the intended layout.
                        for user_conn in user_connections:
                            warn_text += "  %s" % user_conn

                    logger.warn(warn_text)
                    logger.info("Continue waiting in smart mode, stop in fast mode, or stop in immediate mode.")
                    mode_selection = userinput.ask_string("Your_choice",
                                                          "['(s)mart_mode', '(f)ast_mode', '(i)mmediate_mode']",
                                                          's',
                                                          ['s', 'f', 'i'])
                    logger.info("Your choice was '%s'" % mode_selection)
                    if mode_selection == 'f':
                        self.mode = 'fast'
                        return False
                    elif mode_selection == 'i':
                        self.mode = 'immediate'
                        return False
                    else:
                        wait_forever = True
                        logger.info("Continuing waiting for user connections to finish ...")


    ######
    # NOTE: we must make sure the master is down before leaving this routine
    def _stop_master(self, masterOnly=False):
        '''
        Shut down the master, escalating until it is gone.

        The era file is ended first. In smart mode _stop_master_smart() is
        tried; if the mode is (or has become) something other than smart, a
        MasterStop command is run with that mode. If any of that raises, the
        postmaster pid is read from its temp file and, when it belongs to this
        data directory and is still alive, it is sent SIGQUIT and then SIGABRT
        (five seconds apart), and the /tmp socket lock file is removed once
        the process is gone. Finally any leftover master process is killed
        with kill_9_segment_processes().

        masterOnly is not used by this method.
        '''

        logger.info("Commencing Master instance shutdown with mode='%s'" % self.mode)
        logger.info("Master segment instance directory=%s" % self.master_datadir)

        e = GpEraFile(self.master_datadir, logger=get_logger_if_verbose())
        e.end_era()

        try:
            if self.mode == 'smart':
                self._stop_master_smart()

            # NOTE: _stop_master_smart() can change self.mode
            if self.mode != 'smart':
                cmd = gp.MasterStop("stopping master", self.master_datadir, mode=self.mode, timeout=self.timeout)
                cmd.run(validateAfter=True)
        except:
            # Didn't stop in timeout or pg_ctl failed.  So try kill
            (succeeded, mypid, file_datadir) = pg.ReadPostmasterTempFile.local("Read master tmp file",
                                                                               self.dburl.pgport).getResults()
            # Only signal the process if the temp file refers to this data directory.
            if succeeded and file_datadir == self.master_datadir:
                if unix.check_pid(mypid):
                    logger.info("Failed to shutdown master with pg_ctl.")
                    logger.info("Sending SIGQUIT signal...")
                    os.kill(mypid, signal.SIGQUIT)
                    time.sleep(5)

                    # Still not gone... try SIGABRT
                    if unix.check_pid(mypid):
                        logger.info("Sending SIGABRT signal...")
                        os.kill(mypid, signal.SIGABRT)
                        time.sleep(5)

                    if not unix.check_pid(mypid):
                        # Clean up files
                        lockfile = "/tmp/.s.PGSQL.%s" % self.dburl.pgport
                        if os.path.exists(lockfile):
                            logger.info("Clearing segment instance lock files")
                            os.remove(lockfile)
        # Runs on every path, whether or not the stop above succeeded.
        logger.info('Attempting forceful termination of any leftover master process')
        (succeeded, mypid, file_datadir) = pg.ReadPostmasterTempFile.local("Read master tmp file",
                                                                           self.dburl.pgport).getResults()
        unix.kill_9_segment_processes(self.master_datadir, self.dburl.pgport, mypid)

        logger.debug("Successfully shutdown the Master instance in admin mode")

    ######
    def _stop_standby(self):
        """
        Stop the standby master, escalating as needed. Assumes prepare() has been called.

        Does nothing (and returns True) when standby shutdown was not
        requested or no standby is configured. Otherwise a 'fast' stop is
        tried, then an 'immediate' stop, then forceful termination; the
        standby's status is checked after each step.

        Returns True once the standby is reported down, False if it is still
        running after all three attempts.
        """
        if not self.stopstandby:
            return True

        standby = self.gparray.standbyMaster
        if not standby:
            logger.info("No standby master host configured")
            return True

        # Attempt 1: fast shutdown.
        logger.info("Stopping master standby host %s mode=fast" % standby.hostname)
        try:
            fast_stop = SegmentStop("stopping master standby",
                                    standby.datadir, mode='fast',
                                    timeout=self.timeout,
                                    ctxt=base.REMOTE,
                                    remoteHost=standby.hostname)
            fast_stop.run(validateAfter=True)
        except base.ExecutionError as e:
            logger.warning("Error occured while stopping the standby master: %s" % e)

        if not pg.DbStatus.remote('checking status of standby master instance', standby, standby.hostname):
            logger.info("Successfully shutdown standby process on %s" % standby.hostname)
            return True

        # Attempt 2: immediate shutdown.
        logger.warning("Process master standby still running, will issue fast shutdown with immediate")
        try:
            immediate_stop = SegmentStop("stopping master standby", standby.datadir, mode='immediate',
                                         timeout=self.timeout,
                                         ctxt=base.REMOTE, remoteHost=standby.hostname)
            immediate_stop.run(validateAfter=True)
        except base.ExecutionError as e:
            logger.warning("Error occured while stopping the standby master: %s" % e)

        if not pg.DbStatus.remote('checking status of standby master instance', standby, standby.hostname):
            logger.info("Successfully shutdown master standby process on %s" % standby.hostname)
            return True

        # Attempt 3: forceful termination.
        # NOTE(review): the pid is read from the standby host, but the kill call is
        # given the master's data directory and port - confirm this is intended.
        logger.error('Failed to stop standby. Attempting forceful termination of standby process')
        (succeeded, mypid, file_datadir) = pg.ReadPostmasterTempFile.remote("Read standby tmp file",
                                                                            self.dburl.pgport,
                                                                            standby.hostname).getResults()
        unix.kill_9_segment_processes(self.master_datadir, self.dburl.pgport, mypid)

        if not pg.DbStatus.remote('checking status of standby master instance', standby, standby.hostname):
            logger.info("Successfully shutdown master standby process on %s" % standby.hostname)
            return True

        logger.error("Unable to stop master standby on host: %s" % standby.hostname)
        return False

    ######
    def _stop_segments(self, segs):
        """
        Stop the given segments in parallel and report the outcome.

        Creates the worker pool (at most self.parallel workers, and no more
        than the number of hosts) and the host cache, pings the hosts, and
        records a failure for every segment whose host did not answer. With
        mirroring, primaries are stopped first and mirrors second; without
        it there is a single pass. Results of all passes are handed to
        _print_segment_stop().

        segs -- the segment databases to shut down
        """
        failed_seg_status = []
        workers = min(len(self.gparray.get_hostlist()), self.parallel)
        self.pool = base.WorkerPool(numWorkers=workers, logger=logger)

        logger.info("Targeting dbid %s for shutdown" % [seg.getSegmentDbId() for seg in segs])

        self.hostcache = gphostcache.GpHostCache(self.gparray, self.pool, segs=segs)

        failed_pings = self.hostcache.ping_hosts(self.pool)
        for db in failed_pings:
            # This is the shutdown path, so report a skipped shutdown.
            logger.warning(
                "Skipping shutdown of segdb on %s directory %s Ping Failed <<<<<<" % (db.hostname, db.datadir))
            failed_seg_status.append(SegStopStatus(db, False, 'Failed to Ping on host: %s' % db.hostname))

        self.hostcache.log_contents()

        # Each pass is (log message, includePrimaries, includeMirrors).
        if self.gparray.hasMirrors:
            passes = [
                ("Commencing parallel primary segment instance shutdown, please wait...", True, False),
                ("Commencing parallel mirror segment instance shutdown, please wait...", False, True),
            ]
        else:
            # There are no active-mirrors
            passes = [
                ("Commencing parallel segment instance shutdown, please wait...", True, False),
            ]

        success_seg_status = []
        for message, include_primaries, include_mirrors in passes:
            logger.info(message)
            try:
                self._stopseg_cmds(include_primaries, include_mirrors, segs=segs)
            finally:
                # Always wait for dispatched commands before looking at results.
                self.pool.join()
            success_seg_status.extend(self._process_segment_stop(failed_seg_status))

        self._print_segment_stop(segs, failed_seg_status, success_seg_status)

    ######
    def _stopseg_cmds(self, includePrimaries, includeMirrors, segs):
        """
        Queue one stop command per host and wait, printing progress.

        `segs` is grouped by host name. For each host, the segments whose role
        is 'p' are kept when includePrimaries is set and all others when
        includeMirrors is set; a host with nothing left is skipped. The
        commands are added to self.pool and print_progress() blocks until
        they are done.
        """
        segs_by_host = {}
        for seg in segs:
            segs_by_host.setdefault(seg.getSegmentHostName(), []).append(seg)

        for hostname, host_segs in segs_by_host.items():
            selected = [db for db in host_segs
                        if (includePrimaries if db.getSegmentRole() == 'p' else includeMirrors)]

            # Nothing selected means this host has no segment of the requested
            # kind. That happens when an entire host fails under group
            # mirroring: every mirror on the surviving host is then marked
            # primary (or vice-versa).
            if not selected:
                continue

            logger.debug("Dispatching command to shutdown %d segments on host: %s" % (len(selected), hostname))
            # NOTE(review): the command description says "starts" although this
            # stops segments; the text is kept verbatim.
            self.pool.addCommand(
                GpSegStopCmd("remote segment starts on host '%s'" % hostname, self.gphome, self.gpversion,
                             mode=self.mode, dbs=selected, timeout=self.timeout,
                             verbose=logging_is_verbose(), ctxt=base.REMOTE, remoteHost=hostname,
                             logfileDirectory=self.logfileDirectory))

        print_progress(self.pool)

    ######
    def _process_segment_stop(self, failed_seg_status):
        '''
        Review the results of the completed segment stop commands.

        For commands that returned 0 or 1, every output line starting with
        "STATUS" is split on '--' and its second, third and fourth fields
        ("<label>:<value>") are read as data directory, stopped flag and
        reason. The matching segment is recorded as stopped unless the flag
        is "false" (case-insensitive). A line reporting that pg_ctl could not
        shut the server down marks the most recently recorded failure as
        timed out. For any other return code every valid segment of the
        command is recorded as failed.

        failed_seg_status -- list that failures are appended to (modified in place)

        Returns the list of SegStopStatus objects for stopped segments and
        empties the pool's completed items.
        '''
        success_seg_status = []

        for cmd in self.pool.getCompletedItems():
            results = cmd.get_results()

            if results.rc not in (0, 1):
                for db in cmd.dblist:
                    # dbs that are marked invalid are 'skipped' but we dispatch to them
                    # anyway since we want to try and shutdown any runaway pg processes.
                    if db.valid:
                        failed_seg_status.append(
                            SegStopStatus(db, stopped=False, reason=results, failedCmd=cmd))
                continue

            for line in results.stdout.split('\n'):
                if line.startswith("STATUS"):
                    fields = line.split('--')
                    # Split on the first ':' only, so a data directory or a
                    # reason that itself contains ':' is kept whole.
                    datadir = fields[1].split(':', 1)[1]
                    stopped = fields[2].split(':', 1)[1].lower() != 'false'
                    reasonStr = fields[3].split(':', 1)[1]

                    for db in cmd.dblist:
                        if db.datadir != datadir:
                            continue
                        status = SegStopStatus(db, stopped=stopped, reason=reasonStr, failedCmd=cmd)
                        if stopped:
                            success_seg_status.append(status)
                        else:
                            # dbs that are marked invalid are 'skipped' but we dispatch to them
                            # anyway since we want to try and shutdown any runaway pg processes.
                            failed_seg_status.append(status)

                elif line.strip().startswith('stderr: pg_ctl: server does not shut down'):
                    # We are assuming that we know what segment failed beforehand.
                    if failed_seg_status:
                        failed_seg_status[-1].timedOut = True
                    else:
                        logger.debug("No failed segments to time out")

        self.pool.empty_completed_items()
        return success_seg_status


        ######

    def _print_segment_stop(self, segs, failed_seg_status, success_seg_status):
        """Log a summary of the segment shutdown and record whether it failed.

        segs:               segments on which a stop was attempted
        failed_seg_status:  status objects of the segments that failed to stop
        success_seg_status: status objects of the segments that stopped

        Sets self.hadFailures to True when at least one failure belongs to a
        valid segment, and to False otherwise.
        """
        num_stopped = len(segs) - len(failed_seg_status)
        num_failed = sum(1 for status in failed_seg_status if status.db.valid)
        invalid_segs = self.gparray.get_invalid_segdbs()
        num_total = len(self.gparray.getSegDbList())
        num_timed_out = sum(1 for status in failed_seg_status if status.timedOut)
        any_failed = num_failed > 0

        # Per-segment detail is only shown on failure or in verbose mode.
        if any_failed or logging_is_verbose():
            logger.info("------------------------------------------------")
            title = "Segment Stop Information" if logging_is_verbose() else "Failed Segment Stop Information "
            logger.info(title)
            logger.info("------------------------------------------------")

            if any_failed:
                for failure in failed_seg_status:
                    logger.info(failure)
            if logging_is_verbose():
                for stat in success_seg_status:
                    logger.debug(stat)

        # Totals table.
        summary = TableLogger(logger=logger).setWarnWithArrows(True)
        summary.addSeparator()
        summary.info(["Segments stopped successfully", "= %d" % num_stopped])
        summary.infoOrWarn(any_failed, ["Segments with errors during stop", "= %d" % num_failed])
        if invalid_segs:
            summary.info([])
            summary.warn(["Segments that are currently marked down in configuration", "= %d" % len(invalid_segs)])
            summary.info(["         (stop was still attempted on these segments)"])
        summary.addSeparator()

        summary.outputTable()

        marker = "<<<<<<<<" if any_failed else ""
        logger.info("Successfully shutdown %d of %d segment instances %s" % (num_stopped, num_total, marker))

        if not any_failed:
            self.hadFailures = False
            logger.info("Database successfully shutdown with no errors reported")
            return

        self.hadFailures = True
        logger.warning("------------------------------------------------")
        logger.warning("Segment instance shutdown failures reported")
        logger.warning("Failed to shutdown %d of %d segment instances <<<<<" % (num_failed, num_total))
        if num_timed_out > 0:
            logger.warning("%d segments did not complete their shutdown in the allowed" % num_timed_out)
            logger.warning("timeout of %d seconds.  These segments are still in the process" % self.timeout)
            logger.warning("of shutting down.  You will not be able to restart the database")
            logger.warning("until all processes have terminated.")
        logger.warning("A total of %d errors were encountered" % num_failed)
        logger.warning("Review logfile %s" % get_logfile())
        logger.warning("For more details on segment shutdown failure(s)")
        logger.warning("------------------------------------------------")

    ######
    def _sighup_cluster(self):
        """Ask every postmaster in the array to reload its configuration.

        Assumes prepare() has been called.  A fresh worker pool is stored in
        self.pool and one pg.ReloadDbConf command is queued on it for each
        entry of self.gparray.getDbList().

        Returns 0 when every command was successful; otherwise logs a table of
        the segments whose command was not successful and returns 1.
        """

        class SighupWorkerPool(base.WorkerPool):
            """
            This pool knows all the commands are calls to pg_ctl.
            The failed list collects segments without a running postmaster.
            """

            def __init__(self, numWorkers):
                base.WorkerPool.__init__(self, numWorkers)
                self.failed = []

            def check_results(self):
                # Drain the completed queue, remembering the db of every
                # command that did not succeed.
                while not self.completed_queue.empty():
                    finished = self.completed_queue.get(False)
                    if not finished.get_results().wasSuccessful():
                        self.failed.append(finished.db)

        # never start more workers than there are hosts
        pool_size = min(len(self.gparray.get_hostlist()), self.parallel)
        self.pool = SighupWorkerPool(numWorkers=pool_size)

        all_dbs = self.gparray.getDbList()
        local_hostname = socket.gethostname()
        logger.info("Signalling all postmaster processes to reload")
        for db in all_dbs:
            seg_host = db.getSegmentHostName()
            # segments on this machine are signalled locally, all others remotely
            on_this_host = seg_host == local_hostname
            reload_cmd = pg.ReloadDbConf(name="reload segment number " + str(db.getSegmentDbId()),
                                         db=db,
                                         ctxt=LOCAL if on_this_host else REMOTE,
                                         remoteHost=None if on_this_host else seg_host)
            self.pool.addCommand(reload_cmd)

        if not self.quiet:
            base.join_and_indicate_progress(self.pool)
        else:
            self.pool.join()

        self.pool.check_results()
        self.pool.empty_completed_items()

        if not self.pool.failed:
            return 0

        divider = "--------------------------------------------"
        logger.info(divider)
        logger.info("Some segment postmasters were not reloaded")
        logger.info(divider)
        failed_table = TableLogger().setWarnWithArrows(True)
        failed_table.info(["Host", "Datadir", "Port", "Status"])
        for db in self.pool.failed:
            failed_table.info([db.getSegmentHostName(),
                               db.getSegmentDataDirectory(),
                               str(db.getSegmentPort()),
                               db.getSegmentStatus()])
        failed_table.outputTable()
        logger.info(divider)
        return 1

    ######
    def _summarize_actions(self, segs):
        """Log the master parameters and the segments that will be shut down.

        segs: segment db objects; one table row (host, data directory, port,
              status) is logged for each of them.
        """
        divider = "--------------------------------------------"

        logger.info(divider)
        logger.info("Master instance parameters")
        logger.info(divider)

        master_table = TableLogger(logger=logger).setWarnWithArrows(True)
        master_rows = (
            ("Master Greenplum instance process active PID", self.pid),
            ("Database", self.dburl.pgdb),
            ("Master port", self.port),
            ("Master directory", self.master_datadir),
            ("Shutdown mode", self.mode),
            ("Timeout", self.timeout),
        )
        for label, value in master_rows:
            master_table.info([label, "= %s" % value])

        # the standby is only reported as being shut down when one is
        # configured and stopping it was requested
        if self.gparray.standbyMaster and self.stopstandby:
            standby_state = "On"
        else:
            standby_state = "Off"
        master_table.info(["Shutdown Master standby host", "= %s" % standby_state])

        master_table.outputTable()

        logger.info(divider)
        logger.info("Segment instances that will be shutdown:")
        logger.info(divider)

        segment_table = TableLogger(logger=logger).setWarnWithArrows(True)
        segment_table.info(["Host", "Datadir", "Port", "Status"])

        for seg in segs:
            row = [seg.getSegmentHostName(),
                   seg.getSegmentDataDirectory(),
                   str(seg.getSegmentPort()),
                   seg.getSegmentStatus()]
            segment_table.info(row)
        segment_table.outputTable()

    # ----------------------- Command line option parser ----------------------
    @staticmethod
    def createParser():
        """Build the command line option parser for the stop program.

        The parser gets the standard logging/help options, the master data
        directory connection option and the instance shutdown options
        (shutdown mode, restart, master-only, standby handling, config reload,
        parallelism, timeout, single-host stop and validation skipping).

        Returns the configured OptParser instance.
        """
        parser = OptParser(option_class=OptChecker,
                           description="Stops a GPDB Array.",
                           version='%prog version 6.17.0 build commit:9b887d27cef94c03ce3a3e63e4f6eefb9204631b')
        parser.setHelp([])

        addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True, includeUsageOption=True)

        addTo = OptionGroup(parser, 'Connection options')
        parser.add_option_group(addTo)
        addMasterDirectoryOptionForSingleClusterProgram(addTo)

        addTo = OptionGroup(parser, 'Instance shutdown options: ')
        parser.add_option_group(addTo)

        # Deprecated single-letter mode switches; superseded by -M/--mode.
        addTo.add_option('-f', '--fast', action='store_true', default=False,
                         help="<deprecated> Fast shutdown, active transactions interrupted and rolled back")
        addTo.add_option('-i', '--immediate', action='store_true', default=False,
                         help="<deprecated> Immediate shutdown, active transaction aborted.")
        addTo.add_option('-s', '--smart', action='store_true',
                         help="<deprecated> Smart shutdown, wait for active transaction to complete. [default]")
        addTo.add_option('-z', '--force', action='store_true', default=False,
                         help="<deprecated> Force shutdown of segment instances marked as invalid. Kill postmaster PID, "
                              "delete /tmp lock files and remove segment instance postmaster.pid file.")
        addTo.add_option('-M', '--mode', type='choice', choices=['fast', 'immediate', 'smart'],
                         metavar='fast|immediate|smart', action='store', default='smart',
                         help='set the method of shutdown')

        addTo.add_option('-r', '--restart', action='store_true',
                         help='Restart Greenplum Database instance after successful gpstop.')
        addTo.add_option('-m', '--master_only', action='store_true',
                         help='stop master instance started in maintenance mode')
        addTo.add_option('-y', dest="stop_standby", action='store_false', default=True,
                         help='Do not stop the standby master process.')
        # The two help fragments are concatenated, so the first one must end
        # with a space to keep the words apart in the rendered help.
        addTo.add_option('-u', dest="request_sighup", action='store_true',
                         help="upload new master postgresql.conf settings, does not stop Greenplum array, "
                              "issues a signal to the master segment postmaster process to reload")

        addTo.add_option('-B', '--parallel', type="int", default=DEFAULT_NUM_WORKERS, metavar="<parallel_processes>",
                         help='number of segment hosts to run in parallel. Default is %d' % DEFAULT_NUM_WORKERS)
        addTo.add_option('-t', '--timeout', dest='timeout', default=SEGMENT_STOP_TIMEOUT_DEFAULT, type='int',
                         help='time to wait for segment stop (in seconds)')
        addTo.add_option('--host', dest='only_this_host', type='string',
                         help='stop all segments on this host (this will only complete if failover segments are available). '
                              'hostname as displayed in gp_segment_configuration')
        addTo.add_option('--skipvalidation', action='store_true', default=False,
                         help="<development testing only> DO NOT USE")

        parser.set_defaults(verbose=False, filters=[], slice=(None, None))
        return parser

    @staticmethod
    def createProgram(options, args):
        """Validate the parsed command line and build the GpStop program.

        options: option values produced by the parser from createParser();
                 options.mode and options.parallel may be rewritten here
        args:    leftover positional arguments; any are rejected

        Returns a GpStop instance configured from the options.

        Raises ProgramArgumentValidationException when --mode is combined
        with the deprecated -f/-i/-s switches, when --host is combined with
        -m, -r, -u or -y, when the parallel degree is outside 1..128, when the
        timeout is below SEGMENT_STOP_TIMEOUT_DEFAULT (unless --skipvalidation
        was given) or when positional arguments are present.
        """
        logfileDirectory = options.ensure_value("logfileDirectory", False)

        # The deprecated switches overwrite options.mode further down, so a
        # non-default --mode combined with any of them (including -s, which the
        # message names) is rejected instead of being silently discarded.
        if options.mode != 'smart':
            if options.fast or options.immediate or options.smart:
                raise ProgramArgumentValidationException("Can not mix --mode options with older deprecated '-f,-i,-s'")

        # --host cannot be combined with any of these; checked in this order.
        if options.only_this_host:
            host_conflicts = (
                (options.master_only, "-m", "master-only"),
                (options.restart, "-r", "restart"),
                (options.request_sighup, "-u", "config reload"),
                (not options.stop_standby, "-y", "skipping standby"),
            )
            for is_set, flag, purpose in host_conflicts:
                if is_set:
                    raise ProgramArgumentValidationException(
                        "Incompatible flags. Cannot mix '--host' option with '%s' for %s." % (flag, purpose))

        # Map the deprecated switches onto the mode; later ones win.
        if options.fast:
            options.mode = "fast"
        if options.immediate:
            options.mode = "immediate"
        if options.smart:
            options.mode = "smart"

        # deprecating force option.  it no longer kills -9 things.
        # just make it stop fast instead.
        if options.force:
            options.mode = "fast"

        # The environment only supplies the parallel degree when -B was left
        # at its default value.
        proccount = os.environ.get('GP_MGMT_PROCESS_COUNT')
        if options.parallel == DEFAULT_NUM_WORKERS and proccount is not None:
            options.parallel = int(proccount)

        # -B sanity check
        if options.parallel > 128 or options.parallel < 1:
            raise ProgramArgumentValidationException("Invalid value for parallel degree: %s" % options.parallel)

        # Don't allow them to go below default (a timeout equal to the default is accepted)
        if options.timeout < SEGMENT_STOP_TIMEOUT_DEFAULT and not options.skipvalidation:
            raise ProgramArgumentValidationException(
                "Invalid timeout value.  Must be greater than %s seconds." % SEGMENT_STOP_TIMEOUT_DEFAULT)

        if args:
            raise ProgramArgumentValidationException(
                "Argument %s is invalid.  Is an option missing a parameter?" % args[-1])

        return GpStop(options.mode,
                      master_datadir=options.masterDataDirectory,
                      parallel=options.parallel,
                      quiet=options.quiet,
                      masteronly=options.master_only,
                      sighup=options.request_sighup,
                      interactive=options.interactive,
                      stopstandby=options.stop_standby,
                      restart=options.restart,
                      timeout=options.timeout,
                      logfileDirectory=logfileDirectory,
                      onlyThisHost=options.only_this_host)


# Script entry point: hand the parser and program factories to simple_main.
if __name__ == "__main__":
    simple_main(GpStop.createParser, GpStop.createProgram)
