#!/usr/bin/env python
'''
Usage: gpcheckcat [<option>] [dbname]

    -?                       : print this usage message and exit
    -B parallel              : number of worker threads
    -g dir                   : generate SQL to rectify catalog corruption, put it in dir
    -p port                  : DB port number
    -P passwd                : DB password
    -U uname                 : DB User Name
    -v                       : verbose
    -A                       : all databases
    -S option                : shared table options (none, only)
    -O                       : Online
    -l                       : list all tests
    -R test | 'test1, test2' : run this particular test(s) (quoted, comma separated list for multiple tests)
    -s test | 'test1, test2' : skip this particular test(s) (quoted, comma separated list for multiple tests)
    -C catname               : run cross consistency, FK and ACL tests for this catalog table

    Test subset options are mutually exclusive, use only one of '-R', '-s', or '-C'.

'''
import datetime
import getopt
import logging
import re
import sys
import time

try:
    from gppylib import gplog
    from gppylib.db import dbconn
    from gppylib.gpcatalog import *
    from gppylib.commands.unix import *
    from gppylib.commands.gp import conflict_with_gpexpand
    from gppylib.system.info import *
    from pygresql.pgdb import DatabaseError
    from pygresql import pg
    from gpcheckcat_modules.unique_index_violation_check import UniqueIndexViolationCheck
    from gpcheckcat_modules.leaked_schema_dropper import LeakedSchemaDropper
    from gpcheckcat_modules.repair import Repair
    from gpcheckcat_modules.foreign_key_check import ForeignKeyCheck
    from gpcheckcat_modules.orphaned_toast_tables_check import OrphanedToastTablesCheck, OrphanToastTableIssue, OrphanedTable


except ImportError, e:
    sys.exit('Error: unable to import module: ' + str(e))

# OID -> object name cache
oidmap = {}

# -------------------------------------------------------------------------------
# Tool logging is configured under this script's own file name.
# NOTE(review): `os` is not imported explicitly in this file; presumably it is
# provided by one of the star imports above -- confirm.
EXECNAME = os.path.split(__file__)[-1]
gplog.setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
gplog.very_quiet_stdout_logging()
logger = gplog.get_default_logger()

sysinfo = SystemInfo(logger)

# Used as the default value of the -B (worker thread count) option.
parallelism = get_max_available_thread_count()
#-------------------------------------------------------------------------------

################
def parse_int(val):
    """Convert *val* to an int, returning 0 when it cannot be converted.

    Both malformed strings (ValueError) and values of an unsupported type
    such as None (TypeError) yield 0, so callers never see an exception.
    """
    try:
        return int(val)
    except (TypeError, ValueError):
        return 0


###############################
def quote_value(name):
    """Return name wrapped in single quotes, with each embedded single quote doubled."""
    assert isinstance(name, str)
    escaped = name.replace("'", "''")
    return "'%s'" % escaped


# Error levels stored in GV.retcode, in increasing severity (see setError()).
SUCCESS = 0
ERROR_REMOVE = 1    # error, with repair script that removes objects
ERROR_RESYNC = 2    # error, with repair script that resynchronizes objects
ERROR_NOREPAIR = 3  # error, no repair script

# NOTE(review): presumably the first OID assigned to user-created objects
# (PostgreSQL's FirstNormalObjectId) -- confirm.
FIRST_NORMAL_OID = 16384
# NOTE(review): presumably the OID of the pg_catalog namespace -- confirm.
PG_CATALOG_OID = 11

def setError(level):
    '''
    Raise the recorded error level (GV.retcode) to `level`.

    The recorded level only ever goes up: when `level` is not higher than
    the level already recorded, nothing changes.

    error level 0 => success
    error level 1 => error, with repair script removes objects
    error level 2 => error, with repair script that resynchronizes objects
    error level 3 => error, no repair script
    '''
    if level > GV.retcode:
        GV.retcode = level


###############################
class Global():
    '''
    Container for process-wide state: command-line options, connection
    cache, repair statement queues and reporting counters.
    '''

    def __init__(self):
        # Timestamp used to build the default repair directory name.
        self.timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")

        self.retcode = SUCCESS

        # Option values keyed by command-line switch.
        self.opt = {}
        self.opt['-h'] = None   # host
        self.opt['-p'] = None   # DB port number
        self.opt['-P'] = None   # DB password
        self.opt['-U'] = None   # DB user name
        self.opt['-v'] = False  # verbose
        self.opt['-V'] = False
        self.opt['-t'] = False

        self.opt['-g'] = 'gpcheckcat.repair.' + self.timestamp  # repair dir
        self.opt['-B'] = parallelism                            # worker threads
        self.opt['-T'] = None

        self.opt['-A'] = False  # all databases
        self.opt['-S'] = None   # shared table option (none, only)
        self.opt['-O'] = False  # online

        self.opt['-R'] = None   # run only these test(s)
        self.opt['-s'] = None   # skip these test(s)
        self.opt['-C'] = None   # catalog table for cross consistency tests
        self.opt['-l'] = False  # list all tests

        self.opt['-E'] = False

        self.master_dbid = None
        self.cfg = None
        self.dbname = None
        self.firstdb = None
        self.alldb = []
        self.db = {}
        self.tmpdir = None

        self.reset_stmt_queues()

        self.home = os.environ.get('HOME')
        if self.home is None:
            self._usage_exit('Error: $HOME must be set')
        self.user = os.environ.get('USER') or os.environ.get('LOGNAME')
        if self.user is None:
            self._usage_exit('Error: either $USER or $LOGNAME must be set')

        self.catalog = None
        self.max_content = 0
        self.report_cfg = {}

    @staticmethod
    def _usage_exit(msg):
        '''
        Print the module usage text and exit with `msg`.

        The module-level usage() function cannot be called from __init__:
        the GV = Global() instantiation runs before usage() is defined, so
        the call would fail with a NameError instead of reporting `msg`.
        '''
        print(__doc__)
        sys.exit(msg)

    def reset_stmt_queues(self):
        '''Reset all repair statement queues, check statuses and report counters.'''

        # dictionary of SQL statements. Key is dbid
        self.Remove = {}

        # dictionary of SQL statements. Key is dbid
        self.AdjustConname = {}

        # dictionary of SQL statements. Key is dbid
        self.DemoteConstraint = {}

        # array of SQL statements. Key is dbid
        self.ReSync = {}

        # constraints which are actually unenforcable
        self.Constraints = []

        # ownership fixups
        self.Owners = []

        # fix distribution policies
        self.Policies = []

        # Indexes to rebuild
        self.Reindex = []

        self.missingEntryStatus = None
        self.inconsistentEntryStatus = None
        self.foreignKeyStatus = None
        self.aclStatus = None

        # the following variables used for reporting purposes
        self.elapsedTime = 0
        self.totalCheckRun = 0
        self.checkStatus = True
        self.failedChecks = []
        self.missing_attr_tables = []
        self.extra_attr_tables = []


# Module-wide state object; instantiated at import time, so the $HOME and
# $USER/$LOGNAME checks in Global.__init__ run as soon as the module loads.
GV = Global()


###############################
def usage(exitarg=None):
    """Print the module usage text and terminate the process.

    exitarg is handed to sys.exit() unchanged: an integer is used as the
    exit status, a string is reported as an error message, and None means
    a successful exit.
    """
    # Parenthesised form prints the same text under the Python 2 print
    # statement and is also valid as a function call.
    print(__doc__)
    sys.exit(exitarg)


###############################

def getversion():
    '''
    Return the version string extracted from the server's version() output.

    The regular expression keeps either a "<major>.<minor>" number or the
    literal "main" following "Greenplum Database".
    '''
    db = connect()
    try:
        curs = db.query('''
    select regexp_replace(version(),
       E'.*PostgreSQL [^ ]+ .Greenplum Database ([1-9]+.[0-9]+|main).*',
       E'\\\\1') as ver;''')

        row = curs.getresult()[0]
    finally:
        # this connection is private to the call; do not leave it open
        db.close()

    version = row[0]

    logger.debug('got version %s' % version)
    return version


###############################
def getalldbs():
    """
    get all connectable databases

    Returns the query result rows (one row per database, database name in
    the first column), ordered by database name.
    """
    db = connect()
    try:
        curs = db.query('''
    select datname from pg_database where datallowconn order by datname ''')
        row = curs.getresult()
    finally:
        # this connection is private to the call; do not leave it open
        db.close()
    return row


###############################
def parseCommandLine():
    '''
    Parse sys.argv into GV.opt, GV.dbname, GV.firstdb and GV.alldb.

    Connection options that were not given on the command line fall back to
    environment variables (PGPASSWORD, PGUSER/USER, PGPORT, PGDATABASE).
    Any invalid input ends the process through usage().  When -l is given
    the function returns right after reading the switches.
    '''
    try:
        # A colon following the flag indicates an argument is expected
        (options, args) = getopt.getopt(sys.argv[1:], '?p:P:U:B:vg:t:AOS:R:s:C:lE')
    except Exception as e:
        usage('Error: ' + str(e))

    for (switch, val) in options:
        if switch == '-?':
            usage(0)
        elif switch[1] in 'pBPUgSRCs':
            GV.opt[switch] = val
        elif switch[1] in 'vtAOlE':
            # NOTE(review): the getopt spec above declares 't:' (argument
            # expected) while -t is stored as a boolean here -- confirm intent.
            GV.opt[switch] = True

    def setdef(x, v):
        # fill option x from environment variable v when x is still unset
        if not GV.opt[x]:
            t = os.environ.get(v)
            GV.opt[x] = t
            if t:
                logger.debug('%s not specified; default to %s (from %s)' %
                             (x, t, v))

    if GV.opt['-l']: return

    if GV.opt['-v']:
        gplog.enable_verbose_logging()

    setdef('-P', 'PGPASSWORD')
    setdef('-U', 'PGUSER')
    setdef('-U', 'USER')
    setdef('-p', 'PGPORT')
    if not GV.opt['-p']:
        usage('Please specify -p port')
    port = parse_int(GV.opt['-p'])
    if not port:
        # parse_int() turns unparsable input into 0; report it here instead
        # of attempting a connection with a meaningless port.
        usage('Error: invalid port number \'%s\'' % GV.opt['-p'])
    GV.opt['-p'] = port

    GV.opt['-h'] = 'localhost'
    logger.debug('Set default hostname to %s' % GV.opt['-h'])

    if GV.opt['-R']:
        logger.debug('Run this test: %s' % GV.opt['-R'])

    if GV.opt['-s']:
        logger.debug('Skip these tests: %s' % GV.opt['-s'])

    if GV.opt['-C']:
        GV.opt['-C'] = GV.opt['-C'].lower()
        logger.debug('Run cross consistency test for table: %s' % GV.opt['-C'])

    if len(args) > 1:
        usage('Too many arguments')

    if len(args) == 0:
        GV.dbname = os.environ.get('PGDATABASE')
        if GV.dbname is None:
            GV.dbname = 'template1'
        logger.debug('database is %s' % GV.dbname)
    else:
        GV.dbname = args[0]

    GV.firstdb = GV.dbname
    GV.alldb.append(GV.firstdb)

    # build list of all connectable databases
    if GV.opt['-A']:
        alldb = getalldbs()
        GV.alldb.pop()
        for adbname in alldb:
            GV.alldb.append(adbname[0])

    if GV.opt['-S']:
        # anchor the pattern at the end: a bare re.match() only checks the
        # prefix and would accept values such as 'nonexyz'
        if not re.match(r"(?:none|only)\Z", GV.opt['-S'], re.I):
            usage('Error: invalid value \'%s\' for shared table option. Legal values are (none, only)' % GV.opt['-S'])
        GV.opt['-S'] = GV.opt['-S'].lower()

    logger.debug('master at host %s, port %s, user %s, database %s' %
                 (GV.opt['-h'], GV.opt['-p'], GV.opt['-U'], GV.dbname))

    try:
        GV.opt['-B'] = int(GV.opt['-B'])
    except Exception as e:
        usage('Error: ' + str(e))

    if GV.opt['-B'] < 1:
        usage('Error: parallelism must be 1 or greater')

    logger.debug('degree of parallelism: %s' % GV.opt['-B'])


#############
def connect(user=None, password=None, host=None, port=None,
            database=None, utilityMode=False):
    '''
    Connect to DB using parameters in GV

    Every argument left empty is taken from GV (-U, -P, -h, -p options and
    GV.dbname).  With utilityMode the session is started with
    gp_session_role=utility.  Returns the open connection; a failed
    connection attempt is logged and ends the process with status 1.
    '''
    # make search path safe
    options = '-c search_path='
    if utilityMode:
        options += ' -c gp_session_role=utility'

    if not user: user = GV.opt['-U']
    if not password: password = GV.opt['-P']
    if not host: host = GV.opt['-h']
    if not port: port = GV.opt['-p']
    if not database: database = GV.dbname

    try:
        logger.debug('connecting to %s:%s %s' % (host, port, database))
        db = pg.connect(host=host, port=port, user=user,
                        passwd=password, dbname=database, opt=options)

    except pg.InternalError as ex:
        logger.fatal('could not connect to %s: "%s"' %
                     (database, str(ex).strip()))
        # sys.exit() rather than the site-provided exit() helper, which is
        # meant for interactive use and is not guaranteed to exist
        sys.exit(1)

    logger.debug('connected with %s:%s %s' % (host, port, database))
    return db


#############
def connect2(cfgrec, user=None, password=None, database=None, utilityMode=True):
    '''
    Return a connection to the instance described by cfgrec, reusing a
    cached one when available.

    Connections are cached in GV.db under a key built from host, port,
    data directory, user, password, database and utility-mode flag.
    Utility mode is switched off for the instance with content -1.
    '''
    host, port, datadir = cfgrec['address'], cfgrec['port'], cfgrec['datadir']
    logger.debug('connect %s:%s:%s' % (host, port, datadir))

    database = database or GV.dbname
    if cfgrec['content'] == -1:
        utilityMode = False

    cache_key = '%s.%s.%s.%s.%s.%s.%s' % (
        host, port, datadir, user, password, database, utilityMode)

    cached = GV.db.get(cache_key)
    if cached:
        return cached[0]

    fresh = connect(host=host, port=port, user=user, password=password,
                    database=database, utilityMode=utilityMode)
    if fresh:
        GV.db[cache_key] = [fresh, cfgrec]
    return fresh


class execThread(Thread):
    '''
    Thread that runs one query on one connection.

    After the thread has finished, `curs` holds the query result, or
    `error` holds the exception raised by the query (the other stays None).
    '''

    def __init__(self, cfg, db, qry):
        Thread.__init__(self)
        self.cfg = cfg      # configuration record of the target instance
        self.db = db        # connection the query is run on
        self.qry = qry      # SQL text
        self.curs = None
        self.error = None

    def run(self):
        try:
            self.curs = self.db.query(self.qry)
        except BaseException as e:
            # nothing may escape the thread; the caller inspects self.error
            self.error = e


def processThread(threads):
    '''
    Wait for every thread in `threads` and collect the successful results.

    Returns a list of [cfg, curs] pairs, one per thread that finished
    without error.  A thread that recorded an error raises the error level
    to ERROR_NOREPAIR and has its error printed instead.
    '''
    completed = []
    for worker in threads:
        logger.debug('waiting on thread %s' % worker.getName())
        worker.join()

        if not worker.error:
            completed.append([worker.cfg, worker.curs])
            continue

        setError(ERROR_NOREPAIR)
        seg = worker.cfg
        myprint("%s:%d:%s : %s" %
                (seg['hostname'], seg['port'], seg['datadir'],
                 str(worker.error)))
    return completed


#############
def connect2run(qry, col=None):
    '''
    Run `qry` on every instance in GV.cfg and gather all result rows.

    Queries run in threads, at most GV.opt['-B'] at a time.  Returns a list
    of [cfg, col, row] entries; when `col` is not given, the field list of
    the first processed result is used for every entry.
    '''
    logger.debug('%s' % qry)

    finished = []
    pending = []

    # parallelise queries
    for launched, dbid in enumerate(GV.cfg, 1):
        segcfg = GV.cfg[dbid]
        segdb = connect2(segcfg)

        worker = execThread(segcfg, segdb, qry)
        worker.start()
        logger.debug('launching query thread %s for dbid %i' %
                     (worker.getName(), dbid))
        pending.append(worker)

        # we don't want too much going on at once: drain a full batch
        if launched % GV.opt['-B'] == 0:
            finished.extend(processThread(pending))
            pending = []

    # process the rest of threads
    finished.extend(processThread(pending))

    rows = []
    for cfg, curs in finished:
        if col is None:
            col = curs.listfields()
        rows.extend([cfg, col, row] for row in curs.dictresult())

    return rows


def formatErr(c, col, row):
    '''
    Build a one-line description of a result row.

    c   : configuration record (hostname, port, datadir, content, dbid)
    col : names of the columns to show, in order
    row : mapping of column name to value
    '''
    location = ('%s:%s:%s, content %s, dbid %s' %
                (c['hostname'], c['port'], c['datadir'],
                 c['content'], c['dbid']))
    fields = ''.join([', %s %s' % (name, row[name]) for name in col])
    return location + fields


#############
def getGPConfiguration():
    '''
    Read gp_segment_configuration and return its rows keyed by dbid.

    Only rows with role 'p' or negative content are selected, and among
    the content -1 rows the one that is not primary (the standby master)
    is dropped.
    '''
    cfg = {}
    db = connect()
    # note that in 4.0, sql commands cannot be run against the segment mirrors directly
    # so we filter out non-primary segment databases in the query
    qry = '''
          SELECT content, preferred_role = 'p' as definedprimary,
                 dbid, role = 'p' as isprimary, hostname, address, port,
                 datadir
            FROM gp_segment_configuration
           WHERE (role = 'p' or content < 0 )
          '''
    try:
        curs = db.query(qry)
        for row in curs.dictresult():
            if row['content'] == -1 and row['isprimary'] != 't':
                continue  # skip standby master
            cfg[row['dbid']] = row
    finally:
        # close the private connection even when the query fails
        db.close()
    return cfg

def _reportDistribConstraints(db, qry, label):
    '''
    Run one constraint query for checkDistribPolicy() and report the result.

    db    : connection the query is run on
    qry   : query returning nspname, relname and constraint columns
    label : text used in the [OK] / [FAIL] log lines

    Every returned row is logged as an issue and handed to
    removeIndexConstraint().  A failure while executing raises the error
    level to ERROR_NOREPAIR and is printed instead of propagating.
    '''
    cols = ('nspname', 'relname', 'constraint')
    try:
        curs = db.query(qry)
        err = [[GV.cfg[GV.master_dbid], cols, row] for row in curs.dictresult()]

        if not err:
            logger.info('[OK] %s' % label)
            return

        GV.checkStatus = False
        setError(ERROR_REMOVE)
        logger.info('[FAIL] %s' % label)
        logger.error('pg_constraint has %d issue(s)' % len(err))
        logger.error(qry)
        for issue in err:
            logger.error(formatErr(issue[0], issue[1], issue[2]))
        for issue in err:
            cons = issue[2]
            removeIndexConstraint(cons['nspname'], cons['relname'],
                                  cons['constraint'])
    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkDistribPolicy')
        myprint('  Execution error: ' + str(ex))


def checkDistribPolicy():
    '''
    Check primary key / unique constraints against distribution policies.

    Two queries are run on the master connection: constraints on tables
    with an empty distribution key, and constraints that do not contain
    all distribution columns.  checkConstraintsRepair() is called at the
    end.
    '''
    logger.info('-----------------------------------')
    logger.info('Checking constraints on randomly distributed tables')
    qry = '''
    select n.nspname, rel.relname, pk.conname as constraint
    from   pg_constraint pk
      join pg_class rel on (pk.conrelid = rel.oid)
      join pg_namespace n on (rel.relnamespace = n.oid)
      join gp_distribution_policy d on (rel.oid = d.localoid)
    where pk.contype in('p', 'u') and d.policytype = 'p' and d.distkey = ''
    '''

    db = connect2(GV.cfg[GV.master_dbid])
    _reportDistribConstraints(db, qry, 'randomly distributed tables')

    logger.info('-----------------------------------')
    logger.info('Checking that unique constraints are only on distribution columns')

    # final part of the WHERE clause here is a little tricky: we want to make
    # sure that the set of distribution columns is a left subset of the
    # constraint.
    qry = '''
    select  n.nspname, rel.relname, pk.conname as constraint
    from    pg_constraint pk
      join  pg_class rel on (pk.conrelid = rel.oid)
      join  pg_namespace n on (rel.relnamespace = n.oid)
      join  gp_distribution_policy d on (rel.oid = d.localoid)
    where pk.contype in ('p', 'u') and d.policytype = 'p'
      and not d.distkey::int2[] operator(pg_catalog.<@) pk.conkey
    '''
    _reportDistribConstraints(db, qry, 'unique constraints')

    checkConstraintsRepair()

#############
def checkPartitionIntegrity():
    '''
    Run three partition checks on a fresh connection:

      1. row data present in branch (non-leaf) partitions,
      2. partitioned tables or partitions created "with oids",
      3. child partitions whose distribution policy differs from the
         partition root.

    Each check logs [OK] or [FAIL]; a failure while executing a check
    raises the error level to ERROR_NOREPAIR and is printed instead of
    propagating.  checkPoliciesRepair() is called at the end.
    '''
    #
    # MPP-8558: look for data in branch (not leaf) partitions
    #
    logger.info('-----------------------------------')
    logger.info('Checking pg_partition ...')
    err = []
    db = connect()
    qry = '''
    select distinct
           quote_ident(n.nspname) || '.' || quote_ident(c.relname) as parname
    from   pg_partition_rule r1
      join pg_partition_rule r2 on (r1.oid = r2.parparentrule)
      join pg_class c on (r1.parchildrelid = c.oid)
      join pg_namespace n on (c.relnamespace = n.oid)
    where  r1.parchildrelid <> 0 and r2.parchildrelid <> 0
    '''
    try:
        curs = db.query(qry)
        for row in curs.dictresult():
            # "only" restricts the probe to the branch table itself
            qy2 = ('select count(*) as cc from (select 1 from only %s limit(2)) q'
                   % (row['parname']))

            curs2 = db.query(qy2)
            for row2 in curs2.dictresult():
                if row2['cc'] == 0:
                    continue
                err.append([row['parname']])

        if not err:
            logger.info('[OK] pg_partition branch integrity')
        else:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] pg_partition branch integrity')
            logger.error('pg_partition has %d issue(s)' % len(err))
            logger.error('Discovered row data in branch partitions')
            for e in err:
                logger.error('  partition table name:  %s' % (e[0]))

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPartitionIntegrity')
        myprint('  Execution error: ' + str(ex))

    # MPP-9283: look for partitions create "with oids".
    qry = '''
    select n1.nspname, pc.relname, pc.oid
    from   pg_class pc, pg_partition_rule pr, pg_namespace n1
    where  pr.parchildrelid = pc.oid and pc.relnamespace = n1.oid and pc.relhasoids
    union
    select n1.nspname, pc.relname, pc.oid
    from   pg_class pc, pg_partition pp, pg_namespace n1
    where  pp.parrelid = pc.oid and pc.relnamespace = n1.oid and pc.relhasoids
    '''
    try:
        curs = db.query(qry)
        err = []
        for row in curs.dictresult():
            err.append([GV.cfg[GV.master_dbid], ('nspname', 'relname', 'oid'), row])

        if not err:
            logger.info('[OK] partition with oids check')
        else:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] partition with oids check')
            logger.error('partition with oids check found %d issue(s)' % len(err))
            logger.error(qry)
            # report at most 100 rows, for brevity
            for e in err[0:100]:
                row = e[2]
                logger.error(" table %s.%s oid %s" %
                             (row['nspname'], row['relname'], row['oid']))
            if len(err) > 100:
                logger.error("...")
    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPartitionIntegrity')
        myprint('  Execution error: ' + str(ex))

        # TODO: add repair script

    # MPP-11120: check for child partitions with different
    # distribution policies The complexity of this query is due to
    # having to account for the possibility of partitions with "holes"
    # which would make similar distribution policies appear different
    # in gp_distribution_policy.  This is handled by unnesting both
    # distribution
    qry = '''
    select
      parrelid,
      parchildrelid,
      d1.distkey as parpolicykey,
      d1.distclass as parpolicyclass,
      d2.distkey as parchildpolicykey,
      d2.distclass as parchildpolicyclass
    from
      (
        select distinct
          coalesce(p1.parrelid, p2.parrelid)::regclass as parrelid,
          coalesce(p1.parchildrelid, p2.parchildrelid)::regclass as parchildrelid
        from
          (
            select parrelid, parchildrelid, g1.attname, g1.distclass, g1.index
            from   pg_partition_rule pr
                   join pg_partition p on (pr.paroid = p.oid)
                   join (
                     select localoid, attname, distclass, index
                     from (
                       select
                         localoid,
                         unnest(distkey) as offset,
                         unnest(distclass) as distclass,
                         generate_series(1, array_length(distkey, 1)) as index
                       from
                         gp_distribution_policy
                      ) d
                      join pg_attribute a on (a.attrelid = d.localoid and a.attnum = d.offset)
                   ) g1 on (parrelid = g1.localoid)
            where  parchildrelid != 0
          ) p1
          full outer join
          (
            select parrelid, parchildrelid, g2.attname, g2.distclass, g2.index
            from   pg_partition_rule pr
                   join pg_partition p on (pr.paroid = p.oid)
                   join (
                     select localoid, attname, distclass, index
                     from (
                       select
                         localoid,
                         unnest(distkey) as offset,
                         unnest(distclass) as distclass,
                         generate_series(1, array_length(distkey, 1)) as index
                       from
                         gp_distribution_policy
                      ) d
                      join pg_attribute a on (a.attrelid = d.localoid and a.attnum = d.offset)
                   ) g2 on (parchildrelid = g2.localoid)
            where  parchildrelid != 0
          ) p2
          on (p1.parrelid = p2.parrelid and
              p1.parchildrelid = p2.parchildrelid and
              p1.index = p2.index)
        where p1.attname is distinct from p2.attname or p1.distclass is distinct from p2.distclass
      ) p
    join gp_distribution_policy d1 on (d1.localoid = p.parrelid)
    join gp_distribution_policy d2 on (d2.localoid = p.parchildrelid)
    where d2.distkey <> ''
    '''
    try:
        curs = db.query(qry)
        cols = ('parrelid', 'parchildrelid', 'parpolicykey', 'parpolicyclass', 'parchildpolicykey', 'parchildpolicyclass')
        col_names = {
            'parrelid': 'table',
            'parchildrelid': 'affected child',
            'parpolicykey': 'table distribution key',
            'parpolicyclass': 'table distribution class',
            'parchildpolicykey': 'child distribution key',
            'parchildpolicyclass': 'child distribution class',
        }

        err = []
        for row in curs.dictresult():
            err.append([GV.cfg[GV.master_dbid], cols, row])

        if not err:
            logger.info('[OK] partition distribution policy check')
        else:
            GV.checkStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] partition distribution policy check')
            logger.error('partition distribution policy check found %d issue(s)' % len(err))
            if len(err) > 100:
                logger.error(qry)

            myprint(
                '[ERROR]: child partition(s) are distributed differently from '
                'the root partition, and must be manually redistributed, for '
                'some tables. Check the gpcheckcat log for details.'
            )
            logger.error('The following tables must be manually redistributed:')

            # TODO: generate a repair script for these rows. This is
            # difficult, since we can't redistribute child partitions
            # directly.

            # table header (err is never empty in this branch)
            logger.error("--------")
            logger.error("  " + " | ".join([col_names[x] for x in cols]))
            logger.error("  " + "-+-".join(['-' * len(col_names[x]) for x in cols]))

            # report at most 100 rows, for brevity
            for e in err[0:100]:
                row = e[2]
                logger.error("  " + " | ".join([str(row[x]) for x in cols]))
            if len(err) > 100:
                logger.error("...")

            logger.error(
                'Execute an ALTER TABLE ... SET DISTRIBUTED BY statement, with '
                'the desired distribution key, on the partition root for each '
                'affected table.'
            )

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: checkPartitionIntegrity')
        myprint('  Execution error: ' + str(ex))

    db.close()

    checkPoliciesRepair()

#############
# A dictionary of SQL statements for checkPartitionRegularity()

partitionRegularityChecks = {
    # Query: irreg_user_constraint
    #
    #     A row represents an irregular user-defined constraint.
    #
    #     This view just filters ptable_user_con_info so that it returns no rows
    #     if no irregular user-defined constraints exist.
    #
    #     An irregular user-constraint is a constraint whose definition (DDL expression
    #     format) appears on the root table of a partitioned table, and doesn't appear
    #     on every part table of the partitioned table.  It is possible to construct
    #     these using sequences of DDL operations on release prior to Rio.  In Rio,
    #     it is possible to construct them with FOREIGN KEY constraints, but not with
    #     other constraint types.
    #
    # tableid:  pg_class.oid of the partitioned table
    # conname:  name of the constraint on the partitioned table
    # contype:  type of constraint (same on part and table)
    # condef:  specification of constraint (same on part and table)
    # numexpected:  number of constraint occurrences expected (incl table)
    # numactual:  number of constraint occurrences observed (incl table)
    'irreg_user_constraint':
        """select
            tableid,
            tableconname,
            contype,
            condef,
            numexpected,
            numactual
        from
            (
                select
                    c.tableid,
                    c.tableconname,
                    c.contype,
                    c.condef,
                    i.tableparts + 1,
                    count(c.partid),
                    count(distinct c.partconname)
                from
                    (
                        select
                            u.tableid,
                            u.conname,
                            u.contype,
                            u.condef,
                            p.partid,
                            p.conid,
                            coalesce(p.conname)
                        from
                            (
                                select
                                    p.tableid,
                                    c.conname,
                                    c.contype,
                                    pg_get_constraintdef(c.oid) as condef
                                from
                                    (
                                        select
                                            parrelid::regclass,
                                            max(parlevel)+1
                                        from
                                            pg_partition
                                        group by parrelid
                                    ) as p(tableid, tabledepth) ,
                                    pg_constraint c
                                where
                                    p.tableid = c.conrelid
                            ) as u(tableid, conname, contype, condef)
                                join
                            (
                                select
                                    x.tableid::regclass as tableid,
                                    c.conrelid::regclass as partid,
                                    c.oid as conid,
                                    c.conname,
                                    c.contype,
                                    pg_get_constraintdef(c.oid) as condef
                                from
                                    pg_constraint c,
                                    (
                                        select
                                            tableid,
                                            tabledepth,
                                            tableid::regclass partid,
                                            0 as partdepth,
                                            0 as partordinal,
                                            'r'::char as partstatus
                                        from
                                            (
                                                select
                                                    parrelid::regclass,
                                                    max(parlevel)+1
                                                from
                                                    pg_partition
                                                group by parrelid
                                            ) as ptable(tableid, tabledepth)
                                        union all
                                        select
                                            parrelid::regclass as tableid,
                                            t.tabledepth as tabledepth,
                                            r.parchildrelid::regclass partid,
                                            p.parlevel + 1 as partdepth,
                                            r.parruleord as partordinal,
                                            case
                                                when t.tabledepth = p.parlevel + 1 then 'l'::char
                                                else 'i'::char
                                            end as partstatus
                                        from
                                            pg_partition p,
                                            pg_partition_rule r,
                                             (
                                                select
                                                    parrelid::regclass,
                                                    max(parlevel)+1
                                                from
                                                    pg_partition
                                                group by parrelid
                                            ) as t(tableid, tabledepth)
                                        where
                                            p.oid = r.paroid
                                            and not p.paristemplate
                                            and p.parrelid = t.tableid
                                    ) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus)
                                where
                                    x.partid = c.conrelid
                            ) as p(tableid, partid, conid, conname, contype, condef)
                                on (
                                    u.tableid = p.tableid and
                                    u.contype = p.contype and
                                    u.condef = p.condef
                                    )
                    ) as c(tableid, tableconname, contype, condef, partid, partconid, partconname) ,
                    (
                        select
                            t.tableid,
                            t.tabledepth,
                            n.nparts as tableparts,
                            r.nspname,
                            r.relname
                        from
                            (
                                select
                                    parrelid::regclass,
                                    max(parlevel)+1
                                from
                                    pg_partition
                                group by parrelid
                            ) as t(tableid, tabledepth) ,
                            (
                                select
                                    c.oid::regclass as relid,
                                    n.nspname,
                                    c.relname,
                                    c.relkind
                                from
                                    pg_class c,
                                    pg_namespace n
                                where
                                    c.relnamespace = n.oid
                            ) as r(relid, nspname, relname, relkind) ,
                            (
                                select tableid, count(*)
                                from (
                                        select
                                            t.tableid::regclass,
                                            p.parchildrelid::regclass as partid
                                        from
                                            (
                                                select pg_partition.parrelid, pg_partition.oid
                                                from pg_partition
                                                where not pg_partition.paristemplate
                                            ) t(tableid, partid),
                                            pg_partition_rule p
                                        where p.paroid = t.partid
                                    ) as contains_part(tableid, partid)
                                group by tableid
                            ) n(tableid, nparts)
                        where
                            t.tableid = r.relid and
                            t.tableid = n.tableid
                    ) as i(tableid, tabledepth, tableparts, nspname, relname)
                where
                    c.tableid = i.tableid
                group by
                    c.tableid,
                    c.tableconname,
                    c.contype,
                    c.condef,
                    i.tableparts
            ) as ptable_user_con_info(tableid, tableconname, contype, condef, numexpected, numactual, numnames)
        where
            contype != 'f' and -- no FK consistency in Rio
            numexpected != numactual""",

    # Query: irreg_sys_constraint
    #
    #     A row represents an irregular system-defined constraint.
    #
    #     An irregular system constraint is a constraint whose definition (DDL
    #     expression format) appears on some part tables and that is not a system-
    #     defined partition constraint.
    #
    #     This view doesn't actually test this.  Instead, it checks for an
    #     expected pattern of system-defined constraints.
    #     1. The constraints checked are those that don't appear on the root.
    #     2. Each part has as many constraints as its depth in the partition
    #        hierarchy, less any default parts on its path.
    #     Violators are listed as results.
    #
    # tableid:  pg_class.oid of the partitioned table
    # partid:  pg_class.oid of the (constrained) part table
    # expected:  number of constraint occurrences expected
    # actual:  number of constraint occurrences observed
    'irreg_sys_constraint':
        """select
            coalesce(x.partid, a.partid) as partid,
            x.tableid,
            x.expected,
            coalesce(a.actual, 0) as actual
        from
            (
            {expected}
            ) x
              full join
            (
            {actual}
            ) a
              on (x.partid = a.partid)
        where
            x.expected != a.actual or
            x.expected is null or
            x.expected != coalesce(a.actual,0);""",

    # Query based on view: unenforced_constraint_info
    #
    #     a row represents an unenforced unique constraint to be fixed.
    #
    # tableid
    # partid
    # indexid
    # partoid
    # indexoid
    # contype
    # conname
    # refobjid

    'unenforced_constraint_info':
        """select
            c.tableid,
            c.partid,
            b.indexid,
            c.partid::int as partoid,
            b.indexid::int as indexoid,
            b.contype,
            b.conname,
            b.refobjid
        from
            (
                select
                    k.tableid,
                    k.distkey,
                    k.partkey,
                    c.conid,
                    c.conname,
                    c.contype,
                    i.indrelid::regclass,
                    i.indkey::smallint[]
                from
                    pg_index i,
                    (
                        select
                            k.tableid,
                            d.attrcnt,
                            d.distkey,
                            sum(k.partkeylen),
                            array_agg(k.partkey[g.g])
                        from
                            (
                                select
                                    p.parrelid::regclass,
                                    t.tabledepth,
                                    p.parlevel,
                                    p.parnatts,
                                    p.paratts
                                from
                                    pg_partition p,
                                    (
                                        select
                                            parrelid::regclass,
                                            max(parlevel)+1
                                        from
                                            pg_partition
                                        group by parrelid
                                    ) as t(tableid, tabledepth)
                                where
                                    p.parrelid = t.tableid and
                                    p.paristemplate is false
                            ) as k(tableid, tabledepth, partdepth, partkeylen, partkey) ,
                            generate_series(0, (select max(partkeylen) from (
                                select
                                    p.parrelid::regclass,
                                    t.tabledepth,
                                    p.parlevel,
                                    p.parnatts,
                                    p.paratts
                                from
                                    pg_partition p,
                                    (
                                        select
                                            parrelid::regclass,
                                            max(parlevel)+1
                                        from
                                            pg_partition
                                        group by parrelid
                                    ) as t(tableid, tabledepth)
                                where
                                    p.parrelid = t.tableid and
                                    p.paristemplate is false
                            ) as partition_level(tableid, tabledepth, partdepth, partkeylen, partkey) )-1) g(g),
                            (
                                select
                                    localoid::regclass,
                                    distkey,
                                    array_length(distkey, 1)
                                from
                                    gp_distribution_policy
                            ) as d(relid, distkey, attrcnt)
                        where
                            g.g < k.partkeylen and
                            k.tableid = d.relid
                        group by
                            k.tableid,
                            d.attrcnt,
                            d.distkey
                    ) as k(tableid, distkeylen, distkey, partkeylen, partkey) ,
                    (
                        select
                            relid,
                            conid,
                            conname,
                            contype,
                            indexid
                        from
                            (
                                select
                                    r.oid::regclass as relid,
                                    c.oid as conid,
                                    c.conname,
                                    c.contype,
                                    c.consrc,
                                    pg_get_constraintdef(c.oid) as condef,
                                    d.objid::regclass as indexid
                                from
                                    (
                                        pg_class r
                                            join
                                            pg_constraint c
                                            on
                                                r.oid = c.conrelid
                                                and r.relkind = 'r'
                                    )
                                        left join
                                        pg_depend d
                                        on
                                            d.refobjid = c.oid
                                            and d.classid = 'pg_class'::regclass
                                            and d.refclassid = 'pg_constraint'::regclass
                                            and d.deptype = 'i'
                                ) as relconstraint(relid, conid, conname, contype, consrc, condef, indexid)
                        where
                            indexid is not null
                            or contype in ('p', 'u')
                    ) as c(relid, conid, conname, contype, indexid)
                where
                    c.relid = k.tableid and
                    i.indrelid = k.tableid and
                    i.indexrelid = c.indexid and
                    not i.indkey::smallint[] @> array_cat(k.distkey::smallint[], k.partkey)
            ) as x(tableid, distkey, partkey, conid, conname, contype, indrelid, indkey) ,
            (
                select
                    tableid,
                    tabledepth,
                    tableid::regclass partid,
                    0 as partdepth,
                    0 as partordinal,
                    'r'::char as partstatus
                from
                    (
                        select
                            parrelid::regclass,
                            max(parlevel)+1
                        from
                            pg_partition
                        group by parrelid
                    ) as ptable(tableid, tabledepth)
                union all
                select
                    parrelid::regclass as tableid,
                    t.tabledepth as tabledepth,
                    r.parchildrelid::regclass partid,
                    p.parlevel + 1 as partdepth,
                    r.parruleord as partordinal,
                    case
                        when t.tabledepth = p.parlevel + 1 then 'l'::char
                        else 'i'::char
                    end as partstatus
                from
                    pg_partition p,
                    pg_partition_rule r,
                     (
                        select
                            parrelid::regclass,
                            max(parlevel)+1
                        from
                            pg_partition
                        group by parrelid
                    ) as t(tableid, tabledepth)
                where
                    p.oid = r.paroid
                    and not p.paristemplate
                    and p.parrelid = t.tableid
            ) as c(tableid, tabledepth, partid, partdepth, partordinal, partstatus) ,
            (
                select
                    r.oid::regclass as relid,
                    i.oid::regclass as indexid,
                    c.oid as conid,
                    c.contype,
                    c.conname,
                    pg_get_constraintdef(c.oid) as condef,
                    d.classid,
                    d.objid,
                    d.objsubid,
                    d.refclassid,
                    d.refobjid,
                    d.refobjsubid,
                    d.deptype
                from
                    pg_constraint c,
                    pg_class i,
                    pg_depend d,
                    pg_index x,
                    pg_class r
                where
                    d.classid = 'pg_class'::regclass and
                    d.objid = i.oid and
                    d.objsubid = 0 and
                    d.refclassid = 'pg_constraint'::regclass and
                    d.refobjid = c.oid and
                    d.refobjsubid = 0 and
                    d.deptype = 'i' and
                    i.relkind = 'i' and
                    i.oid = x.indexrelid and
                    r.oid = x.indrelid
            ) as b(relid, indexid, conid, contype, conname, condef, classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype)
        where
            x.tableid = c.tableid and
            c.partid = b.relid and
            b.classid = 'pg_class'::regclass and
            b.objsubid = 0 and
            b.refclassid = 'pg_constraint'::regclass and
            b.refobjsubid = 0 and
            b.deptype = 'i'""",

    # Repair sequence for issues identified by unenforced_constraint_info.
    # Must substitute values from a row of that query, before running.

    'unenforced_constraint_repairs':
        """--Demote type-{contype} constraint {conname} on part {partid}
--of partitioned table {tableid} to simple unique index.

--Disconnect index {indexid} from constraint {conname}
delete
from
    pg_depend
where
    (classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype) =
    (
        select
            d.classid,
            d.objid,
            d.objsubid,
            d.refclassid,
            d.refobjid,
            d.refobjsubid,
            d.deptype
        from
            pg_index i,
            pg_depend d
        where
            i.indexrelid = {indexoid} and
            d.classid = 'pg_class'::regclass and
            d.objid = i.indexrelid and
            d.objsubid = 0 and
            d.refclassid = 'pg_constraint'::regclass and
            d.refobjid = (
                select oid
                from pg_constraint
                where
                   conrelid = {partoid} and
                   conname = {quoted_conname} and
                   contypid = 0 and
                   contype = {quoted_contype}
            ) and
            d.refobjsubid = 0 and
            d.deptype = 'i'
    );
--Replace dependency of constraint {conname} on table columns of {partid}
--with dependency of index {indexoid} on same.
update
    pg_depend
set
    classid = 'pg_class'::regclass,
    objid = {indexoid}
where
    classid = 'pg_constraint'::regclass and
    objid = (
             select oid
               from pg_constraint
              where
                    conrelid = {partoid} and
                    conname = {quoted_conname} and
                    contypid = 0 and
                    contype = {quoted_contype}
         ) and
    objsubid = 0 and
    refclassid = 'pg_class'::regclass and
    refobjid = {partoid} and
    refobjsubid <> 0 and
    deptype = 'a';
--Delete constraint {conname} leaving its index {indexid}
delete
from
    pg_constraint
where
    conrelid = {partoid} and
    conname = {quoted_conname} and
    contypid = 0 and
    contype = {quoted_contype};""",


    # Query based on view: part_sys_actual
    # tableid
    # partid
    # actual:  observed number of system-defined constraints

    "actual_part_con_query":
        """select
            tableid,
            partid,
            count(*) as actual
        from (
                select
                    tableid,
                    partid,
                    conid,
                    conname,
                    contype,
                    condef
                from (
                        select
                            x.tableid::regclass as tableid,
                            c.conrelid::regclass as partid,
                            c.oid as conid,
                            c.conname,
                            c.contype,
                            pg_get_constraintdef(c.oid) as condef
                        from
                            pg_constraint c,
                            (
                                select
                                    tableid,
                                    tabledepth,
                                    tableid::regclass partid,
                                    0 as partdepth,
                                    0 as partordinal,
                                    'r'::char as partstatus
                                from
                                    (
                                        select
                                            parrelid::regclass,
                                            max(parlevel)+1
                                        from
                                            pg_partition
                                        group by parrelid
                                    ) as ptable(tableid, tabledepth)
                                union all
                                select
                                    parrelid::regclass as tableid,
                                    t.tabledepth as tabledepth,
                                    r.parchildrelid::regclass partid,
                                    p.parlevel + 1 as partdepth,
                                    r.parruleord as partordinal,
                                    case
                                        when t.tabledepth = p.parlevel + 1 then 'l'::char
                                        else 'i'::char
                                    end as partstatus
                                from
                                    pg_partition p,
                                    pg_partition_rule r,
                                     (
                                        select
                                            parrelid::regclass,
                                            max(parlevel)+1
                                        from
                                            pg_partition
                                        group by parrelid
                                    ) as t(tableid, tabledepth)
                                where
                                    p.oid = r.paroid
                                    and not p.paristemplate
                                    and p.parrelid = t.tableid
                            ) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus)
                        where
                            x.partid = c.conrelid
                    ) as part_constraint(tableid, partid, conid, conname, contype, condef)
                where
                    (partid, condef) not in
                        (
                            select partid, condef
                            from (
                                select
                                    u.tableid,
                                    u.conname,
                                    u.contype,
                                    u.condef,
                                    p.partid,
                                    p.conid,
                                    coalesce(p.conname)
                                from
                                    (
                                        select
                                            p.tableid,
                                            c.conname,
                                            c.contype,
                                            pg_get_constraintdef(c.oid) as condef
                                        from
                                            (
                                                select
                                                    parrelid::regclass,
                                                    max(parlevel)+1
                                                from
                                                    pg_partition
                                                group by parrelid
                                            ) as p(tableid, tabledepth) ,
                                            pg_constraint c
                                        where
                                            p.tableid = c.conrelid
                                    ) as u(tableid, conname, contype, condef)
                                        join
                                    (
                                        select
                                            x.tableid::regclass as tableid,
                                            c.conrelid::regclass as partid,
                                            c.oid as conid,
                                            c.conname,
                                            c.contype,
                                            pg_get_constraintdef(c.oid) as condef
                                        from
                                            pg_constraint c,
                                            (
                                                select
                                                    tableid,
                                                    tabledepth,
                                                    tableid::regclass partid,
                                                    0 as partdepth,
                                                    0 as partordinal,
                                                    'r'::char as partstatus
                                                from
                                                    (
                                                        select
                                                            parrelid::regclass,
                                                            max(parlevel)+1
                                                        from
                                                            pg_partition
                                                        group by parrelid
                                                    ) as ptable(tableid, tabledepth)
                                                union all
                                                select
                                                    parrelid::regclass as tableid,
                                                    t.tabledepth as tabledepth,
                                                    r.parchildrelid::regclass partid,
                                                    p.parlevel + 1 as partdepth,
                                                    r.parruleord as partordinal,
                                                    case
                                                        when t.tabledepth = p.parlevel + 1 then 'l'::char
                                                        else 'i'::char
                                                    end as partstatus
                                                from
                                                    pg_partition p,
                                                    pg_partition_rule r,
                                                     (
                                                        select
                                                            parrelid::regclass,
                                                            max(parlevel)+1
                                                        from
                                                            pg_partition
                                                        group by parrelid
                                                    ) as t(tableid, tabledepth)
                                                where
                                                    p.oid = r.paroid
                                                    and not p.paristemplate
                                                    and p.parrelid = t.tableid
                                            ) as x(tableid, tabledepth, partid, partdepth, partordinal, partstatus)
                                        where
                                            x.partid = c.conrelid
                                    ) as p(tableid, partid, conid, conname, contype, condef)
                                        on (
                                            u.tableid = p.tableid and
                                            u.contype = p.contype and
                                            u.condef = p.condef
                                            )
                            ) as part_user_constraint(tableid, tableconname, contype, condef, partid, partconid, partconname)
                        )
            ) as part_sys_constraint(tableid, partid, conid, conname, contype, condef)
        group by tableid, partid""",

    # Query contains_directly has a row per step along a path from
    # root to leaf.  The columns are
    # tableid - pg_class.oid of the partitioned table.
    # parentid - pg_class.oid of the "from" table.
    # childid - pg_class.oid of the "to" table, the target of the step.
    # npcon - constraint count for this step: 0 if "to" is default, else 1.
    #

    'contains_directly':
        """(
            select
                r.parrelid::regclass as tableid,
                coalesce(p.parchildrelid, r.parrelid)::regclass as parentid,
                c.parchildrelid::regclass as childid,
                case when c.parisdefault then 0 else 1 end as npcon
            from
                pg_partition r,
                pg_partition_rule c
                    left join pg_partition_rule p on (c.parparentrule = p.oid)
            where
                not r.paristemplate and
                not c.parchildrelid = 0 and
                c.paroid = r.oid
            ) {alias}""",

    # The base case is the length 1 path from root or branch to branch
    # or leaf. This is always the first union term.

    'base_query':
        """select
            start.tableid,
            start.parentid,
            start.childid,
            start.npcon
        from
            {contains_directly_start}""",

    # A union term is an N-way self join of contains_directly and
    # identifies paths of length N.  Note that the length of a path
    # to a part at depth N+1 is N.

    'union_term':
        """select
            start.tableid,
            start.parentid,
            stop.childid,
            start.npcon -- stop.npcon
        from
            {contains_directly_start},
            {contains_directly_stop}{intermediate_path_from}
        where
            start.childid {intermediate_path_where}
            = stop.parentid""",

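    # The union query totals npcon over every path that ends at a part,
    # giving the expected constraint count for that part.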

    'union_query':
        """select tableid, childid as partid, sum(npcon) as expected
        from (
            {open_union}
            ) r
        group by tableid, childid"""

}


#############

# For partitionRegularityChecks.
def make_union_term(depth):
    """Build one SQL union term that chains ``depth + 1`` path steps.

    The 'union_term' template from partitionRegularityChecks is filled with
    a 'start' and a 'stop' copy of the 'contains_directly' subquery plus
    ``depth - 1`` intermediate copies aliased path1, path2, ...  Each
    intermediate copy contributes one extra entry to the FROM list and one
    extra link to the childid = parentid chain in the WHERE clause.

    depth -- number of the union term; must be greater than zero (asserted).

    Returns the formatted SQL text of the term.
    """
    assert depth > 0

    term_template = partitionRegularityChecks['union_term']
    step_subquery = partitionRegularityChecks['contains_directly']

    aliases = ["path%d" % n for n in range(1, depth)]

    # Each intermediate step brings its own leading separator, so both
    # fragments are empty for depth == 1 and otherwise continue the
    # template's FROM list / WHERE chain without a special first element.
    from_fragment = "".join(
        ",\n    " + step_subquery.format(alias=name) for name in aliases)
    where_fragment = "".join(
        "\n    = %s.parentid and %s.childid" % (name, name)
        for name in aliases)

    return term_template.format(
        contains_directly_start=step_subquery.format(alias='start'),
        contains_directly_stop=step_subquery.format(alias='stop'),
        intermediate_path_from=from_fragment,
        intermediate_path_where=where_fragment
    )


# For partitionRegularityChecks.
def expected_part_con_query(maxdepth):
    """Build the SQL giving the expected constraint count for every part.

    The result is the 'union_query' template from partitionRegularityChecks
    whose body is the 'base_query' (single-step paths) followed by
    make_union_term(depth) for depth = 1 .. maxdepth, all glued together
    with "union all".

    maxdepth -- highest depth for which a union term is generated.

    Returns the formatted SQL text.

    NOTE(review): make_union_term(depth) chains depth + 1 steps, so the
    depth == maxdepth term presumably can never match rows when maxdepth is
    the deepest partition level -- confirm before trimming it.
    """
    start_step = partitionRegularityChecks['contains_directly'].format(
        alias='start')
    base_term = partitionRegularityChecks['base_query'].format(
        contains_directly_start=start_step)

    all_terms = [base_term] + [make_union_term(level)
                               for level in range(1, maxdepth + 1)]
    union_body = '\n\nunion all\n\n'.join(all_terms)

    return partitionRegularityChecks['union_query'].format(
        open_union=union_body)

# For partitionRegularityChecks.
def make_irregular_sys_con_query(maxdepth):
    """Build the SQL that lists parts with irregular system constraints.

    Fills the 'irreg_sys_constraint' template from partitionRegularityChecks
    with the expected-count query for maxdepth and the fixed
    'actual_part_con_query' text.

    maxdepth -- passed through to expected_part_con_query().

    Returns the formatted SQL text.
    """
    # Dict entries are evaluated top to bottom, so the expected-count query
    # is built before the other templates are looked up.
    substitutions = {
        'expected': expected_part_con_query(maxdepth),
        'actual': partitionRegularityChecks['actual_part_con_query'],
    }
    return partitionRegularityChecks['irreg_sys_constraint'].format(
        **substitutions)


#############
def checkPartitionRegularity():
    '''
    Run the pg_partition/pg_constraint regularity checks.

    Rio partitioning changes introduce new regularity requirements for
    constraints on partitioned tables.  Three checks run in sequence on one
    connection:

      1. user-defined constraint regularity on partitioned tables
      2. system-defined partition constraint regularity
      3. unenforced unique constraints (skipped when either of the first two
         found irregularities, or when GV.Remove is non-empty)

    The whole function is skipped (with warnings) unless the missing-entry,
    inconsistent-entry and foreign-key status flags on GV are all truthy.

    Failures set GV.checkStatus to False and record an error code through
    setError(); check 3 additionally queues a repair sequence for every
    dbid in GV.cfg through addDemoteConstraint().
    '''
    okayToRun = GV.missingEntryStatus and GV.inconsistentEntryStatus and GV.foreignKeyStatus
    irregular = False

    logger.info('-----------------------------------')
    logger.info('Checking pg_partition/pg_constraint regularity ...')

    if not okayToRun:
        logger.warn('unable to run pg_partition/pg_constraint regularity checks')
        logger.warn('prerequisite tests failed or not run')
        return

    db = connect()

    # ---- check 1: user-defined constraints ----
    checkname = 'user-defined constraint regularity on partitioned tables'

    qry = partitionRegularityChecks['irreg_user_constraint']
    err = []

    try:
        curs = db.query(qry)

        for row in curs.dictresult():
            err.append((row['tableconname'], row['tableid'], row['numactual'], row['numexpected']))

        if not err:
            logger.info('[OK]  %s' % checkname)
        else:
            irregular = True
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL]  %s' % checkname)
            logger.error(' %s test has %d issue(s)' % (checkname, len(err)))
            efmt = '  Constraint "%s" of partitioned table "%s" occurs %d times, %d expected.'
            for e in err:
                logger.error(efmt % e)

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: %s' % checkname)
        myprint('  Execution error: ' + str(e))

    # ---- check 2: system-defined partition constraints ----
    checkname = 'system-defined partition constraint regularity on partitioned tables'

    try:
        # Determine maximum number of partitioning levels in this database
        qry = """select max(parlevel)+1 as maxdepth from pg_partition;"""
        curs = db.query(qry)
        depth_rows = curs.dictresult()
        assert len(depth_rows) == 1  # scalar aggregation!
        maxdepth = depth_rows[0]['maxdepth']

        err = []
        # A non-integer maxdepth means there is nothing to build a query
        # for, so the check passes trivially.
        if isinstance(maxdepth, int):
            qry = make_irregular_sys_con_query(maxdepth)
            curs = db.query(qry)
            for row in curs.dictresult():
                err.append((row['partid'], row['tableid'], row['actual'], row['expected']))

        if not err:
            logger.info('[OK]  %s' % checkname)
        else:
            irregular = True
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL]  %s' % checkname)
            logger.error(' %s test has %d issue(s)' % (checkname, len(err)))
            efmt = '  Part table "%s" of partitioned table "%s" has %d system-defined constraints, %d expected.'
            for e in err:
                logger.error(efmt % e)

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing test: %s' % checkname)
        myprint('  Execution error: ' + str(e))

    # ---- check 3: unenforced unique / primary key constraints ----
    checkname = 'unenforced unique constraints on partitioned tables'

    if irregular:
        logger.warn('skipping check %s due to constraint irregularities' % checkname)
    elif GV.Remove:
        logger.warn('skipping check %s due to required removals' % checkname)
    else:
        qry = partitionRegularityChecks['unenforced_constraint_info']
        err = []
        try:
            curs = db.query(qry)
            for row in curs.dictresult():
                # Derived fields consumed by the message/repair templates.
                row['con_type_phrase'] = 'primary key' if row['contype'] == 'p' else 'unique'
                row['quoted_conname'] = quote_value(row['conname'])
                row['quoted_contype'] = quote_value(row['contype'])
                err.append(row)

            if not err:
                logger.info('[OK]  %s' % checkname)
            else:
                GV.checkStatus = False
                setError(ERROR_REMOVE)
                logger.info('[FAIL]  %s' % checkname)
                logger.error(' %s test has %d issue(s)' % (checkname, len(err)))
                efmt = '  partitioned table "{tableid}" has unenforced {con_type_phrase} constraint: "{partid}"'
                repair_fmt = partitionRegularityChecks['unenforced_constraint_repairs']
                for n, e in enumerate(err):
                    # Only the first 100 issues are logged, but a repair is
                    # queued for every one of them.
                    if n < 100:
                        logger.error(efmt.format(**e))
                    repair_sequence = repair_fmt.format(**e)
                    for dbid in GV.cfg:
                        addDemoteConstraint(dbid, repair_sequence)
                # Emit the truncation marker once, after the list, rather
                # than once per issue.
                if len(err) > 100:
                    logger.error("...")

        except Exception as e:
            setError(ERROR_NOREPAIR)
            myprint('[ERROR] executing test: %s' % checkname)
            myprint('  Execution error: ' + str(e))

    db.close()


#############
def checkPGClass():
    '''
    Report pg_class rows that claim attributes (relnatts > 0) but have no
    matching pg_attribute rows.  At most the first 100 issues are logged.
    '''
    logger.info('-----------------------------------')
    logger.info('Checking pg_class ...')
    qry = '''
    SELECT relname, relkind, tc.oid as oid
    FROM   pg_class tc left outer join
           pg_attribute ta on (tc.oid = ta.attrelid)
    WHERE  ta.attrelid is NULL
    AND    tc.relnatts > 0
    '''
    issues = connect2run(qry, ('relname', 'relkind', 'oid'))
    if not issues:
        logger.info('[OK] pg_class')
        return

    GV.checkStatus = False
    setError(ERROR_NOREPAIR)
    logger.info('[FAIL] pg_class')
    logger.error('pg_class has %d issue(s)' % len(issues))
    logger.error(qry)
    for issue in issues[0:100]:
        logger.error(formatErr(issue[0], issue[1], issue[2]))
    # Tell the reader that the listing above was truncated.
    if len(issues) > 100:
        logger.error("...")


#############
def checkPGNamespace():
    '''
    Check for objects in pg_class, pg_type, pg_operator and pg_proc whose
    namespace OID has no matching pg_namespace row, i.e. objects that live
    in a schema that has been dropped.

    On failure, sets GV.checkStatus to False, records ERROR_NOREPAIR and
    logs every offending reference.
    '''
    logger.info('Checking missing schema definitions ...')
    qry = '''
    SELECT o.catalog, o.nsp
    FROM pg_namespace n right outer join
         (select 'pg_class' as catalog, relnamespace as nsp from pg_class
          union
          select 'pg_type' as catalog, typnamespace as nsp from pg_type
          union
          select 'pg_operator' as catalog, oprnamespace as nsp from pg_operator
          union
          select 'pg_proc' as catalog,pronamespace as nsp from pg_proc) o on
          (n.oid = o.nsp)
    WHERE n.oid is NULL
    '''
    err = connect2run(qry, ('catalog', 'nsp'))
    if not err:
        logger.info('[OK] missing schema definitions')
    else:
        GV.checkStatus = False
        setError(ERROR_NOREPAIR)
        # Spelling fixed ("definitons") so the FAIL line matches the OK line.
        logger.info('[FAIL] missing schema definitions')
        logger.error('found %d references to non-existent schemas' % len(err))
        logger.error(qry)
        for e in err:
            logger.error(formatErr(e[0], e[1], e[2]))


#############
'''
Produce repair scripts to remove dangling entries of gp_fastsequence:
	- one file per segment dbid
	- contains DELETE statements to remove catalog entry
'''


def removeFastSequence(db):
    '''
    MPP-14758: gp_fastsequence is not cleaned up after a failed
    transaction (AO/CO).

    Queue a removal, via buildRemove(), for every gp_fastsequence row whose
    objid matches no pg_class oid on the master or on any segment.

    Note: this is slightly different from the normal foreign key check
          because it stretches the cross reference across segments.
          This makes it safe in the event of cross consistency issues with
          pg_class, but may not repair some issues when there are cross
          consistency problems.

    db - open connection used to run the lookup query.

    Any exception is logged and swallowed.
    '''
    qry = """
              SELECT dbid, objid
                FROM gp_segment_configuration AS cfg JOIN
			         (SELECT gp_segment_id, objid
                        FROM (select gp_segment_id, objid from gp_fastsequence
                              union
                              select gp_segment_id, objid from gp_dist_random('gp_fastsequence')) AS fs
                        LEFT OUTER JOIN
                             (select oid from pg_class
                              union
                              select oid from gp_dist_random('pg_class')) AS c
                        ON (fs.objid = c.oid)
                        WHERE c.oid IS NULL
                        ) AS r
                  ON r.gp_segment_id = cfg.content
               WHERE cfg.role = 'p';
              """
    try:
        for rec in db.query(qry).dictresult():
            buildRemove(rec['dbid'],               # dbid of the targeted segment
                        'gp_fastsequence tuple',   # for comment purposes
                        'gp_fastsequence',         # table name
                        {'objid': rec['objid']},   # column name and value
                        'gp_fastsequence')         # for comment purposes
    except Exception as err:
        logger.error('removeFastSequence: ' + str(err))

#############

def removeIndexConstraint(nspname, relname, constraint):
    '''
    Queue an ALTER TABLE ... DROP CONSTRAINT ... CASCADE statement for the
    given schema, table and constraint on GV.Constraints.
    '''
    ddl = 'ALTER TABLE "%s"."%s" DROP CONSTRAINT "%s" CASCADE;' % (
        nspname, relname, constraint)
    GV.Constraints.append(ddl)


def buildRemove(seg, name, table, cols, objname):
    '''
    Build a commented DELETE statement and queue it for a segment.

    seg     - key under which the statement is queued (passed to addRemove)
    name    - description used in the "--Remove ..." comment line
    table   - table to delete from
    cols    - dict of column name -> value; the conditions are OR-ed together
    objname - object name used in the "--Object Name" comment line

    The comment header is built once, from the first column iterated.
    Previously the `first` flag was never set, so the header was rebuilt for
    every column and the last one won.  The header formats the value with
    %i, so that value must be numeric.
    '''
    header = ''
    conditions = []
    for col in cols:
        if not header:
            header = '--Object Name: %s\n--Remove %s for %i\n' % \
                     (objname, name, cols[col])
        conditions.append('%s = \'%s\'' % (col, cols[col]))
    statement = 'DELETE FROM %s WHERE %s;' % (table, ' or '.join(conditions))
    addRemove(seg, header + statement)


def addRemove(seg, line):
    '''Append line to the list kept under seg in GV.Remove, creating the list on first use.'''
    GV.Remove.setdefault(seg, []).append(line)


def buildAdjustConname(seg, relname, relid, oldconname, newconname):
    '''
    Queue an UPDATE of pg_constraint.conname for segment seg.

    relname                - text, used only in the leading comment line
    relid                  - oid as int
    oldconname, newconname - text, inserted into the SQL verbatim (no
                             quoting is added here)
    '''
    pieces = [
        '--Constraint Name: %s on %s becomes %s' % (oldconname, relname, newconname),
        'UPDATE pg_constraint',
        'SET conname = %s' % newconname,
        'WHERE conrelid = %d and conname = %s;' % (relid, oldconname),
    ]
    addAdjustConname(seg, '\n'.join(pieces))


def addAdjustConname(seg, stmt):
    '''Append stmt to the list kept under seg in GV.AdjustConname, creating the list on first use.'''
    GV.AdjustConname.setdefault(seg, []).append(stmt)


def addDemoteConstraint(seg, repair_sequence):
    '''Append repair_sequence to the list kept under seg in GV.DemoteConstraint, creating the list on first use.'''
    GV.DemoteConstraint.setdefault(seg, []).append(repair_sequence)


#############
def drop_leaked_schemas(leaked_schema_dropper, dbname):
    '''
    Ask leaked_schema_dropper to drop leaked temporary schemas in dbname
    and report what it dropped.

    leaked_schema_dropper - object providing drop_leaked_schemas(connection)
    dbname                - database to connect to

    Errors are reported through myprint() and recorded as ERROR_NOREPAIR;
    the connection is always closed.
    '''
    logger.info('-----------------------------------')
    logger.info('Checking for leaked temporary schemas')

    conn = connect(database=dbname)
    try:
        dropped = leaked_schema_dropper.drop_leaked_schemas(conn)
        if dropped:
            logger.info('[FAIL] temporary schemas')
            myprint("Found and dropped %d unbound temporary schemas" % len(dropped))
            logger.error('Dropped leaked schemas \'%s\' in the database \'%s\'' % (dropped, dbname))
        else:
            logger.info('[OK] temporary schemas')
    except Exception as err:
        setError(ERROR_NOREPAIR)
        myprint('  Execution error: ' + str(err))
    finally:
        conn.close()

def checkDepend():
    '''
    Check for dependencies on non-existent objects, and for objects that
    lack dependency records, across every catalog table in the namespace
    PG_CATALOG_OID that has OIDs.
    '''
    logger.info('-----------------------------------')
    logger.info('Checking Object Dependencies')

    db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)

    # Catalogs that link up to pg_depend/pg_shdepend
    qry = "select relname from pg_class where relnamespace=%d and relhasoids" % PG_CATALOG_OID
    catalogs = [rec[0] for rec in db.query(qry).getresult()]

    checkDependJoinCatalog(catalogs)
    checkCatalogJoinDepend(catalogs)

def checkDependJoinCatalog(catalogs):
    '''
    Verify that every (classid, objid) and (refclassid, refobjid) pair in
    pg_depend, and in pg_shdepend for the current database, exists in the
    catalog table it points at (classid::regclass or refclassid::regclass).

    catalogs - iterable of catalog table names to examine.

    Each hit is recorded as a CatDependencyIssue on the matching GP object.
    '''
    per_catalog_sql = """
          SELECT '{catalog}' as catalog, objid FROM (
            SELECT objid FROM pg_depend
              WHERE classid = '{catalog}'::regclass
            UNION ALL
            SELECT refobjid FROM pg_depend
              WHERE refclassid = '{catalog}'::regclass
          ) d
          LEFT OUTER JOIN {catalog} c on (d.objid = c.oid)
          WHERE c.oid is NULL
          UNION ALL
          SELECT '{catalog}' as catalog, objid FROM (
            SELECT dbid, objid FROM pg_shdepend
              WHERE classid = '{catalog}'::regclass
            UNION ALL
            SELECT dbid, refobjid FROM pg_shdepend
              WHERE refclassid = '{catalog}'::regclass
          ) d JOIN pg_database db
            ON (d.dbid = db.oid and datname= current_database())
          LEFT OUTER JOIN {catalog} c on (d.objid = c.oid)
          WHERE c.oid is NULL
          """
    subqueries = [per_catalog_sql.format(catalog=cat) for cat in catalogs]

    qry = """
    SELECT distinct catalog, objid
    FROM (%s
    ) q
    """ % "UNION ALL".join(subqueries)

    try:
        found = connect2run(qry)
        if found:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] extra object dependencies')
            logger.error('  found %d dependencies on dropped objects' % len(found))
            for config, column, row in found:
                gpObj = getGPObject(row['objid'], row['catalog'])
                gpObj.addDependencyIssue(
                    CatDependencyIssue(row['catalog'], row['objid'], config['content']))
        else:
            logger.info('[OK] extra object dependencies')

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing test: extra object dependencies")
        myprint("  Execution error: " + str(ex))
        myprint(qry)

def checkCatalogJoinDepend(catalogs):
    '''
    Verify that every object with an OID >= FIRST_NORMAL_OID, in each
    catalog not listed in DEPENDENCY_EXCLUSION, is referenced by pg_depend
    through objid or refobjid.

    catalogs - iterable of catalog table names to examine.

    Each hit is recorded as a CatDependencyIssue on the matching GP object.
    '''
    oid_select = """
            SELECT '{catalog}' AS catalog, oid FROM {catalog} WHERE oid >= {oid}
            """
    # Exclude pg_proc entries in pg_catalog namespace. This is
    # mainly to avoid flagging language handler functions that
    # linger after the language is dropped. Extensions (which
    # do drop their proclang handler functions) will be
    # replacing language so this exclusion should be fine.
    proc_filter = """
                AND pronamespace != {oid}
                """

    # One sub-select per catalog that is not in DEPENDENCY_EXCLUSION
    subqueries = []
    for cat in catalogs:
        if cat in DEPENDENCY_EXCLUSION:
            continue
        piece = oid_select.format(catalog=cat, oid=FIRST_NORMAL_OID)
        if cat == 'pg_proc':
            piece += proc_filter.format(oid=PG_CATALOG_OID)
        subqueries.append(piece)

    # Construct query that will check that each OID in our aggregated
    # catalog OIDs list has a reference in pg_depend under objid or
    # refobjid column
    qry = """
    SELECT distinct catalog, c.oid as objid
    FROM (
        {subquery}
    ) c
    LEFT OUTER JOIN
    (
        SELECT objid AS oid FROM pg_depend WHERE objid >= {oid}
        UNION
        SELECT refobjid AS oid FROM pg_depend WHERE refobjid >= {oid}
    ) d
    ON (c.oid = d.oid)
    WHERE d.oid IS NULL;
    """.format(subquery="UNION ALL".join(subqueries), oid=FIRST_NORMAL_OID)

    try:
        found = connect2run(qry)
        if found:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] missing object dependencies')
            logger.error('  found %d existing objects without dependencies' % len(found))
            for config, column, row in found:
                gpObj = getGPObject(row['objid'], row['catalog'])
                gpObj.addDependencyIssue(
                    CatDependencyIssue(row['catalog'], row['objid'], config['content']))
        else:
            logger.info('[OK] missing object dependencies')

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing test: missing object dependencies")
        myprint("  Execution error: " + str(ex))
        myprint(qry)

def fixupowners(nspname, relname, oldrole, newrole):
    '''
    Queue the statements that reset a table's owner on GV.Owners.

    The table is first altered to the wrong owner (oldrole) and then back to
    the right one (newrole), because ALTER TABLE does not dispatch the
    change unless it thinks the owner actually changed.

    Note: this means that the Owners list must be run in the order given.
    '''
    alter_fmt = 'ALTER TABLE "%s"."%s" OWNER TO "%s";'
    GV.Owners.extend([
        '-- owner for "%s"."%s"' % (nspname, relname),
        alter_fmt % (nspname, relname, oldrole),
        alter_fmt % (nspname, relname, newrole),
    ])


def checkOwners():
    '''
    Compare object owners between the master and the segments, for tables
    (pg_class) and then for types (pg_type), and finish by calling
    checkOwnersRepair().

    Table mismatches are recorded as ERROR_REMOVE and get repair statements
    through fixupowners(); type mismatches are recorded as ERROR_NOREPAIR.
    At most the first 100 mismatches of each kind are logged.
    '''
    logger.info('-----------------------------------')
    logger.info('Checking table ownership')

    # Check owners in pg_class
    #
    #  - Report each table that has an inconsistency with the master database,
    #    this can be on the table directly, or on one of the table subobjects
    #    (pg_toast, pg_aoseg, etc)
    #
    #  - Theoretically this could result in multiple corrections for a given
    #    table based on having multiple "wrong" owners.  Realistically, in
    #    most of the cases we have seen, the wrong owner is almost always
    #    the gpadmin user, so this is not expected.  If it does occur it
    #    won't be a problem, but we will issue more ALTER TABLE commands
    #    than is strictly necessary.
    #
    #  - Between 3.3 and 4.0 the ao segment columns migrated from pg_class
    #    to pg_appendonly.
    db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
    qry = '''
    select distinct n.nspname, coalesce(o.relname, c.relname) as relname,
                    a.rolname, m.rolname as master_rolname
    from gp_dist_random('pg_class') r
      join pg_class c on (c.oid = r.oid)
      left join pg_appendonly ao on (c.oid = ao.segrelid or
                                     c.oid = ao.blkdirrelid or
                                     c.oid = ao.blkdiridxid)
      left join pg_class o on (o.oid = ao.relid or
                               o.reltoastrelid = c.oid)
      join pg_authid a on (a.oid = r.relowner)
      join pg_authid m on (m.oid = coalesce(o.relowner, c.relowner))
      join pg_namespace n on (n.oid = coalesce(o.relnamespace, c.relnamespace))
    where c.relowner <> r.relowner
    '''
    try:
        mismatches = list(db.query(qry).dictresult())

        if not mismatches:
            logger.info('[OK] table ownership')
        else:
            GV.checkStatus = False
            setError(ERROR_REMOVE)
            logger.info('[FAIL] table ownership')
            logger.error('found %d table ownership issue(s)' % len(mismatches))
            logger.error('%s' % qry)
            for rec in mismatches[0:100]:
                logger.error('  %s.%s relowner %s != %s'
                             % (rec['nspname'], rec['relname'], rec['rolname'],
                                rec['master_rolname']))
            if len(mismatches) > 100:
                logger.error("...")
            # Repairs are queued for every mismatch, not only the logged ones.
            for rec in mismatches:
                fixupowners(rec['nspname'], rec['relname'], rec['rolname'],
                            rec['master_rolname'])
    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing: check table ownership")
        myprint("  Execution error: " + str(ex))
        myprint(qry)

    # TODO: add a check that subobject owners agree with the main object owner.
    # (The above checks only compare master vs segment)

    # Check owners in pg_type
    #  - Ignore implementation types of pg_class entries - they should be
    #    in the check above since ALTER TABLE is required to fix them, not
    #    ALTER TYPE.
    db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
    qry = '''
    select distinct n.nspname, t.typname, a.rolname, m.rolname as master_rolname
    from gp_dist_random('pg_type') r
      join pg_type t on (t.oid = r.oid)
      join pg_namespace n on (n.oid = t.typnamespace)
      join pg_authid a on (a.oid = r.typowner)
      join pg_authid m on (m.oid = t.typowner)
    where r.typowner <> t.typowner
    '''
    try:
        mismatches = list(db.query(qry).dictresult())

        if not mismatches:
            logger.info('[OK] type ownership')
        else:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.info('[FAIL] type ownership')
            logger.error('found %d type ownership issue(s)' % len(mismatches))
            logger.error('%s' % qry)
            for rec in mismatches[0:100]:
                logger.error('  %s.%s typeowner %s != %s'
                             % (rec['nspname'], rec['typname'], rec['rolname'],
                                rec['master_rolname']))
            if len(mismatches) > 100:
                logger.error("...")

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        myprint("[ERROR] executing test: check type ownership")
        myprint("  Execution error: " + str(ex))
        myprint(qry)

    # FIXME: add repair script for type ownership
    # NOTE: types with typrelid probably will be repaired by the
    # script generated by the table ownership check above.

    # TODO:
    # Check owners in pg_proc
    # Check owners in pg_database
    # Check owners in pg_tablespace
    # Check owners in pg_namespace
    # Check owners in pg_operator
    # Check owners in pg_opclass
    # ...

    checkOwnersRepair()


def closeDbs():
    '''Close the first element of every entry in GV.db, then reset GV.db to an empty dict.'''
    for conns in GV.db.itervalues():
        conns[0].close()
    GV.db = {}  # remove everything


# -------------------------------------------------------------------------------
def getCatObj(namestr):
    '''
    Return the catalog table object registered under namestr in GV.catalog.

    If the lookup raises, a "No such catalog table" message is printed and
    the exception is re-raised.
    '''
    # NOTE(review): the connection is never used below; the call is kept in
    # case connect2() has side effects callers rely on - confirm.
    db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
    try:
        return GV.catalog.getCatalogTable(namestr)
    except Exception:
        myprint("No such catalog table: %s\n" % str(namestr))
        raise


# -------------------------------------------------------------------------------
def checkACL():
    '''Run checkTableACL() on every catalog table known to GV.catalog, in sorted order.'''
    logger.info('-----------------------------------')
    logger.info('Performing cross database ACL tests')

    # looks up information in the catalog:
    for catalog_table in sorted(GV.catalog.getCatalogTables()):
        checkTableACL(catalog_table)
# -------------------------------------------------------------------------------


def checkTableACL(cat):
    '''
    Compare the ACL column of one catalog table between the master and the
    segments.

    cat - catalog table object (provides name, primary key, ACL column and
          the master-only / shared flags).

    Skipped for master-only tables, tables without a primary key, tables
    without an ACL column, and tables filtered out by the -S option.
    Mismatches are logged, handed to processACLResult() and recorded as
    ERROR_NOREPAIR; GV.aclStatus is set to False on mismatch or error.
    '''
    table_name = cat.getTableName()
    key_cols = cat.getPrimaryKey()
    master_only = cat.isMasterOnly()
    shared = cat.isShared()
    acl_col = cat.getTableAcl()

    if GV.aclStatus is None:
        GV.aclStatus = True

    # Skip:
    #   - master only tables
    #   - tables without a primary key
    #   - tables without acls
    if master_only or key_cols == [] or acl_col is None:
        return

    # skip shared/non-shared tables
    shared_opt = GV.opt['-S']
    if shared_opt:
        if re.match("none", shared_opt, re.I) and shared:
            return
        if re.match("only", shared_opt, re.I) and not shared:
            return

    # Comparing ACLs cannot be done with a simple equality comparison
    # since it is valid for the ACLs to have different order.  Instead
    # we compare that both acls are subsets of each other => equality.

    if cat.tableHasConsistentOids():
        master_cols = ['m.oid']
    else:
        master_cols = ['m.' + col for col in key_cols]

    qry = """
    SELECT s.gp_segment_id as segid, {mPkey},
           m.{acl} as master_acl, s.{acl} as segment_acl
    FROM {catalog} m
    JOIN gp_dist_random('{catalog}') s using ({primary_key})
    WHERE  not (m.{acl} @> s.{acl} and s.{acl} @> m.{acl}) or
           (m.{acl} is null and s.{acl} is not null) or
           (s.{acl} is null and m.{acl} is not null)
    ORDER BY {primary_key}, s.gp_segment_id
    """.format(catalog=table_name,
               primary_key=', '.join(key_cols),
               mPkey=', '.join(master_cols),
               acl=acl_col)

    # Execute the query
    try:
        db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
        curs = db.query(qry)
        issue_count = curs.ntuples()

        if issue_count == 0:
            logger.info('[OK] Cross consistency acl check for ' + table_name)
        else:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            GV.aclStatus = False
            logger.info('[FAIL] Cross consistency acl check for ' + table_name)
            logger.error('  %s acl check has %d issue(s)' % (table_name, issue_count))

            fields = curs.listfields()
            results = curs.getresult()
            gplog.log_literal(logger, logging.ERROR, "    " + " | ".join(fields))
            for rec in results:
                gplog.log_literal(logger, logging.ERROR, "    " + " | ".join(map(str, rec)))
            processACLResult(table_name, fields, results)

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        GV.aclStatus = False
        myprint('[ERROR] executing: Cross consistency check for ' + table_name)
        myprint('  Execution error: ' + str(ex))
        myprint(qry)
# -------------------------------------------------------------------------------


def checkForeignKey(cat_tables=None):
    '''
    Run the foreign key checks over the given catalog tables.

    cat_tables - catalog table objects to check; when empty or None, every
                 table from GV.catalog is checked.

    Any issue sets GV.checkStatus to False.  Dangling gp_fastsequence ->
    pg_class references are recorded as ERROR_REMOVE and get removal
    statements through removeFastSequence(); everything else is recorded
    as ERROR_NOREPAIR.
    '''
    logger.info('-----------------------------------')
    logger.info('Performing foreign key tests')

    if GV.foreignKeyStatus is None:
        GV.foreignKeyStatus = True

    # looks up information in the catalog:
    if not cat_tables:
        cat_tables = GV.catalog.getCatalogTables()

    db_connection = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
    try:
        checker = ForeignKeyCheck(db_connection, logger, GV.opt['-S'], autoCast)
        issues_by_catalog = checker.runCheck(cat_tables)
        if issues_by_catalog:
            GV.checkStatus = False

        for catname, issue_tuples in issues_by_catalog.iteritems():
            for pkcatname, fields, results in issue_tuples:
                processForeignKeyResult(catname, pkcatname, fields, results)
                repairable = (catname == 'gp_fastsequence' and pkcatname == 'pg_class')
                if not repairable:
                    setError(ERROR_NOREPAIR)
                    continue
                setError(ERROR_REMOVE)
                removeFastSequence(db_connection)
    except Exception as ex:
        setError(ERROR_NOREPAIR)
        GV.foreignKeyStatus = False
        myprint('  Execution error: ' + str(ex))
# -------------------------------------------------------------------------------


def checkMissingEntry():
    '''
    Run the missing/extraneous entry check on all catalog tables.

    When issues are found they are handed to do_repair_for_extra() if the
    -E option is set; otherwise ERROR_NOREPAIR is recorded.
    '''
    logger.info('-----------------------------------')
    logger.info('Performing cross consistency tests: check for missing or extraneous issues')

    if GV.missingEntryStatus is None:
        GV.missingEntryStatus = True

    catalog_issues = _checkAllTablesForMissingEntries()
    if not catalog_issues:
        return

    if not GV.opt['-E']:
        setError(ERROR_NOREPAIR)
        return
    do_repair_for_extra(catalog_issues)


def _checkAllTablesForMissingEntries():
    '''
    Run checkTableMissingEntry() on every catalog table, in sorted order.

    Returns a dict mapping (catalog table object, primary key name) to the
    issues found for that table; tables without issues are left out.
    '''
    issues_by_table = {}
    for table_obj in sorted(GV.catalog.getCatalogTables()):
        found = checkTableMissingEntry(table_obj)
        if found:
            # The primary key name is resolved here rather than inside
            # getPrimaryKeyColumn's other callers, because that helper is
            # also used in checkACL and we do not want to modify that
            # functionality.
            _, pk_name = getPrimaryKeyColumn(table_obj.getTableName(),
                                             table_obj.getPrimaryKey())
            issues_by_table[(table_obj, pk_name)] = found
    return issues_by_table
# -------------------------------------------------------------------------------

def checkTableMissingEntry(cat):
    '''
    Look for rows of one catalog table that are missing from, or extra on,
    the master or some segment.

    cat - catalog table object.

    Returns the result rows when issues are found; returns None when the
    table is skipped, when it is clean, or when the query fails.

    Issues in pg_constraint are only logged as warnings; for any other
    table GV.checkStatus and GV.missingEntryStatus are set to False.
    '''
    table_name = cat.getTableName()
    key_cols = cat.getPrimaryKey()
    master_only = cat.isMasterOnly()
    shared = cat.isShared()
    coltypes = cat.getTableColtypes()

    # Skip master only tables
    if master_only:
        return

    # skip shared/non-shared tables
    shared_opt = GV.opt['-S']
    if shared_opt:
        if re.match("none", shared_opt, re.I) and shared:
            return
        if re.match("only", shared_opt, re.I) and not shared:
            return

    # Skip catalogs without primary key
    if len(key_cols) == 0:
        logger.warn("[WARN] Skipped missing/extra entry check for %s" % table_name)
        return

    # Key columns with the cast suffix registered for their type, if any
    casted_cols = [col + autoCast.get(coltypes[col], '')
                   for col in cat.getPrimaryKey()]

    if cat.tableHasConsistentOids():
        qry = missingEntryQuery(GV.max_content, table_name, ['oid'], ['oid'])
    else:
        qry = missingEntryQuery(GV.max_content, table_name, key_cols, casted_cols)

    # Execute the query
    try:
        db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
        curs = db.query(qry)
        issue_count = curs.ntuples()

        if issue_count == 0:
            logger.info('[OK] Checking for missing or extraneous entries for ' + table_name)
        else:
            warn_only = table_name in ['pg_constraint']
            if warn_only:
                report = logger.warning
                level = logging.WARNING
            else:
                GV.checkStatus = False
                GV.missingEntryStatus = False
                report = logger.error
                level = logging.ERROR

            logger.info(('[%s] Checking for missing or extraneous entries for ' + table_name) %
                        ('WARNING' if level == logging.WARNING else 'FAIL'))
            report('  %s has %d issue(s)' % (table_name, issue_count))
            fields = curs.listfields()
            gplog.log_literal(logger, level, "    " + " | ".join(fields))
            results = curs.getresult()
            for rec in results:
                gplog.log_literal(logger, level, "    " + " | ".join(map(str, rec)))
            processMissingDuplicateEntryResult(table_name, fields, results, "missing")
            if table_name == 'pg_type':
                generateVerifyFile(table_name, fields, results, 'missing_extraneous')

            return results

    except Exception as ex:
        setError(ERROR_NOREPAIR)
        GV.missingEntryStatus = False
        myprint('[ERROR] executing: Missing or extraneous entries check for ' + table_name)
        myprint('  Execution error: ' + str(ex))
        myprint(qry)

class checkAOSegVpinfoThread(execThread):
    '''
    Worker that checks, on one database connection, that the vpinfo stored
    in each column-store aoseg table agrees with the number of attributes
    recorded in pg_attribute for the owning table.
    '''

    def __init__(self, cfg, db):
        execThread.__init__(self, cfg, db, None)

    def _report_failure(self, qry, template, **fields):
        # Flag the check as failed, then log the formatted message and the
        # query that exposed the problem.
        GV.checkStatus = False
        setError(ERROR_NOREPAIR)
        logger.info('[FAIL] inconsistent vpinfo')
        logger.error(template.format(content=self.cfg['content'], **fields))
        logger.error(qry)

    def run(self):
        aoseg_query = """
           SELECT a.relname, a.relid, a.segrelid, cl.relname
           FROM (SELECT p.relid, p.segrelid, c.relname FROM pg_appendonly p LEFT JOIN pg_class c ON p.relid = c.oid WHERE p.columnstore = true) a
           LEFT JOIN pg_class cl ON a.segrelid = cl.oid;
        """

        try:
            # Read the list of aoseg tables from the database
            aoseg_tables = self.db.query(aoseg_query).getresult()

            for relname, relid, segrelid, segrelname in aoseg_tables:
                qry = "SELECT count(*) FROM pg_attribute WHERE attrelid=%d AND attnum > 0;" % (relid)
                attr_count = self.db.query(qry).getresult()[0][0]

                qry = "SELECT distinct(length(vpinfo)) FROM pg_aoseg.%s WHERE state = 1;" % (segrelname)
                vpinfo_curs = self.db.query(qry)
                distinct_lengths = vpinfo_curs.ntuples()

                if distinct_lengths == 0:
                    continue

                if distinct_lengths > 1:
                    self._report_failure(
                        qry,
                        "found {nrows} vpinfo(s) with different length in 'pg_aoseg.{segrelname}' of table '{relname}' on segment {content}",
                        nrows=distinct_lengths,
                        segrelname=segrelname,
                        relname=relname)
                    continue

                vpinfo_length = vpinfo_curs.getresult()[0][0]

                # vpinfo is a bytea: a 12-byte header (three int32 fields:
                # _len, version, nEntry) followed by one 16-byte
                # AOCSVPInfoEntry per attribute:
                #
                # typedef struct AOCSVPInfo
                # {
                #     int32       _len;
                #     int32       version;
                #     int32       nEntry;
                #
                #     AOCSVPInfoEntry entry[1];
                # } AOCSVPInfo;
                #
                # Floor division: identical to "/" on Python 2 integers.
                vpinfo_attr_count = (vpinfo_length - 12) // 16
                if vpinfo_attr_count == attr_count:
                    continue

                self._report_failure(
                    qry,
                    "vpinfo in 'pg_aoseg.{segrelname}' of table '{relname}' contains {vpinfo_attr_count} attributes, while pg_attribute has {attr_count} attributes on segment {content}",
                    segrelname=segrelname,
                    relname=relname,
                    vpinfo_attr_count=vpinfo_attr_count,
                    attr_count=attr_count)
        except Exception as ex:
            GV.checkStatus = False
            self.error = ex

def checkAOSegVpinfo():
    """Run the aoseg vpinfo check against every instance listed in GV.cfg.

    One checkAOSegVpinfoThread is started per GV.cfg entry, each with a
    connection obtained from connect2().  Started threads are handed to
    processThread() every GV.opt['-B'] launches; whatever is left over is
    handed to processThread() once the loop finishes.
    """
    batch = []
    # parallelise check
    for launched, dbid in enumerate(GV.cfg, 1):
        seg_cfg = GV.cfg[dbid]
        worker = checkAOSegVpinfoThread(seg_cfg, connect2(seg_cfg))
        worker.start()
        logger.debug('launching check thread %s for dbid %i' %
                     (worker.getName(), dbid))
        batch.append(worker)

        # hand over a full batch and start collecting the next one
        if launched % GV.opt['-B'] == 0:
            processThread(batch)
            batch = []

    processThread(batch)

# -------------------------------------------------------------------------------

# Exclude these tuples from the catalog table scan
# pg_depend: classid 2603 (pg_amproc) is excluded because their OIDs can be
#            nonsynchronous (catalog.c:RelationNeedsSynchronizedOIDs())
MISSING_ENTRY_EXCLUDE = {'pg_depend': 'WHERE classid != 2603'}

def missingEntryQuery(max_content, catname, pkey, castedPkey):
    """Build the SQL that finds missing / extra rows of one catalog table.

    The query full-outer-joins two row sets on the primary key:
      * every entry found on the segments, with the ids of the segments
        holding it collected into an array, and
      * every entry found on the master.
    The trailing WHERE clause filters out all the boring rows, leaving only
    rows that are missing from one of the segments, or from the master.

    catname -- catalog table name, substituted into the query as-is
    pkey    -- list of primary key column names (joined with ',')
    max_content and castedPkey are accepted but not used by this function.

    Returns the query text.
    """
    substitutions = {
        'primary_key': ','.join(pkey),
        'catalog': catname,
        # optional per-catalog row filter, empty when nothing is excluded
        'catalog_exclude': MISSING_ENTRY_EXCLUDE.get(catname, ""),
    }

    sql_template = """
          SELECT {primary_key},
                 case when master is null then segids
                      when segids is null then array[master.segid]
                      else master.segid || segids end as segids
          FROM
          (
            SELECT {primary_key}, array_agg(gp_segment_id order by gp_segment_id) as segids
            FROM gp_dist_random('{catalog}') {catalog_exclude} GROUP BY {primary_key}
          ) as seg
          FULL OUTER JOIN
          (
            SELECT gp_segment_id as segid, {primary_key} FROM {catalog} {catalog_exclude}
          ) as master
          USING ({primary_key})
          WHERE master.segid is null
             OR segids is null
             OR NOT segids @> (select array_agg(content::int4) from gp_segment_configuration WHERE content >= 0)
          """

    return sql_template.format(**substitutions)


def transformTextArrayCols(catname, columns, colnames, coltypes):
    """Wrap text[] column expressions in array_to_string().

    Distributing by hash(text[]) is potentially dangerous, so every column
    whose type is '_text' is rewritten as array_to_string(<expr>, ',').
    We should fix the text[] hash expression issue, but for this script
    this seems like a reasonable workaround.

    columns  -- column expressions to transform
    colnames -- column names, positionally matching `columns`
    coltypes -- mapping of column name to type name
    catname is accepted but not used.

    Returns a new list; `columns` is left untouched.
    """
    return [
        "array_to_string(" + expr + ", ',')"
        if coltypes[colnames[pos]] == '_text' else expr
        for pos, expr in enumerate(columns)
    ]


# -------------------------------------------------------------------------------
def checkInconsistentEntry():
    """Run the inconsistent-entry cross consistency test on every catalog table.

    Logs the test banner, initialises GV.inconsistentEntryStatus to True if it
    has not been set yet, then calls checkTableInconsistentEntry() for each
    table returned by GV.catalog.getCatalogTables(), in sorted order.
    """
    logger.info('-----------------------------------')
    logger.info('Performing cross consistency test: check for inconsistent entries')

    # leave an already-recorded status alone
    if GV.inconsistentEntryStatus is None:
        GV.inconsistentEntryStatus = True

    # looks up information in the catalog:
    for catalog_table in sorted(GV.catalog.getCatalogTables()):
        checkTableInconsistentEntry(catalog_table)


# -------------------------------------------------------------------------------
def checkTableInconsistentEntry(cat):
    """Check one catalog table for rows that differ between master and segments.

    cat -- catalog table object; it is queried for its name, primary key,
           master-only / shared flags, columns (without ACL) and column types.

    The table is skipped when it is master-only, when the '-S' option excludes
    it (shared vs. non-shared), or when it has no primary key (a warning is
    logged in that last case).  Otherwise the query from
    inconsistentEntryQuery() is run on the master connection.  Any returned
    row is logged, marks the check as failed (GV.checkStatus,
    GV.inconsistentEntryStatus, ERROR_NOREPAIR) and is passed on to
    processInconsistentEntryResult().

    An exception while executing is reported with myprint() and recorded via
    setError(); it is not re-raised.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    master = cat.isMasterOnly()
    isShared = cat.isShared()
    columns = cat.getTableColumns(with_acl=False)
    coltypes = cat.getTableColtypes()

    # Skip master only tables
    if master:
        return

    # skip shared/non-shared tables
    if GV.opt['-S']:
        if re.match("none", GV.opt['-S'], re.I) and isShared:
            return
        if re.match("only", GV.opt['-S'], re.I) and not isShared:
            return

    # Skip catalogs without primary key
    if pkey == []:
        logger.warn("[WARN] Skipped cross consistency check for %s" %
                    catname)
        return

    # Reuse the primary key and column types fetched above instead of asking
    # the catalog object for them a second time.
    castedPkey = [c + autoCast.get(coltypes[c], '') for c in pkey]
    castcols = [c + autoCast.get(coltypes[c], '') for c in columns]

    # transform text[] cols for sure. See comments on transformTextArrayCols
    castcols = transformTextArrayCols(catname, castcols, columns, coltypes)
    castcols = [casted + ' AS ' + name for casted, name in zip(castcols, columns)]
    if cat.tableHasConsistentOids():
        qry = inconsistentEntryQuery(GV.max_content, catname, ['oid'], columns, castcols)
    else:
        qry = inconsistentEntryQuery(GV.max_content, catname, castedPkey, columns, castcols)

    # Execute the query
    try:
        db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
        curs = db.query(qry)
        nrows = curs.ntuples()

        if nrows == 0:
            logger.info('[OK] Checking for inconsistent entries for ' + catname)
        else:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            GV.inconsistentEntryStatus = False
            logger.info('[FAIL] Checking for inconsistent entries for ' + catname)
            logger.error('  %s has %d issue(s)' % (catname, nrows))

            fields = curs.listfields()
            gplog.log_literal(logger, logging.ERROR, "    " + " | ".join(fields))
            # Fetch the rows once and use the same list for logging and for
            # result processing (same pattern as checkTableDuplicateEntry).
            results = curs.getresult()
            for row in results:
                gplog.log_literal(logger, logging.ERROR, "    " + " | ".join(map(str, row)))
            processInconsistentEntryResult(catname, pkey, fields, results)
            if catname == 'pg_type':
                # NOTE(review): the tag passed here is 'duplicate', the same
                # one the duplicate check uses -- confirm that is intended.
                generateVerifyFile(catname, fields, results, 'duplicate')

    except Exception as e:
        setError(ERROR_NOREPAIR)
        GV.inconsistentEntryStatus = False
        myprint('[ERROR] executing: Inconsistent entries check for ' + catname)
        myprint('  Execution error: ' + str(e))
        myprint(qry)


# -------------------------------------------------------------------------------
def inconsistentEntryQuery(max_content, catname, pkey, columns, castcols):
    """Build the SQL that finds catalog rows that disagree across the cluster.

    max_content -- substituted into the '{max_content}+2' row-count comparisons
    catname     -- catalog table name
    pkey        -- list of primary key columns (joined with ',')
    columns     -- list of output column names (joined with ',')
    castcols    -- list of select expressions, one per column (joined with ',')

    Returns the query text (several statements in one string).
    """
    # -- ==============
    # --  Inconsistent
    # -- ==============
    # --   Build set of all values in the instance
    # --
    # --   Count number of unique values for the primary key
    # --     - Ignore rows where this does not equal number of segments
    # --       (These are the missing/extra rows and are covered by other checks)
    # --
    # --   Count number of unique values for all columns
    # --     - Ignore rows where this equals number of segments
    # --
    # --   Group the majority opinion into one bucket and report everyone who disagrees with
    # --   the majority individually.
    # --
    # --   Majority gets reported with gp_segment_id == null
    # --
    # -- SET GUC due to MPP-14531:
    substitutions = {
        'pkey': ','.join(pkey),
        'catalog': catname,
        'columns': ','.join(columns),
        'castcols': ','.join(castcols),
        'max_content': max_content,
    }

    sql_template = """
              SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

              SET gp_enable_mk_sort=off;

              -- distribute catalog table from master, so that we can avoid to gather
              CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
                  SELECT gp_segment_id segid, {columns} FROM {catalog};
              SELECT {columns}, array_agg(gp_segment_id order by gp_segment_id) as segids
              FROM
              (
                 SELECT DISTINCT
                   case when dcount <= ({max_content}+2)/2.0 then gp_segment_id else null end as gp_segment_id, {columns}
                 FROM
                 (
                   select count(*) over (partition by {columns}) as dcount,
                          count(*) over (partition by {pkey}) as pcount,
                          gp_segment_id, {columns}
                   from (
                     select segid gp_segment_id, {castcols} from _tmp_master
                     union all
                     select gp_segment_id, {castcols}
                     from   gp_dist_random('{catalog}')
                   ) all_segments
                 ) counted_segments
                 WHERE dcount != {max_content}+2 and pcount = {max_content}+2
                 ORDER BY {pkey}, gp_segment_id
              ) rowresult
              GROUP BY {columns}
              ORDER BY {pkey}, segids;
              """

    return sql_template.format(**substitutions)


# -------------------------------------------------------------------------------
def checkDuplicateEntry():
    """Run the duplicate-entry test on every catalog table except pg_depend.

    Tables come from GV.catalog.getCatalogTables() and are processed in
    sorted order, each through checkTableDuplicateEntry().
    """
    logger.info('-----------------------------------')
    logger.info('Performing test: checking for duplicate entries')

    # looks up information in the catalog:
    for catalog_table in sorted(GV.catalog.getCatalogTables()):
        ## pg_depend does not care about duplicates at the moment
        if catalog_table == 'pg_depend':
            continue
        checkTableDuplicateEntry(catalog_table)


# -------------------------------------------------------------------------------
def checkTableDuplicateEntry(cat):
    """Check one catalog table for duplicated rows on any single instance.

    cat -- catalog table object; it is queried for its name, primary key,
           master-only / shared flags, columns (without ACL) and column types.

    Master-only tables, tables excluded by the '-S' option and tables without
    a primary key are skipped (the last case logs a warning).  Otherwise the
    query from duplicateEntryQuery() is run on the master connection; any
    returned row is logged, marks the check as failed and is passed to
    processMissingDuplicateEntryResult().

    An exception while executing is reported with myprint() and recorded via
    setError(); it is not re-raised.
    """
    catname = cat.getTableName()
    pkey = cat.getPrimaryKey()
    master = cat.isMasterOnly()
    isShared = cat.isShared()
    columns = cat.getTableColumns(with_acl=False)
    coltypes = cat.getTableColtypes()

    # Skip master only tables
    if master:
        return

    # skip shared/non-shared tables
    shared_opt = GV.opt['-S']
    if shared_opt:
        if isShared and re.match("none", shared_opt, re.I):
            return
        if not isShared and re.match("only", shared_opt, re.I):
            return

    # Skip catalogs without primary key
    if pkey == []:
        logger.warn("[WARN] Skipped duplicate check for %s" %
                    catname)
        return

    casted_key = [col + autoCast.get(coltypes[col], '') for col in pkey]
    # tables with consistent OIDs are keyed on oid alone
    key_columns = ['oid'] if cat.tableHasConsistentOids() else casted_key
    qry = duplicateEntryQuery(catname, key_columns)

    # Execute the query
    try:
        conn = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
        curs = conn.query(qry)
        nrows = curs.ntuples()

        if nrows != 0:
            GV.checkStatus = False
            setError(ERROR_NOREPAIR)
            logger.error('[FAIL] Checking for duplicate entries for ' + catname)
            logger.error('  %s has %d issue(s)' % (catname, nrows))

            fields = curs.listfields()
            gplog.log_literal(logger, logging.ERROR, "    " + " | ".join(fields))
            results = curs.getresult()
            for record in results:
                gplog.log_literal(logger, logging.ERROR,
                                  "    " + " | ".join(map(str, record)))
            processMissingDuplicateEntryResult(catname, fields, results, "duplicate")
            if catname == 'pg_type':
                generateVerifyFile(catname, fields, results, 'duplicate')
        else:
            logger.info('[OK] Checking for duplicate entries for ' + catname)
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('[ERROR] executing: duplicate entries check for ' + catname)
        myprint('  Execution error: ' + str(e))
        myprint(qry)


# -------------------------------------------------------------------------------
def duplicateEntryQuery(catname, pkey):
    """Build the SQL that finds duplicated keys of one catalog table.

    catname -- catalog table name
    pkey    -- list of key columns (joined with ',')

    Returns the query text (several statements in one string).
    """
    # -- ===========
    # --  Duplicate
    # -- ===========
    # --   Return any rows having count > 1 for a given segid, {unique_key} pair
    # --
    key_list = ','.join(pkey)

    sql_template = """
          SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

          -- distribute catalog table from master, so that we can avoid to gather
          CREATE TEMPORARY TABLE _tmp_master ON COMMIT DROP AS
              SELECT gp_segment_id segid, {pkey} FROM {catalog};
          SELECT {pkey}, total, array_agg(segid order by segid) as segids
          FROM (
              SELECT segid, {pkey}, count(*) as total
              FROM (
                   select segid, {pkey} FROM _tmp_master
                   union all
                   select gp_segment_id as segid, {pkey} FROM gp_dist_random('{catalog}')
              ) all_segments
              GROUP BY segid, {pkey}
              HAVING count(*) > 1
          ) rowresult
          GROUP BY {pkey}, total
          """

    return sql_template.format(catalog=catname, pkey=key_list)


# -------------------------------------------------------------------------------
def checkUniqueIndexViolation():
    """Run UniqueIndexViolationCheck on the master connection and report it.

    When violations are returned the check is marked failed
    (GV.checkStatus, ERROR_NOREPAIR) and the violations are logged through
    log_unique_index_violations().
    """
    logger.info('-----------------------------------')
    logger.info('Performing check: checking for violated unique indexes')
    master_conn = connect2(GV.cfg[GV.master_dbid], utilityMode=False)

    violations = UniqueIndexViolationCheck().runCheck(master_conn)

    checkname = 'unique index violation(s)'
    if not violations:
        logger.info('[OK]  %s' % checkname)
        return

    logger.info('[FAIL]  %s' % checkname)
    GV.checkStatus = False
    setError(ERROR_NOREPAIR)
    log_unique_index_violations(violations)

def log_unique_index_violations(violations):
    """Record and log unique index violations.

    violations -- iterable of mappings with the keys 'table_oid',
                  'table_name', 'index_name', 'column_names' and
                  'violated_segments'.

    Each violation is attached as a duplicate issue to the object returned by
    getGPObject() for its table, and one report line is produced per violated
    segment.  The whole report is emitted as a single logger.error() call.
    """
    report = [
        '\n    segment_id | index_name | table_name | column_names',
        '    ---------------------------------------------------',
    ]

    for entry in violations:
        table_object = getGPObject(entry['table_oid'], entry['table_name'])
        table_object.addDuplicateIssue(
            CatUniqueIndexViolationIssue(entry['table_name'], entry['index_name']))

        report.extend(
            '    %s | %s | %s | %s' % (segment_id,
                                       entry['index_name'],
                                       entry['table_name'],
                                       entry['column_names'])
            for segment_id in entry['violated_segments'])

    logger.error('\n'.join(report))


# -------------------------------------------------------------------------------
def checkOrphanedToastTables():
    """Run OrphanedToastTablesCheck on the master connection and report it.

    On failure the check is marked failed (GV.checkStatus, ERROR_REMOVE), the
    raw rows are logged per issue type, the checker's fix text is logged,
    every issue is attached to its table object, and per-segment repair
    scripts are generated through do_repair_for_segments().
    """
    logger.info('-----------------------------------')
    logger.info('Performing check: checking for orphaned TOAST tables')

    master_conn = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
    checker = OrphanedToastTablesCheck()
    check_passed = checker.runCheck(master_conn)

    checkname = 'orphaned toast table(s)'
    if check_passed:
        logger.info('[OK]  %s' % checkname)
        return

    logger.info('[FAIL]  %s' % checkname)
    GV.checkStatus = False
    setError(ERROR_REMOVE)

    # log raw orphan toast table query results only to the log file
    row_keys = ('content_id',
                'toast_table_oid',
                'toast_table_name',
                'expected_table_oid',
                'expected_table_name',
                'dependent_table_oid',
                'dependent_table_name')
    report = ['Orphan toast tables detected for the following scenarios:']
    for issue_type in checker.issue_types():
        report.append('\n' + issue_type.header)
        report.append('content_id | toast_table_oid | toast_table_name | expected_table_oid | expected_table_name | dependent_table_oid | dependent_table_name')
        report.append('-----------+-----------------+------------------+--------------------+---------------------+---------------------+---------------------')
        for row in checker.rows_of_type(issue_type):
            report.append('{} | {} | {} | {} | {} | {} | {}'.format(
                *[row[key] for key in row_keys]))
    logger.error('\n'.join(report) + '\n')

    # log fix instructions for the orphaned toast tables to stdout and log file
    gplog.log_literal(logger, logging.CRITICAL, checker.get_fix_text())

    # log per-orphan table issue and cause to stdout and log file
    for issue, segments in checker.iterate_issues():
        table = issue.table
        cat_issue_obj = CatOrphanToastTableIssue(table.oid,
                                                 table.catname,
                                                 issue,
                                                 segments)
        getGPObject(issue.table.oid, issue.table.catname).addOrphanToastTableIssue(cat_issue_obj)

    do_repair_for_segments(segments_to_repair_statements=checker.add_repair_statements(GV.cfg),
                           issue_type="orphaned_toast_tables",
                           description='Repairing orphaned TOAST tables')


############################################################################
# Helpers for populating the repair part for all check types.
# All functions depend on results stored in the global variable GV.
############################################################################
# Registry of repair builders, keyed by repair kind.  Each entry carries a
# human-readable "description" and an "fn" callable; runAllChecks() invokes
# name_repair["segment"]["fn"](maybeRemove, catmod_guc, seg) once per dbid.
name_repair = {
    "segment":
        {
            "description": "Check if needs repairing segment",
            # Kept as a lambda on purpose: checkSegmentRepair is defined
            # further down in this file, so the name must be resolved at
            # call time rather than when this dict is built.
            "fn": lambda maybeRemove, catmod_guc, seg: checkSegmentRepair(maybeRemove, catmod_guc, seg)
        },
}

############################################################################
# version = X.X : run this check as part of running all checks on gpdb <= X.X
#                 individually, any check should be ok run on any version
#  online = True: OK to run this check when gpcheckcat runs in online mode
#                 (runOneCheck skips checks with online = False under -O)
#   order = X   : the order check should be run when running all checks
############################################################################

# Registry of every check, keyed by the test name used on the command line.
# "fn" entries are mostly lambdas so that the check function's name is
# resolved when the check runs, not when this dict is built.
all_checks = {
    "unique_index_violation":
        {
            "description": "Check for violated unique indexes",
            "fn": lambda: checkUniqueIndexViolation(),
            "version": "main",
            "order": 1,
            "online": True
        },
    "duplicate":
        {
            "description": "Check for duplicate entries",
            "fn": lambda: checkDuplicateEntry(),
            "version": 'main',
            "order": 2,
            "online": True
        },
    "missing_extraneous":
        {
            "description": "Cross consistency check for missing or extraneous entries",
            "fn": lambda: checkMissingEntry(),
            "version": 'main',
            "order": 3,
            "online": True
        },
    "inconsistent":
        {
            "description": "Cross consistency check for master segment inconsistency",
            "fn": lambda: checkInconsistentEntry(),
            "version": 'main',
            "order": 4,
            "online": True
        },
    "foreign_key":
        {
            "description": "Check foreign keys",
            "fn": lambda: checkForeignKey(),
            "version": 'main',
            "order": 5,
            "online": True
        },
    "acl":
        {
            "description": "Cross consistency check for access control privileges",
            "fn": lambda: checkACL(),
            "version": 'main',
            "order": 6,
            "online": True
        },
    "pgclass":
        {
            "description": "Check pg_class entry that does not have any correspond pg_attribute entry",
            "fn": lambda: checkPGClass(),
            "version": 'main',
            "order": 7,
            "online": False
        },
    "namespace":
        {
            "description": "Check for schemas with a missing schema definition",
            "fn": lambda: checkPGNamespace(),
            "version": 'main',
            "order": 8,
            "online": False
        },
    "distribution_policy":
        {
            "description": "Check constraints on randomly distributed tables",
            "fn": lambda: checkDistribPolicy(),
            "version": 'main',
            "order": 9,
            "online": False
        },
    "dependency":
        {
            "description": "Check for dependency on non-existent objects",
            "fn": lambda: checkDepend(),
            "version": 'main',
            "order": 10,
            "online": False
        },
    "owner":
        {
            "description": "Check table ownership that is inconsistent with the master database",
            "fn": lambda: checkOwners(),
            "version": 'main',
            "order": 11,
            "online": True
        },
    "part_integrity":
        {
            "description": "Check pg_partition branch integrity, partition with oids, partition distribution policy",
            "fn": lambda: checkPartitionIntegrity(),
            "version": 'main',
            "order": 12,
            "online": True
        },
    "part_constraint":
        {
            "description": "Check constraints on partitioned tables",
            "fn": lambda: checkPartitionRegularity(),
            "version": 'main',
            "order": 13,
            "online": True
        },
    "orphaned_toast_tables":
        {
            "description": "Check pg_class and pg_depend for orphaned TOAST tables",
            # direct reference: checkOrphanedToastTables is already defined above
            "fn": checkOrphanedToastTables,
            "version": 'main',
            "order": 14,
            "online": True
        },
    "aoseg_table":
        {
            "description": "Check that vpinfo in aoseg table is consistent with pg_attribute",
            "fn": lambda: checkAOSegVpinfo(),
            "version": 'main',
            "order": 15,
            "online": False 
        }
}


#-------------------------------------------------------------------------------
def listAllChecks():
    """Print the name and description of every check, sorted by its "order"."""
    myprint('\nList of gpcheckcat tests:\n')
    ordered = sorted(all_checks.items(), key=lambda entry: entry[1]["order"])
    for check_name, spec in ordered:
        myprint(" %24s: %s" % (check_name, spec["description"]))
    myprint('')


#-------------------------------------------------------------------------------
def runCheckCatname(catalog_table_obj):
    """Run the per-table checks for a single catalog table.

    The missing/extraneous, inconsistent, foreign key, duplicate and ACL
    checks are run in alphabetical order of their names.  For each one the
    elapsed time is printed and added to GV.elapsedTime, and
    GV.totalCheckRun is incremented.
    """
    table_name_of = catalog_table_obj.getTableName

    checks = {
        'missing_extraneous': lambda: checkTableMissingEntry(catalog_table_obj),
        'inconsistent': lambda: checkTableInconsistentEntry(catalog_table_obj),
        # We assume that everything is passed as a list to checkForeignKey
        'foreign_key': lambda: checkForeignKey([catalog_table_obj]),
        'duplicate': lambda: checkTableDuplicateEntry(catalog_table_obj),
        'acl': lambda: checkTableACL(catalog_table_obj),
    }

    for check in sorted(checks):
        run_check = checks[check]
        myprint("Performing test '%s' for %s" % (check, table_name_of()))
        started = time.time()
        run_check()
        elapsed = time.time() - started
        GV.elapsedTime += elapsed
        # trim the textual timedelta by its last four characters
        elapsed_text = str(datetime.timedelta(seconds=elapsed))[:-4]
        GV.totalCheckRun += 1
        myprint("Total runtime for test '%s': %s" % (check, elapsed_text))


#-------------------------------------------------------------------------------
def runOneCheck(name):
    """Run the check registered under `name` in all_checks.

    In online mode (GV.opt['-O']) a check whose "online" flag is false is
    skipped with a log message.  Otherwise GV.checkStatus is reset to True,
    the check is run and timed, and `name` is appended to GV.failedChecks if
    the check left GV.checkStatus equal to False.
    """
    spec = all_checks[name]
    if GV.opt['-O'] and not spec["online"]:
        logger.info("%s: Skip this test in online mode" % (name))
        return

    myprint("Performing test '%s'" % name)
    GV.totalCheckRun += 1
    GV.checkStatus = True

    started = time.time()
    all_checks[name]["fn"]()
    elapsed = time.time() - started

    GV.elapsedTime += elapsed
    elapsed_text = str(datetime.timedelta(seconds=elapsed))[:-4]
    myprint("Total runtime for test '%s': %s" % (name, elapsed_text))
    if GV.checkStatus == False:
        GV.failedChecks.append(name)


#-------------------------------------------------------------------------------
def runAllChecks(run_tests):
    '''
    perform catalog check for specified database

    run_tests -- container of check names; only checks named in it (and whose
                 "version" compares >= GV.version) are run, in "order" order.

    After the checks, connections are closed via closeDbs().  If a repair
    directory was requested with '-g' and any of GV.Remove, GV.AdjustConname,
    GV.DemoteConstraint or GV.ReSync is non-empty, a bash repair script is
    assembled from the per-segment repair scripts.
    '''
    ordered_names = sorted(all_checks, key=lambda check: all_checks[check]["order"])
    for name in ordered_names:
        if all_checks[name]["version"] >= GV.version and name in run_tests:
            runOneCheck(name)

    closeDbs()
    logger.info("------------------------------------")
    fixes = any(len(pending) > 0 for pending in
                (GV.Remove, GV.AdjustConname, GV.DemoteConstraint, GV.ReSync))
    if GV.opt['-g'] is not None and fixes:
        repair_obj = Repair(context=GV)
        repair_dir_path = repair_obj.create_repair_dir()
        logger.debug('Building catalog repair scripts')
        catmod_guc = "-c allow_system_table_mods=true"

        # don't remove anything if ReSync possible --
        # comment out the delete statements from the repair scripts
        maybeRemove = "# " if len(GV.ReSync) else ""

        # use the per-dbid loop for removal and for contraint
        # name adjustments
        dbids = set(GV.Remove.keys())
        dbids = dbids.union(GV.AdjustConname.keys())
        dbids = dbids.union(GV.DemoteConstraint.keys())

        # build a script to run these files
        build_segment_repair = name_repair["segment"]["fn"]
        script_parts = []
        for seg in dbids:
            script_parts.append('\n%secho "Repairing segment %i"\n' % (maybeRemove, seg))
            script_parts.append(build_segment_repair(maybeRemove, catmod_guc, seg))

        repair_obj.append_content_to_bash_script(repair_dir_path, ''.join(script_parts))

        print_repair_issues(repair_dir_path)

    if GV.retcode >= ERROR_NOREPAIR:
        logger.warn('[WARN]: unable to generate repairs for some issues')
    logger.info("Check complete")


def do_repair(sql_repair_contents, issue_type, description):
    """Generate a repair from a collection of SQL repair statements.

    sql_repair_contents -- sized collection of repair statements; nothing is
                           generated when it is empty
    issue_type          -- issue type passed to Repair
    description         -- description passed to Repair and used in the log

    A failure while creating the repair is logged as fatal and not re-raised;
    print_repair_issues() is still called, with an empty directory path.
    """
    issue_count = len(sql_repair_contents)
    logger.info("Starting repair of %s with %d issues" % (description, issue_count))
    if issue_count == 0:
        return

    repair_dir_path = ""
    try:
        repairer = Repair(context=GV, issue_type=issue_type, desc=description)
        repair_dir_path = repairer.create_repair(sql_repair_contents=sql_repair_contents)
    except Exception as ex:
        logger.fatal(str(ex))

    print_repair_issues(repair_dir_path)

def do_repair_for_segments(segments_to_repair_statements, issue_type, description):
    """Generate per-segment repair scripts.

    segments_to_repair_statements -- passed unchanged to
                                     Repair.create_segment_repair_scripts()
    issue_type                    -- issue type passed to Repair
    description                   -- description passed to Repair and logged

    A failure while creating the scripts is logged as fatal and not
    re-raised; print_repair_issues() is still called, with an empty path.
    """
    logger.info("Starting repair of %s" % description)

    repair_dir_path = ""
    try:
        repairer = Repair(context=GV, issue_type=issue_type, desc=description)
        repair_dir_path = repairer.create_segment_repair_scripts(segments_to_repair_statements)
    except Exception as ex:
        logger.fatal(str(ex))

    print_repair_issues(repair_dir_path)


def do_repair_for_extra(catalog_issues):
    """Generate repairs that remove extra catalog entries.

    catalog_issues -- mapping of (catalog_table_obj, pk_name) to the issues
                      found for that pair.

    ERROR_REMOVE is recorded first.  One Repair is created per mapping entry;
    a failure for an entry is logged as fatal and the loop continues.
    print_repair_issues() is called once at the end with the path returned by
    the last successful entry (or an empty string if there was none).
    """
    setError(ERROR_REMOVE)
    repair_dir_path = ""
    for table_and_key, issues in catalog_issues.iteritems():
        catalog_table_obj, pk_name = table_and_key
        try:
            repairer = Repair(context=GV, issue_type="extra", desc="Removing extra entries")
            repair_dir_path = repairer.create_repair_for_extra_missing(
                catalog_table_obj=catalog_table_obj,
                issues=issues,
                pk_name=pk_name,
                segments=GV.cfg)
        except Exception as ex:
            logger.fatal(str(ex))

    print_repair_issues(repair_dir_path)


def print_repair_issues(repair_dir_path):
    """Print the "repair script(s) generated" notice, framed by blank lines."""
    notice = "catalog issue(s) found , repair script(s) generated in dir {0}".format(repair_dir_path)
    for text in ('', notice, ''):
        myprint(text)


def checkReindexRepair():
    """Write a REINDEX script for every index listed in GV.Reindex.

    The script is created in the '-g' directory and named
    '<dbname>.reindex.<timestamp>.sql'.

    Returns (description, filename) when a script was written, where
    description is the echo line to put in the driver script, or
    (None, None) when GV.Reindex is empty.

    Exits the process with status 1 if the file cannot be created.
    """
    if len(GV.Reindex) == 0:
        return None, None

    # dbname.type.timestamp.sql
    filename = '%s.%s.%s.sql' % (GV.dbname, "reindex", GV.timestamp)
    fullpath = '%s/%s' % (GV.opt['-g'], filename)
    try:
        script_file = open(fullpath, 'w')
    except Exception as e:
        logger.fatal('Unable to create file "%s": %s' % (fullpath, str(e)))
        sys.exit(1)

    description = '\necho "Reindexing potentially damaged bitmap indexes"\n'
    # the with-block closes the file even if one of the writes fails
    with script_file:
        for r in GV.Reindex:
            script_file.write('REINDEX INDEX %s;\n' % r)
    return description, filename

def checkConstraintsRepair():
    """Generate the repair for the statements collected in GV.Constraints."""
    do_repair(sql_repair_contents=GV.Constraints,
              issue_type="removeconstraints",
              description='Dropping invalid unique constraints')

def checkOwnersRepair():
    """Generate the repair for the statements collected in GV.Owners."""
    # The list is passed on as-is.  Duplicates used to be removed with
    # "list(set(...))", but that does not work: the order of the statements
    # is important for correct results.
    do_repair(sql_repair_contents=GV.Owners,
              issue_type="fixowner",
              description='Correcting table ownership')

def checkPoliciesRepair():
    """Generate the repair for the statements collected in GV.Policies."""
    # changes to distribution policies
    do_repair(sql_repair_contents=GV.Policies,
              issue_type="fixdistribution",
              description='Fixing distribution policies')

def checkSegmentRepair(maybeRemove, catmod_guc, seg):
    """Write the SQL repair file for one segment and return the psql command.

    maybeRemove -- prefix for the returned command line ("# " comments the
                   command out, "" leaves it active)
    catmod_guc  -- extra PGOPTIONS text inserted into the command
    seg         -- dbid, used as key into GV.cfg and the GV repair maps

    The file is created in the '-g' directory, named
    '<dbid>.<host>.<port>.<dbname>.<timestamp>.sql' (spaces replaced by '_'),
    and holds the de-duplicated, sorted statements from GV.Remove,
    GV.AdjustConname and GV.DemoteConstraint for this dbid, wrapped in
    BEGIN/COMMIT.

    Returns the shell command that runs the file with psql in utility mode.
    Exits the process with status 1 if the file cannot be created.
    """
    # dbid.host.port.dbname.timestamp.sql
    c = GV.cfg[seg]
    filename = '%i.%s.%i.%s.%s.sql' % (seg, c['hostname'], c['port'], GV.dbname, GV.timestamp)
    filename = filename.replace(' ', '_')
    fullpath = '%s/%s' % (GV.opt['-g'], filename)
    try:
        script_file = open(fullpath, 'w')
    except Exception as e:
        logger.fatal('Unable to create file "%s": %s' % (fullpath, str(e)))
        sys.exit(1)

    # the with-block closes the file even if collecting or writing fails
    with script_file:
        # unique
        lines = set()
        if seg in GV.Remove:  # foreign_key check
            lines = lines.union(GV.Remove[seg])
        if seg in GV.AdjustConname:  # part_constraint
            lines = lines.union(GV.AdjustConname[seg])
        if seg in GV.DemoteConstraint:
            lines = lines.union(GV.DemoteConstraint[seg])

        script_file.write('BEGIN;\n')
        # make sure it is sorted
        for line in sorted(lines):
            script_file.write(line + '\n')
        script_file.write('COMMIT;\n')

    run_psql_script = '''{maybe}env PGOPTIONS="-c gp_session_role=utility {guc}" psql -X -a -h {hostname} -p {port} -f {fname} "{dbname}" > {fname}.out'''.format(
        maybe=maybeRemove, guc=catmod_guc,
        hostname=c['hostname'],
        port=c['port'],
        fname=filename,
        dbname=GV.dbname)

    return run_psql_script


# -------------------------------------------------------------------------------
def getCatalog():
    """Return a GPCatalog built on a non-utility-mode master connection."""
    # Establish a connection to the master & looks up info in the catalog
    master_cfg = GV.cfg[GV.master_dbid]
    return GPCatalog(connect2(master_cfg, utilityMode=False))


# -------------------------------------------------------------------------------
def getReportConfiguration():
    """Return report metadata for the entries of GV.cfg, keyed by content id.

    Each value is a dict with 'segname' ("content <id>"), 'hostname' and
    'port'.  When several GV.cfg entries share a content id, the one
    iterated last wins.
    """
    report_cfg = {}
    for seg in GV.cfg.itervalues():
        content_id = seg['content']
        report_cfg[content_id] = dict(segname="content " + str(content_id),
                                      hostname=seg['hostname'],
                                      port=seg['port'])
    return report_cfg


# ===============================================================================
# GPCHECKCAT REPORT
# ===============================================================================

GPObjects = {}  # key = (oid, catname), value = GPObject (filled in by getGPObject)
GPObjectGraph = {}  # key = GPObject, value=[GPObject]
# -------------------------------------------------------------------------------


# TableMainColumn[catname] = [colname, catname2]
#    - report this catname's issues as part of catname2
#    - catname.colname references catname2.oid
# -------------------------------------------------------------------------------
TableMainColumn = {
    # Table with no OID
    'gp_version_at_initdb': ['schemaversion', 'gp_version_at_initdb'],
    'pg_aggregate': ['aggfnoid', 'pg_proc'],
    'pg_amop': ['amopclaid', 'pg_opclass'],
    'pg_amproc': ['amopclaid', 'pg_opclass'],
    'pg_appendonly': ['relid', 'pg_class'],
    'pg_attribute': ['attrelid', 'pg_class'],
    'pg_attribute_encoding': ['attrelid', 'pg_class'],
    'pg_auth_members': ['roleid', 'pg_authid'],
    'pg_autovacuum': ['vacrelid', 'pg_class'],
    'pg_depend': ['objid', 'pg_class'],
    'pg_exttable': ['reloid', 'pg_class'],
    'pg_foreign_table': ['reloid', 'pg_class'],
    'pg_index': ['indexrelid', 'pg_class'],
    'pg_inherits': ['inhrelid', 'pg_class'],
    'pg_largeobject': ['loid', 'pg_largeobject'],
    'pg_pltemplate': ['tmplname', 'pg_pltemplate'],
    'pg_proc_callback': ['profnoid', 'pg_proc'],
    'pg_type_encoding': ['typid', 'pg_type'],
    'pg_window': ['winfnoid', 'pg_proc'],
    'gp_distribution_policy': ['localoid', 'pg_class'],

    # Table with OID (special case), these OIDs are known to be inconsistent
    'pg_attrdef': ['adrelid', 'pg_class'],
    'pg_constraint': ['conrelid', 'pg_class'],
    'pg_trigger': ['tgrelid', 'pg_class'],
    'pg_rewrite': ['ev_class', 'pg_class'],
}

# Cast OID alias type to OID since our graph of objects is based on OID
# int2vector is also casted to int2[] due to the lack of an <(int2vector, int2vector) operator
# Maps a column type name to the cast suffix appended to the column expression.
autoCast = dict.fromkeys(
    ('regproc', 'regprocedure', 'regoper', 'regoperator', 'regclass', 'regtype'),
    '::oid')
autoCast['int2vector'] = '::int2[]'


# -------------------------------------------------------------------------------
class QueryException(Exception):
    """Raised when a catalog lookup query finds no matching entry."""


# -------------------------------------------------------------------------------
# Get gpObj from GPObjects, instantiate a new one & add to GPObjects if not found
def getGPObject(oid, catname):
    """
    Return the object registered in GPObjects under (oid, catname).

    When nothing is registered yet, a new object is created, stored in
    GPObjects and returned: a RelationObject for 'pg_class', a plain
    GPObject for every other catalog.
    """
    key = (oid, catname)
    existing = GPObjects.get(key, None)
    if existing is not None:
        return existing

    objectClass = RelationObject if catname == 'pg_class' else GPObject
    created = objectClass(oid, catname)
    GPObjects[key] = created
    return created


# -------------------------------------------------------------------------------
def getOidFromPK(catname, pkeys):
    """
    Look up the oid of the <catname> row identified by <pkeys>.

    catname: catalog table to search
    pkeys:   dict of column name -> value identifying the row

    The master copy and gp_dist_random() copy of the catalog are both
    queried and the oids are ordered by how many rows carry them
    (ties broken by lowest oid); the most common oid is returned.

    Returns the oid, or None when no row matches or the query fails.  In
    the failure case ERROR_NOREPAIR is flagged and the error and query
    are printed.
    """
    # pkeys: name-value pair
    # Double embedded single quotes so that a value containing a quote
    # (e.g. an object name) does not break the generated SQL.
    pkeystrList = ["%s='%s'" % (key, ("%s" % (value,)).replace("'", "''"))
                   for key, value in pkeys.items()]
    pkeystr = ' and '.join(pkeystrList)

    qry = """
          SELECT oid
          FROM (
                SELECT oid FROM {catname}
                WHERE {pkeystr}
                UNION ALL
                SELECT oid FROM gp_dist_random('{catname}')
                WHERE {pkeystr}
          ) alloids
          GROUP BY oid
          ORDER BY count(*) desc, oid
          """.format(catname=catname,
                     pkeystr=pkeystr)

    try:
        db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
        rows = db.query(qry).dictresult()
        if not rows:
            raise QueryException("No such entry '%s' in %s" % (pkeystr, catname))

        # Rows are sorted by descending count, so the majority oid is the
        # FIRST row (taking the last row would pick the least common oid).
        return rows[0]['oid']

    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('  Execution error: ' + str(e))
        myprint(qry)
        return None


# -------------------------------------------------------------------------------
def getClassOidForRelfilenode(relfilenode):
    """
    Return the pg_class oid of the relation whose relfilenode is <relfilenode>.

    Connects using the host/port options and database name held in GV.
    Returns None when the lookup fails; in that case ERROR_NOREPAIR is
    flagged and the error is printed.
    """
    qry = "SELECT oid FROM pg_class WHERE relfilenode = %d;" % (relfilenode)
    conn = None
    try:
        dburl = dbconn.DbURL(hostname=GV.opt['-h'], port=GV.opt['-p'], dbname=GV.dbname)
        conn = dbconn.connect(dburl)
        return dbconn.execSQLForSingletonRow(conn, qry)[0]
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('  Execution error: ' + str(e))
        return None
    finally:
        # Release the connection opened for this single lookup; it was
        # previously left open on every call.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # Best-effort cleanup: a failed close must not mask the result
                pass


# -------------------------------------------------------------------------------
def getResourceTypeOid(oid):
    """
    Return the pg_resourcetype oid for the row whose restypid is <oid>.

    Master and segment copies are both queried and the most common oid
    is picked.  Returns 0 when no row matches and None when the query
    fails (ERROR_NOREPAIR is flagged and the error printed).
    """
    qry = """
          SELECT oid
          FROM (
               SELECT oid FROM pg_resourcetype WHERE restypid = %d
               UNION ALL
               SELECT oid FROM gp_dist_random('pg_resourcetype')
                WHERE restypid = %d
          ) alloids
          GROUP BY oid ORDER BY count(*) desc LIMIT 1
          """ % (oid, oid)

    try:
        rows = connect().query(qry).dictresult()
        if not rows:
            return 0
        return rows.pop()['oid']
    except Exception as e:
        setError(ERROR_NOREPAIR)
        myprint('  Execution error: ' + str(e))


#-------------------------------------------------------------------------------
# Process results of checks (CC and FK) for a particular catalog:
#   - Categorize entry into GPObject
#   - Instantiate GPObject if needed
# -------------------------------------------------------------------------------
def processMissingDuplicateEntryResult(catname, colname, allValues, type):
    # type = {"missing" | "duplicate"}
    '''
    Turn the rows of a missing/extraneous or duplicate check into issues
    and attach them to the owning GPObject (created on demand).

    catname:   catalog table the check ran against
    colname:   result column names; the last column holds the segids
    allValues: result rows, one value per entry of colname
    type:      "missing" or "duplicate"

    colname:       proname | pronamespace | proargtypes | segids
    allValues:         add | 2200         | 23 23       | {2,3}
                     scube | 2200         | 1700        | {-1,0,1,3}
               scube_accum | 2200         | 1700 1700   | {-1,0,1,3}

    colname:        oid | total | segids
    allValues:    18853 | 2     | {-1,1}
                  18853 | 3     | {0}
    '''
    gpObjName = catname
    gpColName = None
    pknames = list(colname[:-1])  # Everything except the last column

    # This catname may not be a GPObject (e.g. pg_attribute)
    # If these catnames have oids, they are known to be inconsistent
    if catname in TableMainColumn:
        gpColName = TableMainColumn[catname][0]
        gpObjName = TableMainColumn[catname][1]
    elif 'oid' in pknames:
        gpColName = 'oid'

    # Master (content -1) plus contents 0..max_content
    totalInstances = GV.max_content + 2

    # Process entries
    for row in allValues:
        rowObjName = gpObjName
        segids = row[-1]
        pkeys = dict((pk, row[colname.index(pk)]) for pk in pknames)

        if gpColName is not None:
            oid = row[colname.index(gpColName)]
        else:
            oid = getOidFromPK(catname, pkeys)
            pkeys = {'oid': oid}

        # Special case: constraint for domain, report pg_type.contypid
        if catname == 'pg_constraint' and oid == 0:
            oid = row[colname.index('contypid')]
            rowObjName = 'pg_type'

        gpObj = getGPObject(oid, rowObjName)

        if type == "missing":

            # In how many segments (counting master as segment -1) was the
            # entry present?
            #
            # If it was present in half or more, then report the ones that
            # it was *not* present as 'missing'. If it was present in half
            # or less, then report the ones that it was present as 'extra'
            # Note that if it was present in exactly half of the segments,
            # we will report it as both missing and extra.
            #
            # The comparison doubles the count instead of halving the total:
            # integer division would truncate an odd total and report an
            # entry present on a strict minority as both extra and missing.

            ids = [int(i) for i in segids[1:-1].split(',')]

            if 2 * len(ids) <= totalInstances:
                issue = CatMissingIssue(catname, pkeys, segids, 'extra')
                gpObj.addMissingIssue(issue)

            if 2 * len(ids) >= totalInstances:
                allids = range(-1, GV.max_content + 1)
                # Sorted so the reported segment list is deterministic
                diffids = sorted(set(allids) - set(ids))
                # convert the difference set back into a string that looks
                # like a PostgreSQL array.
                segids = "{%s}" % (",".join(str(x) for x in diffids))
                issue = CatMissingIssue(catname, pkeys, segids, 'missing')
                gpObj.addMissingIssue(issue)

        else:
            assert (type == "duplicate")
            # row[-2] is the number of copies found (the 'total' column)
            issue = CatDuplicateIssue(catname, pkeys, segids, row[-2])
            gpObj.addDuplicateIssue(issue)


# -------------------------------------------------------------------------------
def processInconsistentEntryResult(catname, pknames, colname, allValues):
    # If tableHasInconsistentOid, columns does not have oid
    '''
    Group the rows of an inconsistency check per catalog entry and attach
    one CatInconsistentIssue per entry to the owning GPObject.

    catname:   catalog table the check ran against
    pknames:   primary key column names of that table
    colname:   result column names; the last column holds the segids
    allValues: result rows, e.g.

    17365 | test10 | 2200 | 17366 | 0 | 0 | 17366 | 0 | 0 | 0 | f | r | h | 3 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {0}     -- row1
    17365 | test10 | 2200 | 17366 | 0 | 0 | 0     | 0 | 0 | 0 | f | r | h | 2 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {NULL}  -- row2

    176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 4 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {-1}
    176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 2 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {0,1}
    176954 | test1 | 2200 | 176955 | 0 | 0 | 176956 | 0 | 0 | 0 | f | r | h | 3 | 0 | 0 | 0 | 0 | 0 | f | f | f | f | None | {2,3}
    '''
    resultHasOid = 'oid' in colname

    # Bucket the rows by entry: keyed on oid when available, otherwise on
    # the tuple of primary key values.
    rowsByEntry = {}
    for row in allValues:
        if resultHasOid:
            entryKey = row[colname.index('oid')]
        else:
            entryKey = tuple(row[colname.index(n)] for n in pknames)
        rowsByEntry.setdefault(entryKey, []).append(row)

    # This catname may not be a GPObject (e.g. pg_attribute)
    # If these catnames have oids, they are known to be inconsistent
    if catname in TableMainColumn:
        mainColumn = TableMainColumn[catname]
        gpColName = mainColumn[0]
        gpObjName = mainColumn[1]
    elif resultHasOid:
        gpColName = 'oid'
        gpObjName = catname
    else:
        gpColName = None
        gpObjName = catname

    for entryRows in rowsByEntry.values():
        firstRow = entryRows[0]
        pkeys = dict((n, firstRow[colname.index(n)]) for n in pknames)

        if gpColName is None:
            oid = getOidFromPK(catname, pkeys)
        else:
            oid = firstRow[colname.index(gpColName)]
        issue = CatInconsistentIssue(catname, colname, list(entryRows))

        # Special case: constraint for domain, report pg_type.contypid
        rowObjName = gpObjName
        if catname == 'pg_constraint' and oid == 0:
            oid = firstRow[colname.index('contypid')]
            rowObjName = 'pg_type'

        getGPObject(oid, rowObjName).addInconsistentIssue(issue)


# -------------------------------------------------------------------------------
def processForeignKeyResult(catname, pkcatname, colname, allValues):
    '''
    Turn the rows of a foreign key check into CatForeignKeyIssue objects
    and attach each to the owning GPObject.

    catname:   catalog table holding the foreign key
    pkcatname: catalog table being referenced
    colname / allValues: result columns and rows, e.g.

    colname:    pg_attribute_attrelid | pg_attribute_attname | missing_catalog | present_key | pg_class_oid | segids
    allValues:  None                  | None                 | pg_attribute    | oid         | 190683       | {-1}
    or
    allValues:  12344                 | foo                  | pg_class        | attrelid    | 190683       | {-1}
    '''

    # Name of the referenced column: text after the last '_' of the
    # second-to-last result column
    gpColName = colname[-2].rsplit('_', 1)[1]
    # The foreign keys are all entries except the last 4
    # NOTE: We can't assume the first few rows are the expected issues anymore...
    fkeylist = [name.rsplit('_', 1)[1] for name in colname[:-4]]

    # This catname may not be a GPObject (e.g. pg_attribute)
    #    1. pg_attrdef(adrelid) referencing pg_attribute(attrelid)
    #    2. pg_rewrite(ev_class) referencing pg_attribute(attrelid)
    # NOTE(review): membership is tested on pkcatname but the lookup is
    # done with catname -- confirm this is intended.
    gpObjName = pkcatname
    if pkcatname in TableMainColumn and gpColName == TableMainColumn[catname][0]:
        gpObjName = TableMainColumn[catname][1]

    oidIndex = len(colname) - 2

    # Process each row
    for row in allValues:
        segids = row[-1]
        oid = row[oidIndex]
        missing_cat = row[-4]

        # Build fkeys dict: used to find oid of pg_class
        fkeys = dict((n, row[colname.index(catname + '_' + n)]) for n in fkeylist)

        # Must get third column from end since fkeylist length can vary
        if not fkeys:
            fkeys = row[-3]

        # We are flipping the values of the primary key and the foreign key
        # depending on the missing catalog, since we want to print out the
        # missing catalog at the front of the stderr
        if missing_cat == pkcatname:
            existing_cat = catname
        else:
            existing_cat = pkcatname

        if catname == missing_cat:
            missing_pkeys = fkeys
            existing_fkeys = {gpColName: oid}
        else:
            missing_pkeys = {gpColName: oid}
            existing_fkeys = fkeys

        issue = CatForeignKeyIssue(catname=missing_cat, pkeys=missing_pkeys,
                                   segids=segids, fcatname=existing_cat, fkeys=existing_fkeys)

        # Special cases:
        #    1. pg_class(oid) referencing pg_type(oid) - relation & composite type
        #    2. pg_resqueuecapability(restypid) referencing pg_resourcetype(restypid)
        rowObjName = gpObjName
        if pkcatname == 'pg_type' and catname == 'pg_class':
            rowObjName = 'pg_class'
            oid = getOidFromPK(rowObjName, fkeys)
        elif pkcatname == 'pg_resourcetype' and gpColName == 'restypid':
            oid = getResourceTypeOid(oid)

        getGPObject(oid, rowObjName).addForeignKeyIssue(issue)

def getPrimaryKeyColumn(catname, pknames):
    """
    Return (object catalog name, key column name) used to report <catname>.

    Catalogs listed in TableMainColumn use the column and catalog recorded
    there.  Otherwise the catalog itself is used, with 'oid' as the column
    when it is one of pknames and None when the key has to be looked up
    (pg_pltemplate).
    """
    if catname in TableMainColumn:
        mainColumn = TableMainColumn[catname]
        return mainColumn[1], mainColumn[0]
    if 'oid' in pknames:
        return catname, 'oid'
    return catname, None

# -------------------------------------------------------------------------------
def processACLResult(catname, colname, allValues):
    '''
    Turn the rows of an ACL check into CatACLIssue objects and attach
    each to the owning GPObject.

    colname:    segid | datname | master_acl | segment_acl
    allValues:      2 | acldb | {=Tc/nmiller,nmiller=CTc/nmiller,person1=C/nmiller} | {=Tc/nmiller,nmiller=CTc/nmiller}
                    1 | gptest | None | {=Tc/nmiller,nmiller=CTc/nmiller,person2=CTc/nmiller}
    '''

    def aclItems(aclText):
        # Strip the surrounding braces and split on commas; no ACL -> no items
        if aclText is None:
            return []
        return aclText[1:-1].split(',')

    # Key columns sit between the leading segid and the two trailing ACL columns
    pknames = list(colname[1:-2])

    gpObjName, gpColName = getPrimaryKeyColumn(catname, pknames)

    # Process entries
    for row in allValues:
        segid = row[0]
        pkeys = dict((pk, row[colname.index(pk)]) for pk in pknames)

        if gpColName is None:
            oid = getOidFromPK(catname, pkeys)
            pkeys = {'oid': oid}
        else:
            oid = row[colname.index(gpColName)]

        macl = aclItems(row[-2])
        sacl = aclItems(row[-1])
        macl_only = list(set(macl).difference(sacl))
        sacl_only = list(set(sacl).difference(macl))

        issue = CatACLIssue(catname, pkeys, segid, macl_only, sacl_only)
        getGPObject(oid, gpObjName).addACLIssue(issue)


# -------------------------------------------------------------------------------
class CatInconsistentIssue:
    """A catalog entry whose column values differ between instances."""

    def __init__(self, catname, columns, rows):
        self.catname = catname
        self.columns = columns  # result column names, segids column last
        self.rows = rows  # the differing rows found for this entry

    def report(self):
        """Print, for each column that differs, its value per group of segments."""

        def describeSegments(segids):
            if segids == '{NULL}':
                return 'all other segments'
            ids = [int(i) for i in segids[1:-1].split(',')]
            pieces = []
            for segid in ids:
                cfg = GV.report_cfg[segid]
                pieces.append('%s (%s:%d) ' % (cfg['segname'], cfg['hostname'], cfg['port']))
            return ''.join(pieces)

        # The last column holds the segids and is never compared
        for idx, column in enumerate(self.columns[:-1]):
            distinctValues = set(row[idx] for row in self.rows)
            if len(distinctValues) <= 1:
                continue
            for row in self.rows:
                myprint("%20s is '%s' on %s" % (column, row[idx], describeSegments(row[-1])))


# -------------------------------------------------------------------------------
class CatForeignKeyIssue:
    """A row of one catalog that has no matching row in a related catalog."""

    def __init__(self, catname, pkeys, segids, fcatname, fkeys):
        self.catname = catname
        self.pkeys = pkeys  # string of pkey=xxxx or oid=xxxx
        self.segids = segids  # list of affected segids
        self.fcatname = fcatname  # checked object catname
        self.fkeys = fkeys  # string of fkeys=xxxx

    def report(self):
        """Print which entry is missing, for which existing entry, and where."""
        ids = [int(i) for i in self.segids[1:-1].split(',')]
        if len(ids) == len(GV.report_cfg):
            idstr = 'all segments'
        else:
            pieces = []
            for segid in ids:
                cfg = GV.report_cfg[segid]
                pieces.append('%s (%s:%d) ' % (cfg['segname'], cfg['hostname'], cfg['port']))
            idstr = ''.join(pieces)

        myprint("        No %s %s entry for %s %s on %s"
                % (self.catname, self.pkeys, self.fcatname, self.fkeys, idstr))


# -------------------------------------------------------------------------------
class CatMissingIssue:
    """A catalog entry that is missing from, or extra on, some instances."""

    def __init__(self, catname, pkeys, segids, type):
        self.catname = catname
        self.pkeys = pkeys  # string of pkey=xxxx or oid=xxxx
        self.segids = segids  # list of affected segids
        self.type = type  # missing or extra
        assert (self.type in ['missing', 'extra'])

    def report(self):
        """Print what is missing/extra and on which segments."""
        ids = [int(i) for i in self.segids[1:-1].split(',')]
        if len(ids) == len(GV.report_cfg):
            idstr = 'all segments'
        else:
            pieces = []
            for segid in ids:
                cfg = GV.report_cfg[segid]
                pieces.append('%s (%s:%d) ' % (cfg['segname'], cfg['hostname'], cfg['port']))
            idstr = ''.join(pieces)

        kind = self.type.capitalize()
        if self.catname == 'pg_attribute':
            message = "        %s column '%s' on %s" % (kind, self.pkeys['attname'], idstr)
        elif self.catname == 'pg_class':
            message = '        %s relation metadata for %s on %s' % (kind, str(self.pkeys), idstr)
        else:
            # catname[3:] drops the leading 'pg_' style prefix from the name
            message = '        %s %s metadata of %s on %s' \
                      % (kind, self.catname[3:], str(self.pkeys), idstr)
        myprint(message)


# -------------------------------------------------------------------------------
class CatDuplicateIssue:
    """A catalog entry that occurs more than once on some segments."""

    def __init__(self, catname, pkeys, segids, enum):
        self.catname = catname
        self.pkeys = pkeys  # string of pkey=xxxx or oid=xxxx
        self.segids = segids  # list of affected segids
        self.enum = enum  # number of entries

    def report(self):
        """ Found # catname entries of "oid=XXXXX" on segment {Y,Z} """
        details = (self.enum, self.catname, str(self.pkeys), str(self.segids))
        myprint('       Found %d %s entries of %s on segment %s' % details)


# -------------------------------------------------------------------------------
class CatACLIssue:
    """An ACL difference between the master and one segment."""

    def __init__(self, catname, pkeys, segid, macl_only, sacl_only):
        self.catname = catname
        self.pkeys = pkeys  # string of pkey=xxxx or oid=xxxx
        self.segid = segid  # the affected segid
        self.macl_only = macl_only  # acl appears only on master
        self.sacl_only = sacl_only  # acl appears only on segment

    def report(self):
        """ Master (host:port) and seg# (host:port) have different ACL:
                Exist(s) on master only: [...........]
                Exist(s) on   seg# only: [...........]
        """
        masterCfg = GV.report_cfg[-1]
        mstr = 'Master (%s:%s)' % (masterCfg['hostname'], masterCfg['port'])
        segCfg = GV.report_cfg[self.segid]
        sstr = '%s (%s:%s)' % (segCfg['segname'], segCfg['hostname'], segCfg['port'])

        myprint('        %s and %s have different ACLs:' % (mstr, sstr))
        if self.macl_only:
            myprint('            Exist(s) on master only: %s' % (self.macl_only))
        if self.sacl_only:
            myprint('            Exist(s) on %s only: %s ' % (segCfg['segname'], self.sacl_only))

# -------------------------------------------------------------------------------
class CatUniqueIndexViolationIssue:
    """A table whose unique index is violated."""

    def __init__(self, table_name, index_name):
        self.catname = table_name
        self.index_name = index_name

    def report(self):
        """Print the table and the violated unique index."""
        message = '    Table %s has a violated unique index: %s' % (self.catname, self.index_name)
        myprint(message)

class CatDependencyIssue:
    """A dependency problem found for an oid of a table on one content."""

    def __init__(self, table_name, oid, content):
        self.catname = table_name
        self.oid = oid
        self.content = content

    def report(self):
        """Print the table, oid and content the dependency issue was found on."""
        details = (self.catname, self.oid, self.content)
        myprint('    Table %s has a dependency issue on oid %s at content %s' % details)

# -------------------------------------------------------------------------------
class CatOrphanToastTableIssue:
    """An orphaned toast table problem found on a set of segments."""

    def __init__(self, oid, catname, issue, segments):
        self.oid = oid
        self.catname = catname
        self.issue = issue  # provides .description and .cause for the report
        self.segments = segments

    def report(self):
        """Print the issue description, then the affected segments, table and cause."""
        myprint('%s' % self.issue.description)
        segmentList = ', '.join(str(seg) for seg in sorted(self.segments))
        myprint('''On segment(s) %s table '%s' (oid: %s) %s'''
                % (segmentList, self.catname, self.oid, self.issue.cause))

# -------------------------------------------------------------------------------
class GPObject:
    """
    A catalog object, identified by (oid, catname), together with every
    issue the checks found for it.

    Issues are kept per kind in dictionaries keyed by the catname of the
    issue; each value is the list of issues found for that catalog.
    """

    def __init__(self, oid, catname):
        self.oid = oid
        self.catname = catname
        self.missingIssues = {}  # key=issue.catname, value=list of catMissingIssue
        self.inconsistentIssues = {}  # key=issue.catname, value=list of catInconsistentIssue
        self.duplicateIssues = {}  # key=issue.catname, value=list of catDuplicateIssue
        self.aclIssues = {}  # key=issue.catname, value=list of catACLIssue
        self.foreignkeyIssues = {}  # key=issue.catname, value=list of catForeignKeyIssue
        self.dependencyIssues = {} # key=issue.catname, value=list of catDependencyIssue
        self.orphanToastTableIssues = {} # key=issue.catname, value=list of orphanToastTableIssues

    def _addIssue(self, issuesByCatalog, issue):
        # Append issue to the list stored under its catalog name
        issuesByCatalog.setdefault(issue.catname, []).append(issue)

    def addDependencyIssue(self, issue):
        self._addIssue(self.dependencyIssues, issue)

    def addMissingIssue(self, issue):
        self._addIssue(self.missingIssues, issue)

    def addInconsistentIssue(self, issue):
        self._addIssue(self.inconsistentIssues, issue)

    def addDuplicateIssue(self, issue):
        self._addIssue(self.duplicateIssues, issue)

    def addACLIssue(self, issue):
        self._addIssue(self.aclIssues, issue)

    def addForeignKeyIssue(self, issue):
        self._addIssue(self.foreignkeyIssues, issue)

    def addOrphanToastTableIssue(self, issue):
        self._addIssue(self.orphanToastTableIssues, issue)

    def isTopLevel(self):
        return True

    def reportAllIssues(self):
        """
        Print every issue recorded for this object, then remember the
        tables that have missing/extra entries in GV for later reporting.

        We want to prevent oids from certain catalog tables to appear on
        stdout. Hence we create a separate function for myprint which
        will log all the oid inconsistencies as WARNING instead of
        CRITICAL in order to log it in the log file.
        """
        global myprint

        def printForThisObject(string):
            if self.catname in ['pg_constraint']:
                _myprint(string, logging.WARNING)
            else:
                _myprint(string)

        def reportSection(issuesByCatalog, testPrefix):
            # Print one "<testPrefix>_<catalog>" header per catalog followed
            # by its issues, and a blank line after a non-empty section.
            if not issuesByCatalog:
                return
            for catname, issues in issuesByCatalog.items():
                myprint('    Name of test which found this issue: %s_%s' % (testPrefix, catname))
                for each in issues:
                    each.report()
            myprint('')

        myprint = printForThisObject
        try:
            # RelationObject prints its own header before delegating here
            if self.__class__ == GPObject:
                myprint('')
                myprint('----------------------------------------------------')
                myprint('Object oid: %s' % (self.oid))
                myprint('Table name: %s' % (self.catname))
                myprint('')

            # Report dependency issues
            reportSection(self.dependencyIssues, 'dependency')

            # Report inconsistent issues
            reportSection(self.inconsistentIssues, 'inconsistent')

            # Report missing issues
            if len(self.missingIssues):
                omitlist = ['pg_attribute', 'pg_attribute_encoding', 'pg_type', 'pg_appendonly', 'pg_index']
                if 'pg_class' in self.missingIssues:
                    # For the catalogs in omitlist only the test name is
                    # printed; their individual issues are not reported
                    # next to the pg_class ones.
                    myprint('    Name of test which found this issue: missing_extraneous_pg_class')
                    for name in omitlist:
                        if name in self.missingIssues:
                            myprint('    Name of test which found this issue: missing_extraneous_%s' % name)
                    for each in self.missingIssues['pg_class']:
                        each.report()

                    for catname, issues in self.missingIssues.items():
                        if catname != 'pg_class' and catname not in omitlist:
                            myprint('    Name of test which found this issue: missing_extraneous_%s' % catname)
                            for each in issues:
                                each.report()
                else:
                    for catname, issues in self.missingIssues.items():
                        myprint('    Name of test which found this issue: missing_extraneous_%s' % catname)
                        for each in issues:
                            each.report()
                myprint('')

            # Report foreign key issues
            reportSection(self.foreignkeyIssues, 'foreign_key')

            # Report duplicate issues
            reportSection(self.duplicateIssues, 'duplicate')

            # Report ACL issues
            reportSection(self.aclIssues, 'acl')

            # Report Orphan Toast Table issues (no test name header)
            if len(self.orphanToastTableIssues):
                for issues in self.orphanToastTableIssues.values():
                    for each in issues:
                        each.report()
                myprint('')
        finally:
            # Always restore the regular printer, even when a report() call
            # raised, so later output is not routed through this object.
            myprint = _myprint

        # Collect all tables with missing issues for later reporting
        if len(self.missingIssues):
            db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
            oid_query = "select (select nspname from pg_namespace where oid=relnamespace) || '.' || relname from pg_class where oid=%d"
            type_query = "select (select nspname from pg_namespace where oid=relnamespace) || '.' || relname from pg_class where reltype=%d"
            for issues in self.missingIssues.values():
                for issue in issues:
                    # Get schemaname.tablename corresponding to oid
                    for key in issue.pkeys:
                        if 'relid' in key or key in ['ev_class', 'reloid']:
                            query = oid_query
                        elif key == 'oid':
                            query = type_query
                        else:
                            continue
                        table_list = db.query(query % issue.pkeys[key]).getresult()
                        if table_list:
                            if issue.type == 'missing':
                                collected = GV.missing_attr_tables
                            else:
                                collected = GV.extra_attr_tables
                            collected.append((table_list[0][0], issue.segids))

    def __cmp__(self, other):
        if isinstance(other, GPObject):
            return cmp((self.oid, self.catname), (other.oid, other.catname))
        else:
            return NotImplemented

    def __hash__(self):
        return hash((self.oid, self.catname))


# -------------------------------------------------------------------------------
class RelationObject(GPObject):
    """
    GPObject for a pg_class entry; additionally carries the relation name,
    schema, relkind and the oid of its parent relation.
    """

    def __init__(self, oid, catname):
        GPObject.__init__(self, oid, catname)
        # Filled in later through setRelInfo()
        self.relname = None
        self.nspname = None
        self.relkind = None
        self.paroid = None

    def reportAllIssues(self):
        """Print the relation header, then all issues of this object."""
        oid = str(self.oid) if self.oid is not None else 'N/A'
        nspname = self.nspname if self.nspname is not None else 'N/A'
        relname = self.relname if self.relname is not None else 'N/A'

        if not self.isTopLevel():
            myprint('    Sub-object: ')
            myprint('    ----------------------------------------------------')
            myprint('    Relation oid: %s' % oid)
            myprint('    Relation schema: %s' % nspname)
            myprint('    Relation name: %s' % relname)
        else:
            myprint('')
            myprint('----------------------------------------------------')

            # relkind 'c' is labelled as a type rather than a relation
            if self.relkind == 'c':
                objname = 'Type'
            else:
                objname = 'Relation'
            myprint('%s oid: %s' % (objname, oid))
            myprint('%s schema: %s' % (objname, nspname))
            myprint('%s name: %s' % (objname, relname))

        myprint('')
        GPObject.reportAllIssues(self)

    def setRelInfo(self, relname, nspname, relkind, paroid):
        """Record the name, schema, relkind and parent oid of the relation."""
        self.relname = relname
        self.nspname = nspname
        self.relkind = relkind
        self.paroid = paroid

    def isTopLevel(self):
        """Relations of relkind 'i', 't' or 'o' are sub-objects, all others are top level."""
        return self.relkind not in ('i', 't', 'o')


# -------------------------------------------------------------------------------
def getRelInfo(objects):
    """
    Fill in relname, nspname, relkind and parent oid for the pg_class
    objects held in <objects>.

    objects: dict keyed by (oid, catname); every row returned by the
             query is applied to objects[oid, 'pg_class'] via setRelInfo().

    Returns without querying when objects is empty, holds no 'pg_class'
    entries, or any 'pg_class' oid is None.  Any exception raised while
    connecting, querying or applying the results is caught: ERROR_NOREPAIR
    is flagged and the error and the query are printed.
    """
    # Get relinfo: relname, relkind, par oid for all oids
    #   get relname, relkind
    #   left outer join with union of
    #    - get par oid for toast
    #    - get par oid for aoco
    #    - get par oid for index
    # for each query: if there is inconsistency, majority wins

    if objects == {}: return

    # Only pg_class objects have relation info to look up
    oids = [oid for (oid, catname) in objects if catname == 'pg_class']
    # A None oid cannot be rendered into the "in (...)" lists below
    if oids == [] or None in oids: return

    # Each sub-select combines the master copy of a catalog with the
    # gp_dist_random() copy and keeps the rank=1 (most frequent) row.
    qry = """
          SELECT oid, relname, nspname, relkind, paroid
          FROM (
             SELECT oid, relname, nspname, relkind
             FROM (
               SELECT oid, relname, nspname, relkind, rank() over (partition by oid order by count(*) desc)
               FROM
               (
                 SELECT c.oid, c.relname, c.relkind, n.nspname
                 FROM   pg_class c
                        left outer join pg_namespace n
                        on (c.relnamespace = n.oid)
                 WHERE  c.oid in ({oids})
                 UNION ALL
                 SELECT c.oid, c.relname, c.relkind, n.nspname
                 FROM   gp_dist_random('pg_class') c
                        left outer join gp_dist_random('pg_namespace') n
                        on (c.relnamespace = n.oid and c.gp_segment_id = n.gp_segment_id)
                 WHERE  c.oid in ({oids})
               ) allrelinfo
               GROUP BY oid, relname, nspname, relkind
             ) relinfo
             WHERE rank=1
         ) relinfo

         LEFT OUTER JOIN

         (
            SELECT reltoastrelid as childoid, oid as paroid
            FROM (
              SELECT reltoastrelid, oid, rank() over (partition by reltoastrelid order by count(*) desc)
                FROM
                  ( SELECT reltoastrelid, oid FROM pg_class
                    WHERE reltoastrelid in ({oids})
                    UNION ALL
                    SELECT reltoastrelid, oid FROM gp_dist_random('pg_class')
                    WHERE reltoastrelid in ({oids})
                  ) allpar_toast
               GROUP BY reltoastrelid, oid
            ) par_toast
            WHERE rank=1

            UNION ALL

            SELECT segrelid as childoid, relid as paroid
            FROM (
               SELECT segrelid, relid, rank() over (partition by segrelid order by count(*) desc)
                FROM
                 ( SELECT segrelid, relid FROM pg_appendonly
                   WHERE segrelid in ({oids})
                   UNION ALL
                   SELECT segrelid, relid FROM gp_dist_random('pg_appendonly')
                   WHERE segrelid in ({oids})
                 ) allpar_aoco
               GROUP BY segrelid, relid
            ) par_aoco
            WHERE rank=1

            UNION ALL

            SELECT indexrelid as childoid, indrelid as paroid
            FROM (
               SELECT indexrelid, indrelid, rank() over (partition by indexrelid order by count(*) desc)
               FROM
                ( SELECT indexrelid, indrelid FROM pg_index
                   WHERE  indexrelid in ({oids})
                   UNION ALL
                  SELECT indexrelid, indrelid FROM gp_dist_random('pg_index')
                   WHERE  indexrelid in ({oids})
                ) allpar_index
               GROUP BY indexrelid, indrelid
            ) par_index
            WHERE rank=1
          ) par ON childoid = oid
          """.format(oids=','.join(map(str, oids)))

    try:
        db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
        curs = db.query(qry)
        for row in curs.getresult():
            (oid, relname, nspname, relkind, paroid) = row
            # paroid comes from the LEFT OUTER JOIN, so it is NULL/None for
            # relations that matched none of the toast/aoco/index lookups
            objects[oid, 'pg_class'].setRelInfo(relname, nspname, relkind, paroid)

    except Exception, e:
        setError(ERROR_NOREPAIR)
        myprint('  Execution error: ' + str(e))
        myprint(qry)


# -------------------------------------------------------------------------------
def buildGraph():
    """Fill GPObjectGraph with parent -> [children] edges for all of GPObjects.

    Objects are processed one generation at a time.  For every generation,
    getRelInfo() is called on the batch, top-level objects become graph keys
    with an (initially) empty child list, and every other object is attached
    to its parent.  Parents that are not yet known are created as
    RelationObject instances and form the next generation, until a
    generation produces no new parents.
    """
    graph = GPObjectGraph
    pending = GPObjects

    while pending:
        getRelInfo(pending)
        nextGeneration = {}

        for (oid, catname), obj in pending.iteritems():
            if obj.isTopLevel():
                # Root of a tree: make sure it is present as a key.
                graph.setdefault(obj, [])
                continue

            # Look the parent up in the current batch first, then among the
            # parents discovered during this pass; create it as a last resort.
            parkey = (obj.paroid, catname)
            parobj = pending.get(parkey, None)
            if parobj is None:
                parobj = nextGeneration.get(parkey, None)
            if parobj is None:
                parobj = RelationObject(obj.paroid, catname)
                nextGeneration[parkey] = parobj

            children = graph.setdefault(parobj, [])
            if obj not in children:
                children.append(obj)

        pending = nextGeneration


# -------------------------------------------------------------------------------
def _expandPartitionLeaves(db, tables, parent_tables):
    """Return `tables` plus the leaf partitions of each partitioned parent in it.

    db            -- open connection used to query pg_partitions
    tables        -- iterable of (schema-qualified table name, segids) tuples
    parent_tables -- collection of schema-qualified names that appear as
                     parent tables in pg_partitions

    A leaf is a partition at the deepest partitionlevel of its parent, and it
    inherits the parent's segids.  The result is a de-duplicated list in no
    particular order.
    """
    # The filter names exactly one parent; without it a single affected parent
    # would drag the leaves of every partitioned table into the report.
    leaves_sql = """
            SELECT x.partitionschemaname || '.' || x.partitiontablename
            FROM (
                 SELECT distinct schemaname, tablename, partitionschemaname, partitiontablename, partitionlevel
                 FROM pg_partitions
                 WHERE schemaname || '.' || tablename in ('%s')
                 ) as X,
            (SELECT schemaname, tablename maxtable, max(partitionlevel) maxlevel
             FROM pg_partitions
             group by (tablename, schemaname)
            ) as Y
            WHERE x.schemaname = y.schemaname and x.tablename = Y.maxtable and x.partitionlevel = Y.maxlevel;
            """
    # Iterate over a snapshot so that adding leaves never feeds the loop.
    snapshot = list(set(tables))
    expanded = set(snapshot)
    for table, segids in snapshot:
        if table not in parent_tables:
            continue
        for leaf in db.query(leaves_sql % pg.escape_string(table)).getresult():
            expanded.add((leaf[0], segids))
    return list(expanded)


def _printAttrTables(title, tables):
    """Print one 'Table db.schema.table.segid' line per table and segment id.

    title  -- heading line printed above the list
    tables -- iterable of (schema-qualified table name, segids) tuples, where
              segids is a delimited string such as '{0,1}'; its first and
              last characters are stripped before splitting on ','
    """
    myprint('----------------------------------------------------')
    myprint(title)
    myprint("        Format [database name].[schema name].[table name].[segment id]:")
    for table, segids in sorted(tables):
        for segid in segids[1:-1].split(','):
            myprint("        Table %s.%s.%s" % (GV.dbname, table, segid))
    myprint('')


def checkcatReport():
    """Print the summary report for the database that was just checked.

    Builds the object graph, prints the run header, the number of top-level
    objects with issues, every issue (parents before children), the tables
    with missing/extra attribute entries, and finally the failed checks that
    this report does not cover.
    """
    def reportAllIssuesRecursive(par, graph):
        # Depth-first: report the parent, then its children in sorted order.
        par.reportAllIssues()
        if par not in graph:
            return
        for child in sorted(graph[par]):
            reportAllIssuesRecursive(child, graph)

    buildGraph()
    reportedCheck = ['duplicate','missing_extraneous','inconsistent','foreign_key','acl', 'orphaned_toast_tables']
    myprint('')
    myprint('SUMMARY REPORT')
    myprint('===================================================================')
    elapsed = datetime.timedelta(seconds=int(GV.elapsedTime))
    endTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    myprint('Completed %d test(s) on database \'%s\' at %s with elapsed time %s' \
            % (GV.totalCheckRun, GV.dbname, endTime, elapsed))

    if len(GPObjectGraph) == 0 and len(GV.failedChecks) == 0:
        myprint('Found no catalog issue\n')
        return

    if len(GPObjectGraph) > 0:
        # pg_constraint entries are reported as warnings and do not count
        # towards the issue total.  Track them explicitly instead of looking
        # at whichever object the loop happened to visit last.
        total = 0
        hasConstraintWarning = False
        for par in GPObjectGraph:
            if par.catname == 'pg_constraint':
                hasConstraintWarning = True
            elif par.isTopLevel():
                total += 1

        if total == 0:
            myprint('Found no catalog issue\n')
            if hasConstraintWarning:
                myprint('Warnings were generated, see %s for detail\n' % gplog.get_logfile())
        else:
            myprint('Found a total of %d issue(s)' % total)

        for par in sorted(GPObjectGraph):
            if par.isTopLevel():
                reportAllIssuesRecursive(par, GPObjectGraph)
        myprint('')

        # Report tables with missing attributes in a more usable format
        if len(GV.missing_attr_tables) or len(GV.extra_attr_tables):
            # Expand partition tables
            db = connect2(GV.cfg[GV.master_dbid], utilityMode=False)
            parent_rows = db.query("SELECT DISTINCT (schemaname || '.' || tablename) FROM pg_partitions").getresult()
            parent_tables = set(row[0] for row in parent_rows)

            GV.missing_attr_tables = _expandPartitionLeaves(db, GV.missing_attr_tables, parent_tables)
            if len(GV.missing_attr_tables) > 0:
                _printAttrTables("    Tables with missing issues:", GV.missing_attr_tables)

            GV.extra_attr_tables = _expandPartitionLeaves(db, GV.extra_attr_tables, parent_tables)
            if len(GV.extra_attr_tables) > 0:
                _printAttrTables("    Tables with extra issues:", GV.extra_attr_tables)
            myprint('')

    notReported = set(GV.failedChecks).difference(reportedCheck)
    if len(notReported) > 0:
        myprint('Failed test(s) that are not reported here: %s' % (', '.join(notReported)))
        myprint('See %s for detail\n' % gplog.get_logfile())


def _myprint(str, level=logging.CRITICAL):
    """Pass `str` to gplog.log_literal on the module logger at `level`.

    `level` defaults to logging.CRITICAL.  The parameter is named `str`
    (shadowing the builtin) because callers may pass it by keyword.
    """
    message = str
    gplog.log_literal(logger, level, message)


# Output hook used by the functions in this file; bound to _myprint here.
myprint = _myprint

def generateVerifyFile(catname, fields, results, checkname):
    """Write a SQL file that helps verify the rows flagged by a check.

    catname   -- catalog table name, used in the file name and warning text
    fields    -- column names of `results`; must contain 'oid'
    results   -- result rows; the 'oid' column of each row feeds the IN list
    checkname -- name of the check, used in the file name

    The generated query selects relname/oid from pg_class on the master and
    on all segments (gp_dist_random) where reltype is in the collected oid
    list.  The file is named
    gpcheckcat.verify.<dbname>.<catname>.<checkname>.<timestamp>.sql and is
    opened with a relative path.  Failure to write it is logged as a warning
    and otherwise ignored.
    """
    in_clause = ''
    if results:
        # Resolve the column position once and let join() build the list,
        # rather than re-scanning `fields` and re-copying the string per row.
        oid_idx = fields.index('oid')
        in_clause = ','.join(str(row[oid_idx]) for row in results)

    verify_sql = '''
          SELECT *
          FROM (
               SELECT relname, oid FROM pg_class WHERE reltype IN ({in_clause})
               UNION ALL
               SELECT relname, oid FROM gp_dist_random('pg_class') WHERE reltype IN ({in_clause})
          ) alltyprelids
          GROUP BY relname, oid ORDER BY count(*) desc
    '''.format(in_clause=in_clause)
    filename = 'gpcheckcat.verify.%s.%s.%s.%s.sql' % (GV.dbname, catname, checkname, GV.timestamp)
    try:
        with open(filename, 'w') as fp:
            fp.write(verify_sql + '\n')
    except Exception as e:
        # Best effort: the verify file is a convenience, not part of the check.
        logger.warning('Unable to generate verify file for %s (%s)' % (catname, str(e)))

def truncate_batch_size(primaries):
    """Cap the '-B' option at `primaries` and announce it when capping occurs.

    Nothing happens when the configured '-B' value is not greater than
    `primaries`.
    """
    requested = GV.opt['-B']
    if not (requested > primaries):
        return
    GV.opt['-B'] = primaries
    myprint("Truncated batch size to number of primaries: %d" % primaries)

def check_gpexpand():
    """Exit with status 1 if conflict_with_gpexpand() refuses gpcheckcat.

    The conflict check is asked to refuse during phase 1 and to allow
    phase 2.  On refusal the returned message is printed before exiting.
    """
    allowed, reason = conflict_with_gpexpand("gpcheckcat",
                                             refuse_phase1=True,
                                             refuse_phase2=False)
    if allowed:
        return
    myprint(reason)
    sys.exit(1)

def check_test_subset_parameter_count():
    """Exit with an error when more than one of -R, -s and -C was given.

    The three test-subset options are mutually exclusive.  On violation an
    error is printed, ERROR_NOREPAIR is recorded and the process exits with
    GV.retcode.
    """
    selected = [flag for flag in ('-R', '-s', '-C') if GV.opt[flag]]
    if len(selected) <= 1:
        return
    myprint("Error: multiple test subset options are selected. Please pass only one of [-R, -s, -C] if necessary.\n")
    setError(ERROR_NOREPAIR)
    sys.exit(GV.retcode)

def check_test_names_parsed(tests):
    """Return the names in `tests` that are keys of all_checks, in input order.

    Each name that is not a known check is reported with a message and left
    out of the result.
    """
    recognized = []
    for name in tests:
        if name in all_checks:
            recognized.append(name)
            continue
        myprint("'%s' is not a valid test" % name)
    return recognized


def main():
    """Entry point: parse options, validate the cluster, check each database.

    Steps, in order:
      * parse the command line; with -l just list the checks and exit
      * reject unsupported server versions and conflicting -R/-s/-C options
      * refuse to run when gpexpand reports a conflict
      * load the segment configuration and locate the master entry
      * for every database in GV.alldb: reset per-database state, drop
        leaked schemas, run the selected checks and print the report

    Always terminates through sys.exit(); the status is GV.retcode except
    where a helper exits on its own.
    """
    def splitNames(raw):
        # 'a, b ,c' -> ['a', 'b', 'c']
        return [name.strip() for name in raw.split(",")]

    parseCommandLine()
    if GV.opt['-l']:
        listAllChecks()
        sys.exit(GV.retcode)

    GV.version = getversion()
    # NOTE(review): this compares strings lexicographically, so a version
    # such as "10.0" would sort below "4.0" -- confirm getversion()'s format.
    if GV.version < "4.0":
        myprint("Error: only Greenplum database version >= 4.0 are supported\n")
        # Record the failure like the other fatal paths do, so the exit
        # status reflects the error that was just printed.
        setError(ERROR_NOREPAIR)
        sys.exit(GV.retcode)

    check_test_subset_parameter_count()

    # gpcheckcat should check gpexpand running status
    check_gpexpand()

    GV.cfg = getGPConfiguration()
    truncate_batch_size(len(GV.cfg))

    GV.report_cfg = getReportConfiguration()
    GV.max_content = max(GV.cfg[dbid]['content'] for dbid in GV.cfg)

    # The master is the entry whose content id is -1.
    for dbid in GV.cfg:
        if GV.cfg[dbid]['content'] == -1:
            GV.master_dbid = dbid
            break

    if GV.master_dbid is None:
        myprint("Error: master configuration info not found in gp_segment_configuration\n")
        setError(ERROR_NOREPAIR)
        sys.exit(GV.retcode)

    GV.catalog = getCatalog()
    leaked_schema_dropper = LeakedSchemaDropper()

    for dbname in GV.alldb:

        # Reset global variables
        GV.reset_stmt_queues()
        GPObjects.clear()
        GPObjectGraph.clear()
        GV.dbname = dbname
        myprint('')
        myprint("Connected as user \'%s\' to database '%s', port '%d', gpdb version '%s'" \
                % (GV.opt['-U'], GV.dbname, GV.report_cfg[-1]['port'], GV.version))
        myprint('-------------------------------------------------------------------')
        myprint('Batch size: %s' % GV.opt['-B'])

        drop_leaked_schemas(leaked_schema_dropper, dbname)

        if GV.opt['-C']:
            namestr = GV.opt['-C']
            try:
                catalog_table_obj = getCatObj(namestr)
                runCheckCatname(catalog_table_obj)
            except Exception as e:
                # Say why we are bailing out instead of exiting silently.
                myprint('  Execution error: ' + str(e))
                setError(ERROR_NOREPAIR)
                sys.exit(GV.retcode)
        else:
            if GV.opt['-R']:
                # Run only the named tests.
                run_tests = check_test_names_parsed(splitNames(GV.opt['-R']))
            elif GV.opt['-s']:
                # Run everything except the named tests.
                skip_tests = check_test_names_parsed(splitNames(GV.opt['-s']))
                run_tests = [x for x in all_checks if x not in skip_tests]
            else:
                run_tests = list(all_checks)

            runAllChecks(run_tests)

        checkcatReport()

        # skip shared tables on subsequent passes
        if not GV.opt['-S']:
            GV.opt['-S'] = "none"

    sys.exit(GV.retcode)

#############
# Run the checker only when this file is executed as a script; main() always
# terminates the process through sys.exit().
if __name__ == '__main__':
    main()
