#!/usr/bin/env python
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
#
# THIS IMPORT MUST COME FIRST
#
# import mainUtils FIRST to get python version check
import sys
from optparse import OptionGroup, SUPPRESS_HELP

from gppylib.mainUtils import *

try:
    import pickle

    from gppylib.db import dbconn
    from gppylib.gpparseopts import OptParser, OptChecker
    from gppylib.gparray import *
    from gppylib.gplog import get_default_logger, log_to_file_only
    from gppylib import gphostcache
    from gppylib import userinput
    from gppylib.db import catalog
    from gppylib.commands import unix
    from gppylib.commands import gp
    from gppylib.commands.gp import SEGMENT_TIMEOUT_DEFAULT
    from gppylib.commands import base
    from gppylib.commands import pg
    from gppylib.commands import dca
    from gppylib import pgconf
    from gppylib.heapchecksum import HeapChecksum
    from gppylib.commands.pg import PgControlData
    from gppylib.operations.startSegments import *
    from gppylib.operations.detect_unreachable_hosts import get_unreachable_segment_hosts, mark_segments_down_for_unreachable_hosts
    from gppylib.utils import TableLogger
    from gppylib.gp_era import GpEraFile
except ImportError, e:
    sys.exit('Cannot import modules.  Please check that you have sourced greenplum_path.sh.  Detail: ' + str(e))

logger = get_default_logger()


# ---------------------------------------------------------------
class GpStart:
    ######
    def __init__(self, specialMode, restricted, start_standby, master_datadir,
                 wrapper,
                 wrapper_args,
                 skip_standby_check,
                 parallel=gp.DEFAULT_GPSTART_NUM_WORKERS,
                 quiet=False,
                 masteronly=False,
                 interactive=False,
                 timeout=SEGMENT_TIMEOUT_DEFAULT,
                 logfileDirectory=False,
                 skip_heap_checksum_validation=False,
                 ):
        assert (specialMode in [None, 'maintenance'])
        self.specialMode = specialMode
        self.restricted = restricted
        self.start_standby = start_standby
        self.pool = None
        self.parallel = parallel
        self.attempt_standby_start = False
        self.quiet = quiet
        self.masteronly = masteronly
        self.master_datadir = master_datadir
        self.interactive = interactive
        self.timeout = timeout
        self.wrapper = wrapper
        self.wrapper_args = wrapper_args
        self.skip_standby_check = skip_standby_check
        self.logfileDirectory = logfileDirectory
        self.skip_heap_checksum_validation = skip_heap_checksum_validation

        #
        # Some variables that are set during execution
        #
        self.era = None
        self.gpversion = None
        self.gparray = None
        self.port = None
        self.gphome = None
        self.dburl = None
        self.max_connections = None
        logger.debug("Setting level of parallelism to: %d" % self.parallel)

    ######
    def run(self):
        self._prepare()

        # MPP-13700
        if self.masteronly:
            if os.getenv('GPSTART_INTERNAL_MASTER_ONLY'):
                logger.info('Master-only start requested for management utilities.')
            else:
                logger.warning("****************************************************************************")
                logger.warning("Master-only start requested. If a standby master is configured, this command")
                logger.warning("may lead to a split-brain condition and possible unrecoverable data loss.")
                logger.warning("Maintenance mode should only be used with direction from Greenplum Support.")
                logger.warning("****************************************************************************")
                if self.interactive:
                    if not userinput.ask_yesno(None, "\nContinue with master-only startup", 'N'):
                        raise UserAbortedException()

        try:
            # Disable Ctrl-C
            signal.signal(signal.SIGINT, signal.SIG_IGN)

            self._startMaster()
            logger.info("Master Started...")

            if self.masteronly:
                return 0

            num_workers = min(len(self.gparray.get_hostlist()), self.parallel)
            hosts = set(self.gparray.get_hostlist(includeMaster=False))
            # We check for unreachable segment hosts first thing, because if a host is down but its segments
            # are marked up, later checks can return invalid or misleading results and the cluster may not
            # start in a good state.
            unreachable_hosts = get_unreachable_segment_hosts(hosts, num_workers)
            if unreachable_hosts:
                mark_segments_down_for_unreachable_hosts(self.gparray.segmentPairs, unreachable_hosts)

            if self.skip_heap_checksum_validation:
                self.master_checksum_value = None
                logger.warning("Because of --skip-heap-checksum-validation, the GUC for data_checksums "
                                   "will not be checked between master and segments")
            else:
                self.master_checksum_value = HeapChecksum(gparray=self.gparray, num_workers=num_workers,
                                                          logger=logger).get_master_value()

            if not self.skip_standby_check:
                self.check_standby()
            else:
                logger.info("Skipping Standby activation status checking.")

            logger.info("Shutting down master")
            self.shutdown_master_only()
            # TODO: check results of command.

        finally:
            # Reenable Ctrl-C
            signal.signal(signal.SIGINT, signal.default_int_handler)

        (segmentsToStart, invalidSegments) = self._prepare_segment_start()

        if self.interactive:
            self._summarize_actions(segmentsToStart)
            if not userinput.ask_yesno(None, "\nContinue with Greenplum instance startup", 'N'):
                raise UserAbortedException()

        try:
            # Disable Ctrl-C
            signal.signal(signal.SIGINT, signal.SIG_IGN)

            success = self._start(segmentsToStart, invalidSegments)
        finally:
            # Reenable Ctrl-C
            signal.signal(signal.SIGINT, signal.default_int_handler)

        if dca.is_dca_appliance():
            logger.info("Initializing DCA settings")
            dca.DcaGpdbInitialized.local()
            logger.info("DCA settings initialized")

        return 0 if success else 1

    ######
    def cleanup(self):
        if self.pool:
            self.pool.haltWork()

            # ------------------------------- Internal Helper --------------------------------

    ######
    def _prepare(self):
        logger.info("Gathering information and validating the environment...")
        self._basic_setup()

        self._check_version()
        self._check_master_running()

    ######
    def _basic_setup(self):
        self.gphome = gp.get_gphome()
        if self.master_datadir is None:
            self.master_datadir = gp.get_masterdatadir()
        self.user = unix.getUserName()
        gp.check_permissions(self.user)
        self._read_postgresqlconf()

    ######
    def _read_postgresqlconf(self):
        logger.debug("Obtaining master's port from master data directory")
        pgconf_dict = pgconf.readfile(self.master_datadir + "/postgresql.conf")
        self.port = pgconf_dict.int('port')
        logger.debug("Read from postgresql.conf port=%s" % self.port)
        self.max_connections = pgconf_dict.int('max_connections')
        logger.debug("Read from postgresql.conf max_connections=%s" % self.max_connections)

    ######
    def _check_version(self):
        self.gpversion = gp.GpVersion.local('local GP software version check', self.gphome)
        logger.info("Greenplum Binary Version: '%s'" % self.gpversion)

        # It would be nice to work out the catalog version => greenplum version
        # calculation so that we can print out nicer error messages when
        # version doesn't match.
        bin_catversion = gp.GpCatVersion.local('local GP software catalag version check', self.gphome)
        logger.info("Greenplum Catalog Version: '%s'" % bin_catversion)

        dir_catversion = gp.GpCatVersionDirectory.local('local GP directory catalog version check', self.master_datadir)

        if bin_catversion != dir_catversion:
            logger.info("MASTER_DIRECTORY Catalog Version: '%s'" % dir_catversion)
            logger.info("Catalog Version of master directory incompatible with binaries")
            raise ExceptionNoStackTraceNeeded("Catalog Versions are incompatible")

    ######
    def _check_master_running(self):
        logger.debug("Check if Master is already running...")
        if os.path.exists(self.master_datadir + '/postmaster.pid'):
            logger.warning("postmaster.pid file exists on Master, checking if recovery startup required")
            self._recovery_startup()

        self._remove_postmaster_tmpfile(self.port)

    def shutdown_master_only(self):
        cmd = gp.GpStop("Shutting down master", masterOnly=True,
                        fast=True, quiet=logging_is_quiet(),
                        verbose=logging_is_verbose(),
                        datadir=self.master_datadir,
                        parallel=self.parallel,
                        logfileDirectory=self.logfileDirectory)
        cmd.run()
        logger.debug("results of forcing master shutdown: %s" % cmd)

    def fetch_tli(self, data_dir_path, remoteHost=None):
        if not remoteHost:
            controldata = PgControlData("fetching pg_controldata locally", data_dir_path)
        else:
            controldata = PgControlData("fetching pg_controldata remotely", data_dir_path, REMOTE, remoteHost)

        controldata.run(validateAfter=True)
        return int(controldata.get_value("Latest checkpoint's TimeLineID"))

    class StandbyUnreachable(Exception):
        pass

    def _standby_activated(self):
        logger.debug("Checking if standby has been activated...")

        if not self.gparray.standbyMaster:
            return False

        # fetch timelineids for both primary and standby (post-promote)
        primary_tli = self.fetch_tli(self.master_datadir)
        try:
            standby_tli = self.fetch_tli(self.gparray.standbyMaster.getSegmentDataDirectory(),
                                         self.gparray.standbyMaster.getSegmentHostName())
        except base.ExecutionError as err:
            raise GpStart.StandbyUnreachable(err)

        logger.debug("Primary TLI = %d" % primary_tli)
        logger.debug("Standby TLI = %d" % standby_tli)
        return primary_tli < standby_tli

    def check_standby(self):
        try:
            standby_activated = self._standby_activated()
        except GpStart.StandbyUnreachable as err:
            logger.warning("Standby host is unreachable, cannot determine whether the standby is currently acting as the master. Received error: %s" % err)
            logger.warning("Continue only if you are certain that the standby is not acting as the master.")
            if not self.interactive or not userinput.ask_yesno(None, "\nContinue with startup", 'N'):
                if not self.interactive:
                    logger.warning("Non interactive mode detected. Not starting the cluster. Start the cluster in interactive mode.")
                self.shutdown_master_only()
                raise UserAbortedException()

            # If the user wants to continue when the standby is unreachable,
            # set start_standby to False to prevent starting the unreachable
            # standy later in the startup process.
            self.start_standby = False
            return

        if not standby_activated:
            return

        # stop the master we've started up.
        cmd = gp.GpStop("Shutting down master", masterOnly=True,
                        fast=True, quiet=logging_is_quiet(),
                        verbose=logging_is_verbose(),
                        datadir=self.master_datadir,
                        parallel=self.parallel)
        cmd.run(validateAfter=True)
        logger.info("Master Stopped...")
        raise ExceptionNoStackTraceNeeded("Standby activated, this node no more can act as master.")

    ######
    def _recovery_startup(self):
        logger.info("Commencing recovery startup checks")

        lockfile = "/tmp/.s.PGSQL.%s" % self.port
        tmpfile_exists = os.path.exists(lockfile)

        netstat_port_active = unix.PgPortIsActive.local('check netstat for master port',
                                                        lockfile, self.port)
        if tmpfile_exists and netstat_port_active:
            logger.info("Have lock file %s and a process running on port %s" % (lockfile, self.port))
            raise ExceptionNoStackTraceNeeded("Master instance process running")
        elif tmpfile_exists and not netstat_port_active:
            logger.info("Have lock file %s but no process running on port %s" % (lockfile, self.port))
        elif not tmpfile_exists and netstat_port_active:
            logger.info("No lock file %s but a process running on port %s" % (lockfile, self.port))
            raise ExceptionNoStackTraceNeeded("Port %s is already in use" % self.port)
        elif not tmpfile_exists and not netstat_port_active:
            logger.info("No socket connection or lock file in /tmp found for port=%s" % self.port)

        logger.info("No Master instance process, entering recovery startup mode")

        if tmpfile_exists:
            logger.info("Clearing Master instance lock files")
            os.remove(lockfile)

        postmaster_pid_file = "%s/postmaster.pid" % self.master_datadir
        if os.path.exists(postmaster_pid_file):
            logger.info("Clearing Master instance pid file")
            os.remove("%s/postmaster.pid" % self.master_datadir)

        self._startMaster()

        logger.info("Commencing forced instance shutdown")

        gp.GpStop.local("forcing master shutdown", masterOnly=True,
                        verbose=logging_is_verbose,
                        quiet=self.quiet, fast=False,
                        force=True, datadir=self.master_datadir, parallel=self.parallel)

    ######
    def _remove_postmaster_tmpfile(self, port):
        lockfile = "/tmp/.s.PGSQL.%s" % port
        tmpfile_exists = os.path.exists(lockfile)

        if tmpfile_exists:
            logger.info("Clearing Master instance lock files")
            os.remove(lockfile)
        pass

    ######
    def _summarize_actions(self, segmentsToStart):
        logger.info("--------------------------")
        logger.info("Master instance parameters")
        logger.info("--------------------------")
        logger.info("Database                 = %s" % self.dburl.pgdb)
        logger.info("Master Port              = %s" % self.port)
        logger.info("Master directory         = %s" % self.master_datadir)
        logger.info("Timeout                  = %d seconds" % self.timeout)
        if self.gparray.standbyMaster:
            if self.start_standby:
                logger.info("Master standby start     = On")
            else:
                logger.info("Master standby start     = Off")
        else:
            logger.info("Master standby           = Off ")

        logger.info("--------------------------------------")
        logger.info("Segment instances that will be started")
        logger.info("--------------------------------------")

        isFileReplication = self.gparray.hasMirrors

        tabLog = TableLogger().setWarnWithArrows(True)
        header = ["Host", "Datadir", "Port"]
        if isFileReplication:
            header.append("Role")
        tabLog.info(header)
        for db in segmentsToStart:
            line = [db.getSegmentHostName(), db.getSegmentDataDirectory(), str(db.getSegmentPort())]
            if isFileReplication:
                line.append("Primary" if db.isSegmentPrimary(True) else "Mirror")
            tabLog.info(line)
        tabLog.outputTable()

    ######
    def _get_format_string(self):
        host_len = 0
        dir_len = 0
        port_len = 0
        for db in self.gparray.getSegDbList():
            if len(db.hostname) > host_len:
                host_len = len(db.hostname)
            if len(db.datadir) > dir_len:
                dir_len = len(db.datadir)
            if len(str(db.port)) > port_len:
                port_len = len(str(db.port))

        return "%-" + str(host_len) + "s  %-" + str(dir_len) + "s  %-" + str(port_len) + "s  %s"

    ######
    def _startMaster(self):
        logger.info("Starting Master instance in admin mode")

        cmd = gp.MasterStart('master in utility mode', self.master_datadir,
                             self.port, self.era,
                             wrapper=self.wrapper, wrapper_args=self.wrapper_args,
                             specialMode=self.specialMode, timeout=self.timeout, utilityMode=True
                             )
        cmd.run()

        if cmd.get_results().rc != 0:
            logger.fatal("Failed to start Master instance in admin mode")
            cmd.validate()

        logger.info("Obtaining Greenplum Master catalog information")

        logger.info("Obtaining Segment details from master...")
        self.dburl = dbconn.DbURL(port=self.port, dbname='template1')
        self.gparray = GpArray.initFromCatalog(self.dburl, utility=True)

        logger.info("Setting new master era")
        e = GpEraFile(self.master_datadir, logger=get_logger_if_verbose())
        e.new_era(self.gparray.master.hostname, self.port, time.strftime('%y%m%d%H%M%S'))
        self.era = e.era

    ######
    def _start(self, segmentsToStart, invalidSegments):
        """ starts all of the segments, the master and the standby master

            returns whether all segments that should be started were started successfully

            note that the parameters do not list master/standby, they only list data segments
        """
        workers = min(len(self.gparray.get_hostlist()), self.parallel)
        self.pool = base.WorkerPool(numWorkers=workers)

        if os.path.exists(self.master_datadir + "/gpexpand.status") and not self.restricted:
            raise ExceptionNoStackTraceNeeded(
                "Found a System Expansion Setup in progress. Please run 'gpexpand --rollback'")

        logger.debug("gparray does%s have mirrors" % ("" if self.gparray.hasMirrors else " not"))

        if self.gparray.hasMirrors:
            startMode = START_AS_PRIMARY_OR_MIRROR
        else:
            startMode = START_AS_MIRRORLESS

        # this will eventually start gpsegstart.py
        segmentStartOp = StartSegmentsOperation(self.pool, self.quiet, self.gpversion,
                                                self.gphome, self.master_datadir, self.master_checksum_value,
                                                self.timeout, self.specialMode, self.wrapper, self.wrapper_args, self.parallel,
                                                logfileDirectory=self.logfileDirectory)
        segmentStartResult = segmentStartOp.startSegments(self.gparray, segmentsToStart, startMode, self.era)

        # see if we have at least one segment per content
        willShutdownSegments = not self._verify_enough_segments(segmentStartResult, self.gparray)

        # process the result of segment startup
        self._print_segment_start(segmentStartResult, invalidSegments, willShutdownSegments)

        if willShutdownSegments:
            # go through and remove any segments that we did start so that we keep everything
            # shutdown cleanly
            self._shutdown_segments(segmentStartResult)
            raise ExceptionNoStackTraceNeeded("Do not have enough valid segments to start the array.")

        failedToStart = segmentStartResult.getFailedSegmentObjs()
        master_result, message = self._start_final_master()
        if not master_result:
            return False

        # start standby after master in dispatch mode comes up
        standby_was_started = self._start_standby()

        # report if we complete operations
        return self._check_final_result(
            not standby_was_started and self.attempt_standby_start,
            failedToStart, invalidSegments, message)

    ######
    def _prepare_segment_start(self):
        segs = self.gparray.get_valid_segdbs()

        logger.debug("gp_segment_configuration indicates following valid segments")
        for seg in segs:
            logger.debug("SegDB: %s" % seg)

        # segments marked down
        invalid_segs = self.gparray.get_invalid_segdbs()

        # now produce the list of segments to actually start
        dbIdsToNotStart = {}
        for segment in invalid_segs:
            dbIdsToNotStart[segment.getSegmentDbId()] = True

        for seg in invalid_segs:
            logger.warning("Skipping startup of segment marked down in configuration: on %s directory %s <<<<<" % \
                           (seg.getSegmentHostName(), seg.getSegmentDataDirectory()))
        logger.debug("dbIdsToNotStart has %d entries" % len(dbIdsToNotStart))

        # we intend to dedup invalid_segs
        assert len(dbIdsToNotStart) == len(invalid_segs)

        toStart = [seg for seg in segs if dbIdsToNotStart.get(seg.getSegmentDbId()) is None]
        return (toStart, invalid_segs)

    ####
    def _verify_enough_segments(self, startResult, gparray):
        successfulSegments = startResult.getSuccessfulSegments()

        allSegmentsByContent = GpArray.getSegmentsByContentId(gparray.getSegDbList())
        successfulSegmentsByDbId = GpArray.getSegmentsGroupedByValue(successfulSegments, Segment.getSegmentDbId)

        #
        # look at each content, see if there is a segment available (or one
        # which can be made available by failing over)
        #
        for primary in gparray.getSegDbList():

            if not primary.isSegmentPrimary(current_role=True):
                continue

            # find the mirror
            segs = allSegmentsByContent[primary.getSegmentContentId()]
            mirror = None
            if len(segs) > 1:
                mirror = [s for s in segs if s.isSegmentMirror(current_role=True)][0]

            if primary.getSegmentDbId() in successfulSegmentsByDbId:
                # good, we can continue!
                continue

            if mirror is not None \
                    and mirror.getSegmentDbId() in successfulSegmentsByDbId \
                    and primary.isSegmentModeSynchronized():
                #
                # we could fail over to that mirror, so it's okay to start up like this
                #
                continue

            logger.error("No segment started for content: %d." % primary.getSegmentContentId())
            logger.info("dumping success segments: %s" % [s.__str__() for s in startResult.getSuccessfulSegments()])
            return False
        return True

    ######
    def _shutdown_segments(self, segmentStartResult):

        logger.info("Commencing parallel segment instance shutdown, please wait...")

        #
        # Note that a future optimization would be to only stop the segments that we actually started.
        #    This requires knowing which ones are left in a partially up state
        #
        #
        # gather the list of those that we actually tried to start
        toStop = []
        toStop.extend(segmentStartResult.getSuccessfulSegments())
        toStop.extend([f.getSegment() for f in segmentStartResult.getFailedSegmentObjs()])

        segmentsByHost = GpArray.getSegmentsByHostName(toStop)

        #
        # stop them, stopping primaries before mirrors
        #
        for type in ["primary", "mirror"]:
            for hostName, segments in segmentsByHost.iteritems():

                if type == "primary":
                    segments = [seg for seg in segments if seg.isSegmentPrimary(current_role=True)]
                else:
                    segments = [seg for seg in segments if seg.isSegmentMirror(current_role=True)]

                if len(segments) > 0:
                    logger.debug("Dispatching command to shutdown %s segments on host: %s" % (type, hostName))
                    cmd = gp.GpSegStopCmd("remote segment stop on host '%s'" % hostName,
                                          self.gphome, self.gpversion,
                                          mode='immediate', dbs=segments,
                                          verbose=logging_is_verbose(),
                                          ctxt=base.REMOTE, remoteHost=hostName)
                    self.pool.addCommand(cmd)

            if self.quiet:
                self.pool.join()
            else:
                base.join_and_indicate_progress(self.pool)

    ######
    def _print_segment_start(self, segmentStartResult, invalidSegments, willShutdownSegments):
        """
        Print the results of segment startup

        segmentStartResult is the StartSegmentsResult from the actual start
        invalidSegments are those that we didn't even try to start because they are marked as down or should otherwise
           not be started
        """
        started = len(segmentStartResult.getSuccessfulSegments())
        failed = len(segmentStartResult.getFailedSegmentObjs())
        totalTriedToStart = started + failed

        if failed or logging_is_verbose():
            logger.info("----------------------------------------------------")
            for failure in segmentStartResult.getFailedSegmentObjs():
                segment = failure.getSegment()
                logger.info("DBID:%d  FAILED  host:'%s' datadir:'%s' with reason:'%s'" % (
                    segment.getSegmentDbId(), segment.getSegmentHostName(),
                    segment.getSegmentDataDirectory(), failure.getReason()))
            for segment in segmentStartResult.getSuccessfulSegments():
                logger.debug("DBID:%d  STARTED" % segment.getSegmentDbId())
            logger.info("----------------------------------------------------\n\n")

        tableLog = TableLogger().setWarnWithArrows(True)

        tableLog.addSeparator()
        tableLog.info(["Successful segment starts", "= %d" % started])

        tableLog.infoOrWarn(failed, ["Failed segment starts", "= %d" % failed])

        tableLog.infoOrWarn(len(invalidSegments) > 0,
                            ["Skipped segment starts (segments are marked down in configuration)",
                             "= %d" % len(invalidSegments)])
        tableLog.addSeparator()
        tableLog.outputTable()

        attentionFlag = "<<<<<<<<" if started != totalTriedToStart else ""
        if len(invalidSegments) > 0:
            skippedMsg = ", skipped %s other segments" % len(invalidSegments)
        else:
            skippedMsg = ""
        logger.info("Successfully started %d of %d segment instances%s %s" %
                    (started, totalTriedToStart, skippedMsg, attentionFlag))
        logger.info("----------------------------------------------------")

        if failed:
            logger.warning("Segment instance startup failures reported")
            logger.warning("Failed start %d of %d segment instances %s" % \
                           (failed, totalTriedToStart, attentionFlag))
            logger.warning("Review %s" % get_logfile())
            if not willShutdownSegments:
                logger.warning("For more details on segment startup failure(s)")
                logger.warning("Run  gpstate -s  to review current segment instance status")

            logger.info("----------------------------------------------------")

        if len(invalidSegments) > 0:
            logger.warning("****************************************************************************")
            logger.warning("There are %d segment(s) marked down in the database" % len(invalidSegments))
            logger.warning("To recover from this current state, review usage of the gprecoverseg")
            logger.warning("management utility which will recover failed segment instance databases.")
            logger.warning("****************************************************************************")

    ######
    def _check_final_result(self, standbyFailure,
                            failedToStart, invalidSegments, msg):
        if standbyFailure:
            logger.warning("Standby Master could not be started")
        if len(failedToStart) > 0:
            logger.warning("Number of segments which failed to start:  %d", len(failedToStart))

        if standbyFailure or len(failedToStart) > 0:
            return False

        if len(invalidSegments) > 0:
            logger.warning("Number of segments not attempted to start: %d", len(invalidSegments))

        if (len(failedToStart) > 0 or len(invalidSegments) > 0 or
                    msg is not None):
            logger.info("Check status of database with gpstate utility")
        else:
            logger.info("Database successfully started")

        return True

    ######
    def _start_final_master(self):
        """ Last item in the startup sequence is to start the master.

            After starting the master we connect to it.  This is done both as a check that the system is
            actually started but its also done because certain backend processes don't get kickstarted
            until the first connection.  The DTM is an example of this and it allows those initialization
            messages to end up in the gpstart log as opposed to the user's psql session.
            Returns a tuple of (result[bool], message[string])
        """
        restrict_txt = ""
        if self.restricted:
            restrict_txt = "in RESTRICTED mode"

        logger.info("Starting Master instance %s directory %s %s" % (
        self.gparray.master.hostname, self.master_datadir, restrict_txt))

        # attempt to start master
        gp.MasterStart.local("Starting Master instance",
                             self.master_datadir, self.port, self.era,
                             wrapper=self.wrapper, wrapper_args=self.wrapper_args,
                             specialMode=self.specialMode, restrictedMode=self.restricted, timeout=self.timeout,
                             max_connections=self.max_connections
                             )

        # check that master is running now
        if not pg.DbStatus.local('master instance', self.gparray.master):
            logger.warning("Command pg_ctl reports Master %s on port %d not running" % (
            self.gparray.master.datadir, self.gparray.master.port))
            logger.warning("Master could not be started")
            return False, None

        logger.info("Command pg_ctl reports Master %s instance active" % self.gparray.master.hostname)

        msg = None
        try:
            self.dburl.retries = 4
            self.dburl.timeout = 15
            masterconn = dbconn.connect(self.dburl)
            masterconn.close()

        except Exception, e:
            # MPP-14016.  While certain fts scenarios will trigger initial connection failures
            # we still need watch for PANIC events.
            msg = str(e)
            if 'PANIC' in msg:
                logger.critical(msg)
                return False
            logger.warning(msg)

        # set the era we used when starting the segments
        e = GpEraFile(self.master_datadir, logger=get_logger_if_verbose())
        e.set_era(self.era)

        return True, msg

    ######
    def _start_standby(self):
        """ used to start the standbymaster if necessary.

            returns if the standby master was started or not
        """
        if self.start_standby and self.gparray.standbyMaster is not None:
            try:
                self.attempt_standby_start = True
                host = self.gparray.standbyMaster.hostname
                datadir = self.gparray.standbyMaster.datadir
                port = self.gparray.standbyMaster.port
                return gp.start_standbymaster(host, datadir, port, era=self.era,
                                              wrapper=self.wrapper,
                                              wrapper_args=self.wrapper_args)
            except base.ExecutionError, e:
                logger.warning("Error occured while starting the standby master: %s" % e)
                return False
        else:
            logger.info("No standby master configured.  skipping...")
            return False

    # ----------------------- Command line option parser ----------------------
    @staticmethod
    def createParser():
        parser = OptParser(option_class=OptChecker,
                           description="Starts a GPDB Array.",
                           version='%prog version 6.17.0 build commit:9b887d27cef94c03ce3a3e63e4f6eefb9204631b')
        parser.setHelp([])

        addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True)

        addTo = OptionGroup(parser, 'Connection options')
        parser.add_option_group(addTo)
        addMasterDirectoryOptionForSingleClusterProgram(addTo)

        addTo = OptionGroup(parser, 'Database startup options: ')
        parser.add_option_group(addTo)
        addTo.add_option('-U', '--specialMode', type='choice', choices=['maintenance'],
                         metavar='maintenance', action='store', default=None,
                         help=SUPPRESS_HELP)
        addTo.add_option('-m', '--master_only', action='store_true',
                         help='start master instance only in maintenance mode')
        addTo.add_option('-y', '--no_standby', dest="start_standby", action='store_false', default=True,
                         help='do not start master standby server')
        addTo.add_option('-B', '--parallel', type="int", default=gp.DEFAULT_GPSTART_NUM_WORKERS, metavar="<parallel_processes>",
                         help='number of segment hosts to run in parallel. Default is %d' % gp.DEFAULT_GPSTART_NUM_WORKERS)
        addTo.add_option('-R', '--restricted', action='store_true',
                         help='start in restricted mode. Only users with superuser privilege are allowed to connect.')
        addTo.add_option('-t', '--timeout', dest='timeout', default=SEGMENT_TIMEOUT_DEFAULT, type='int',
                         help='time to wait for segment startup (in seconds)')
        addTo.add_option('', '--wrapper', dest="wrapper", default=None, type='string')
        addTo.add_option('', '--wrapper-args', dest="wrapper_args", default=None, type='string')
        addTo.add_option('-S', '--skip_standby_check', dest="skip_standby_check", action='store_true', default=False)
        addTo.add_option('--skip-heap-checksum-validation', dest='skip_heap_checksum_validation',
                         action='store_true', default=False, help='Skip the validation of data_checksums GUC. '
                                                                  'Note: Starting up the cluster without this '
                                                                  'validation could lead to data loss.')

        parser.set_defaults(verbose=False, filters=[], slice=(None, None))

        return parser

    @staticmethod
    def createProgram(options, args):
        logfileDirectory = options.ensure_value("logfileDirectory", False)
        proccount = os.environ.get('GP_MGMT_PROCESS_COUNT')
        if options.parallel == 64 and proccount is not None:
            options.parallel = int(proccount)

        # -n sanity check
        if options.parallel > 128 or options.parallel < 1:
            raise ProgramArgumentValidationException("Invalid value for parallel degree: %s" % options.parallel)

        if args:
            raise ProgramArgumentValidationException(
                "Argument %s is invalid.  Is an option missing a parameter?" % args[-1])

        return GpStart(options.specialMode, options.restricted,
                       options.start_standby,
                       master_datadir=options.masterDataDirectory,
                       parallel=options.parallel,
                       quiet=options.quiet,
                       masteronly=options.master_only,
                       interactive=options.interactive,
                       timeout=options.timeout,
                       wrapper=options.wrapper,
                       wrapper_args=options.wrapper_args,
                       skip_standby_check=options.skip_standby_check,
                       logfileDirectory=logfileDirectory,
                       skip_heap_checksum_validation=options.skip_heap_checksum_validation
                       )


if __name__ == '__main__':
    simple_main(GpStart.createParser, GpStart.createProgram)
