As described on -hackers this is my work so far adding ADD/DROP INHERITS. It
implements the controversial "ALTER TABLE <table> ADD/DROP INHERITS <parent>"
syntax that requires making INHERITS a reserved keyword. I haven't seen a
clear consensus yet on what the best syntax to use here would be.

Also, it doesn't handle default column values yet.

Other than that I think it's complete. There are a number of things I'm not
completely certain I'm on the right track with though so it can certainly use
some more eyeballs on it.



Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.184
diff -u -p -c -r1.184 tablecmds.c
cvs diff: conflicting specifications of output style
*** src/backend/commands/tablecmds.c    10 May 2006 23:18:39 -0000      1.184
--- src/backend/commands/tablecmds.c    7 Jun 2006 18:09:56 -0000
*************** typedef struct NewColumnValue
*** 159,166 ****
--- 159,168 ----
  static void truncate_check_rel(Relation rel);
  static List *MergeAttributes(List *schema, List *supers, bool istemp,
                                List **supOids, List **supconstr, int 
*supOidCount);
+ static void MergeAttributesIntoExisting(Relation rel, Relation relation);
  static bool change_varattnos_of_a_node(Node *node, const AttrNumber 
*newattno);
  static void StoreCatalogInheritance(Oid relationId, List *supers);
+ static void StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 
seqNumber, Relation catalogRelation);
  static int    findAttrByName(const char *attributeName, List *schema);
  static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
  static bool needs_toast_table(Relation rel);
*************** static void ATPrepSetTableSpace(AlteredT
*** 246,251 ****
--- 248,255 ----
  static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
  static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
                                                   bool enable, bool 
skip_system);
+ static void ATExecAddInherits(Relation rel, RangeVar *parent);
+ static void ATExecDropInherits(Relation rel, RangeVar *parent);
  static void copy_relation_data(Relation rel, SMgrRelation dst);
  static void update_ri_trigger_args(Oid relid,
                                           const char *oldname,
*************** static void
*** 1156,1165 ****
  StoreCatalogInheritance(Oid relationId, List *supers)
  {
        Relation        relation;
-       TupleDesc       desc;
        int16           seqNumber;
        ListCell   *entry;
-       HeapTuple       tuple;
  
        /*
         * sanity checks
--- 1160,1167 ----
*************** StoreCatalogInheritance(Oid relationId, 
*** 1179,1194 ****
         * anymore, there's no need to look for indirect ancestors.)
         */
        relation = heap_open(InheritsRelationId, RowExclusiveLock);
-       desc = RelationGetDescr(relation);
  
        seqNumber = 1;
        foreach(entry, supers)
        {
!               Oid                     parentOid = lfirst_oid(entry);
                Datum           datum[Natts_pg_inherits];
                char            nullarr[Natts_pg_inherits];
                ObjectAddress childobject,
                                        parentobject;
  
                datum[0] = ObjectIdGetDatum(relationId);                /* 
inhrel */
                datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
--- 1181,1206 ----
         * anymore, there's no need to look for indirect ancestors.)
         */
        relation = heap_open(InheritsRelationId, RowExclusiveLock);
  
        seqNumber = 1;
        foreach(entry, supers)
        {
!               StoreCatalogInheritance1(relationId, lfirst_oid(entry), 
seqNumber, relation);
!               seqNumber += 1;
!       }
! 
!       heap_close(relation, RowExclusiveLock);
! }
! 
! static void
! StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, 
Relation relation) 
! {
                Datum           datum[Natts_pg_inherits];
                char            nullarr[Natts_pg_inherits];
                ObjectAddress childobject,
                                        parentobject;
+               HeapTuple       tuple;
+               TupleDesc desc = RelationGetDescr(relation);
  
                datum[0] = ObjectIdGetDatum(relationId);                /* 
inhrel */
                datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
*************** StoreCatalogInheritance(Oid relationId, 
*** 1222,1234 ****
                 * Mark the parent as having subclasses.
                 */
                setRelhassubclassInRelation(parentOid, true);
  
-               seqNumber += 1;
-       }
  
-       heap_close(relation, RowExclusiveLock);
  }
  
  /*
   * Look for an existing schema entry with the given name.
   *
--- 1234,1246 ----
                 * Mark the parent as having subclasses.
                 */
                setRelhassubclassInRelation(parentOid, true);
+       
  
  
  }
  
+ 
+ 
  /*
   * Look for an existing schema entry with the given name.
   *
*************** ATPrepCmd(List **wqueue, Relation rel, A
*** 2053,2063 ****
--- 2065,2078 ----
                case AT_DisableTrig:    /* DISABLE TRIGGER variants */
                case AT_DisableTrigAll:
                case AT_DisableTrigUser:
+               case AT_AddInherits:
+               case AT_DropInherits:
                        ATSimplePermissions(rel, false);
                        /* These commands never recurse */
                        /* No command-specific prep needed */
                        pass = AT_PASS_MISC;
                        break;
+ 
                default:                                /* oops */
                        elog(ERROR, "unrecognized alter table type: %d",
                                 (int) cmd->subtype);
*************** ATExecCmd(AlteredTableInfo *tab, Relatio
*** 2233,2238 ****
--- 2248,2259 ----
                case AT_DisableTrigUser:                /* DISABLE TRIGGER USER 
*/
                        ATExecEnableDisableTrigger(rel, NULL, false, true);
                        break;
+               case AT_DropInherits:
+                       ATExecDropInherits(rel, cmd->parent);
+                       break;
+               case AT_AddInherits:
+                       ATExecAddInherits(rel, cmd->parent);
+                       break;
                default:                                /* oops */
                        elog(ERROR, "unrecognized alter table type: %d",
                                 (int) cmd->subtype);
*************** ATExecEnableDisableTrigger(Relation rel,
*** 5880,5885 ****
--- 5901,6198 ----
        EnableDisableTrigger(rel, trigname, enable, skip_system);
  }
  
+ static void
+ ATExecAddInherits(Relation rel, RangeVar *parent)
+ {
+       Relation relation, catalogRelation;
+       SysScanDesc scan;
+       ScanKeyData key;
+       HeapTuple inheritsTuple;
+       int4 inhseqno = 0;
+       ListCell   *child;
+       List       *children;
+ 
+       relation = heap_openrv(parent, AccessShareLock); /* XXX is this enough 
locking? */
+       if (relation->rd_rel->relkind != RELKIND_RELATION)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("inherited relation \"%s\" is not a 
table",
+                                               parent->relname)));
+ 
+ 
+       /* Permanent rels cannot inherit from temporary ones */
+       if (!rel->rd_istemp && isTempNamespace(RelationGetNamespace(relation)))
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("cannot inherit from temporary relation 
\"%s\"",
+                                               parent->relname)));
+       
+       if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+                                          RelationGetRelationName(relation));
+ 
+       /* If parent has OIDs then all children must have OIDs */
+       if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("table \"%s\" without OIDs cannot 
inherit from table \"%s\" with OIDs",
+                                               RelationGetRelationName(rel), 
parent->relname)));
+ 
+       /*
+        * Reject duplications in the list of parents. -- this is the same 
check as
+        * when creating a table, but maybe we should check for the parent 
anywhere
+        * higher in the inheritance structure?
+        */
+       catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+       ScanKeyInit(&key,
+                               Anum_pg_inherits_inhrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetRelid(rel)));
+       scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, 
true, SnapshotNow, 1, &key);
+       while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+       {
+               Form_pg_inherits inh = (Form_pg_inherits) 
GETSTRUCT(inheritsTuple);
+               if (inh->inhparent == RelationGetRelid(relation))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DUPLICATE_TABLE),
+                                        errmsg("inherited relation \"%s\" 
duplicated",
+                                                       parent->relname)));
+               if (inh->inhseqno > inhseqno)
+                       inhseqno = inh->inhseqno;
+       }
+       systable_endscan(scan);
+       heap_close(catalogRelation, RowExclusiveLock);
+ 
+       /* Get children because we have to manually recurse and also because we
+        * have to check for recursive inheritance graphs */
+ 
+       /* this routine is actually in the planner */
+       children = find_all_inheritors(RelationGetRelid(rel));
+ 
+       if (list_member_oid(children, RelationGetRelid(relation)))
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_TABLE),
+                                errmsg("Circular inheritance structure 
found")));
+ 
+       foreach(child, children)
+               {
+                       Oid                     childrelid = lfirst_oid(child);
+                       Relation        childrel;
+ 
+                       childrel = relation_open(childrelid, 
AccessExclusiveLock);
+                       MergeAttributesIntoExisting(childrel, relation);
+                       relation_close(childrel, NoLock);
+               }
+       
+       catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+       StoreCatalogInheritance1(RelationGetRelid(rel), 
RelationGetRelid(relation), inhseqno+1, catalogRelation);
+       heap_close(catalogRelation, RowExclusiveLock);
+       
+       heap_close(relation, AccessShareLock);
+ }
+ 
+ 
+ static void
+ MergeAttributesIntoExisting(Relation rel, Relation relation) 
+ {
+       Relation        attrdesc;
+       AttrNumber      parent_attno, child_attno;
+       TupleDesc       tupleDesc;
+       TupleConstr *constr;
+       HeapTuple       tuple;
+ 
+       child_attno = RelationGetNumberOfAttributes(rel);
+ 
+       tupleDesc = RelationGetDescr(relation);
+       constr = tupleDesc->constr;
+ 
+       for (parent_attno = 1; parent_attno <= tupleDesc->natts;
+                parent_attno++)
+       {
+               Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 
1];
+               char       *attributeName = NameStr(attribute->attname);
+ 
+               /* Ignore dropped columns in the parent. */
+               if (attribute->attisdropped)
+                       continue;
+ 
+               /* Does it conflict with an existing column? */
+               attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
+ 
+               tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), 
attributeName);
+               if (HeapTupleIsValid(tuple)) {
+                       /*
+                        * Yes, try to merge the two column definitions. They 
must
+                        * have the same type and typmod.
+                        */
+                       Form_pg_attribute childatt = (Form_pg_attribute) 
GETSTRUCT(tuple);
+                       ereport(NOTICE,
+                                       (errmsg("merging column \"%s\" with 
inherited definition",
+                                                       attributeName)));
+                       if (attribute->atttypid  != childatt->atttypid ||
+                               attribute->atttypmod != childatt->atttypmod ||
+                               (attribute->attnotnull && 
!childatt->attnotnull))
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_DATATYPE_MISMATCH),
+                                                errmsg("child table \"%s\" has 
different type for column \"%s\"",
+                                                               
RelationGetRelationName(rel), NameStr(attribute->attname))));
+ 
+                       childatt->attinhcount++;
+                       simple_heap_update(attrdesc, &tuple->t_self, tuple);
+                       CatalogUpdateIndexes(attrdesc, tuple); /* XXX strength 
reduce openindexes to outside loop? */
+                       heap_freetuple(tuple);
+                       
+                       /* XXX defaults */
+ 
+               } else {
+                       /*
+                        * No, create a new inherited column
+                        */
+                       
+                       FormData_pg_attribute attributeD;
+                       HeapTuple attributeTuple = 
heap_addheader(Natts_pg_attribute,
+                                                                               
                          false,
+                                                                               
                          ATTRIBUTE_TUPLE_SIZE,
+                                                                               
                          (void *) &attributeD);
+                       Form_pg_attribute childatt = (Form_pg_attribute) 
GETSTRUCT(attributeTuple);
+ 
+                       if (attribute->attnotnull)
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_DATATYPE_MISMATCH),
+                                                errmsg("Cannot add new 
inherited NOT NULL column \"%s\"",
+                                                               
NameStr(attribute->attname))));
+ 
+                       childatt->attrelid = RelationGetRelid(rel);
+                       namecpy(&childatt->attname, &attribute->attname);
+                       childatt->atttypid = attribute->atttypid;
+                       childatt->attstattarget = -1;
+                       childatt->attlen = attribute->attlen;
+                       childatt->attcacheoff = -1;
+                       childatt->atttypmod = attribute->atttypmod;
+                       childatt->attnum = ++child_attno;
+                       childatt->attbyval = attribute->attbyval;
+                       childatt->attndims = attribute->attndims;
+                       childatt->attstorage = attribute->attstorage;
+                       childatt->attalign = attribute->attalign;
+                       childatt->attnotnull = false;
+                       childatt->atthasdef = false; /* XXX */
+                       childatt->attisdropped = false;
+                       childatt->attislocal = false;
+                       childatt->attinhcount = attribute->attinhcount+1;
+ 
+                       simple_heap_insert(attrdesc, attributeTuple);
+                       CatalogUpdateIndexes(attrdesc, attributeTuple);
+                       heap_freetuple(attributeTuple);
+ 
+                       /* XXX Defaults */
+ 
+               }
+               heap_close(attrdesc, RowExclusiveLock);
+       }
+       
+ }
+ 
+ static void
+ ATExecDropInherits(Relation rel, RangeVar *parent)
+ {
+ 
+ 
+       Relation        catalogRelation;
+       SysScanDesc scan;
+       ScanKeyData key;
+       HeapTuple       inheritsTuple, attributeTuple;
+       Oid                     inhparent;
+       Oid                     dropparent;
+       int             found = 0;
+       
+       /* Get the OID of parent -- if no schema is specified use the regular
+        * search path and only drop the one table that's found. We could try 
to be
+        * clever and look at each parent and see if it matches but that would 
be
+        * inconsistent with other operations I think. */
+       
+       Assert(rel);
+       Assert(parent);
+ 
+       dropparent = RangeVarGetRelid(parent, false);
+ 
+       /* Search through the direct parents of rel looking for dropparent oid 
*/
+ 
+       catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+       ScanKeyInit(&key,
+                               Anum_pg_inherits_inhrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetRelid(rel)));
+       scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, 
true, SnapshotNow, 1, &key);
+       while (!found && HeapTupleIsValid(inheritsTuple = 
systable_getnext(scan)))
+       {
+               inhparent = ((Form_pg_inherits) 
GETSTRUCT(inheritsTuple))->inhparent;
+               if (inhparent == dropparent) {
+                       simple_heap_delete(catalogRelation, 
&inheritsTuple->t_self);
+                       found = 1;
+               }
+       }
+       systable_endscan(scan);
+       heap_close(catalogRelation, RowExclusiveLock);
+ 
+ 
+       if (!found) {
+               /* would it be better to look up the actual schema of 
dropparent and
+                * make the error message explicitly name the qualified name 
it's
+                * trying to drop ?*/
+               if (parent->schemaname)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_TABLE),
+                                        errmsg("relation \"%s.%s\" is not a 
parent of relation \"%s\"",
+                                                       parent->schemaname, 
parent->relname, RelationGetRelationName(rel))));
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_TABLE),
+                                        errmsg("relation \"%s\" is not a 
parent of relation \"%s\"",
+                                                       parent->relname, 
RelationGetRelationName(rel))));
+       }
+       
+       /* Search through columns looking for matching columns from parent 
table */
+ 
+       catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
+       ScanKeyInit(&key,
+                               Anum_pg_attribute_attrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetRelid(rel)));
+       scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId, 
true, SnapshotNow, 1, &key);
+       while (HeapTupleIsValid(attributeTuple = systable_getnext(scan))) {
+               Form_pg_attribute att = 
((Form_pg_attribute)GETSTRUCT(attributeTuple));
+               /* Not an inherited column at all
+                * (do NOT use islocal for this test--it can be true for 
inherited columns)
+                */
+               if (att->attinhcount == 0) 
+                       continue; 
+               if (att->attisdropped) /* XXX Is this right? */
+                       continue;
+               if (SearchSysCacheExistsAttName(dropparent, 
NameStr(att->attname))) {
+                       /* Decrement inhcount and possibly set islocal to 1 */
+                       HeapTuple copyTuple = heap_copytuple(attributeTuple);
+                       Form_pg_attribute copy_att = 
((Form_pg_attribute)GETSTRUCT(copyTuple));
+ 
+                       copy_att->attinhcount--;
+                       if (copy_att->attinhcount == 0)
+                               copy_att->attislocal = 1;
+ 
+                       simple_heap_update(catalogRelation, &copyTuple->t_self, 
copyTuple);
+                       /* XXX "Avoid using it for multiple tuples, since 
opening the
+                        * indexes and building the index info structures is 
moderately
+                        * expensive." Perhaps this can be moved outside the 
loop or else
+                        * at least the CatalogOpenIndexes/CatalogCloseIndexes 
moved
+                        * outside the loop but when I try that it seg 
faults?!*/
+                       CatalogUpdateIndexes(catalogRelation, copyTuple);
+                       heap_freetuple(copyTuple);
+               }
+       }
+       systable_endscan(scan);
+       heap_close(catalogRelation, RowExclusiveLock);
+ }
+ 
+ 
+ 
  /*
   * ALTER TABLE CREATE TOAST TABLE
   *
Index: src/backend/nodes/copyfuncs.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v
retrieving revision 1.335
diff -u -p -c -r1.335 copyfuncs.c
cvs diff: conflicting specifications of output style
*** src/backend/nodes/copyfuncs.c       30 Apr 2006 18:30:38 -0000      1.335
--- src/backend/nodes/copyfuncs.c       7 Jun 2006 18:09:56 -0000
*************** _copyAlterTableCmd(AlterTableCmd *from)
*** 1799,1804 ****
--- 1799,1805 ----
        COPY_SCALAR_FIELD(subtype);
        COPY_STRING_FIELD(name);
        COPY_NODE_FIELD(def);
+       COPY_NODE_FIELD(parent);
        COPY_NODE_FIELD(transform);
        COPY_SCALAR_FIELD(behavior);
  
Index: src/backend/parser/gram.y
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.545
diff -u -p -c -r2.545 gram.y
cvs diff: conflicting specifications of output style
*** src/backend/parser/gram.y   27 May 2006 17:38:45 -0000      2.545
--- src/backend/parser/gram.y   7 Jun 2006 18:09:57 -0000
*************** alter_table_cmd:
*** 1514,1519 ****
--- 1514,1535 ----
                                        n->subtype = AT_DisableTrigUser;
                                        $$ = (Node *)n;
                                }
+                       /* ALTER TABLE <name> ADD INHERITS <parent> */
+                       | ADD_P INHERITS qualified_name
+                               {
+                                       AlterTableCmd *n = 
makeNode(AlterTableCmd);
+                                       n->subtype = AT_AddInherits;
+                                       n->parent = $3;
+                                       $$ = (Node *)n;
+                               }
+                       /* ALTER TABLE <name> DROP INHERITS <parent> */
+                       | DROP INHERITS qualified_name
+                               {
+                                       AlterTableCmd *n = 
makeNode(AlterTableCmd);
+                                       n->subtype = AT_DropInherits;
+                                       n->parent = $3;
+                                       $$ = (Node *)n;
+                               }
                        | alter_rel_cmd
                                {
                                        $$ = $1;
*************** unreserved_keyword:
*** 8422,8428 ****
                        | INCREMENT
                        | INDEX
                        | INHERIT
-                       | INHERITS
                        | INPUT_P
                        | INSENSITIVE
                        | INSERT
--- 8438,8443 ----
*************** func_name_keyword:
*** 8640,8645 ****
--- 8655,8661 ----
   */
  reserved_keyword:
                          ALL
+                       | INHERITS
                        | ANALYSE
                        | ANALYZE
                        | AND
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/nodes/parsenodes.h,v
retrieving revision 1.310
diff -u -p -c -r1.310 parsenodes.h
cvs diff: conflicting specifications of output style
*** src/include/nodes/parsenodes.h      30 Apr 2006 18:30:40 -0000      1.310
--- src/include/nodes/parsenodes.h      7 Jun 2006 18:10:00 -0000
*************** typedef enum AlterTableType
*** 874,880 ****
        AT_EnableTrigAll,                       /* ENABLE TRIGGER ALL */
        AT_DisableTrigAll,                      /* DISABLE TRIGGER ALL */
        AT_EnableTrigUser,                      /* ENABLE TRIGGER USER */
!       AT_DisableTrigUser                      /* DISABLE TRIGGER USER */
  } AlterTableType;
  
  typedef struct AlterTableCmd  /* one subcommand of an ALTER TABLE */
--- 874,882 ----
        AT_EnableTrigAll,                       /* ENABLE TRIGGER ALL */
        AT_DisableTrigAll,                      /* DISABLE TRIGGER ALL */
        AT_EnableTrigUser,                      /* ENABLE TRIGGER USER */
!       AT_DisableTrigUser,                     /* DISABLE TRIGGER USER */
!       AT_AddInherits,                         /* ADD INHERITS parent */
!       AT_DropInherits                         /* DROP INHERITS parent */
  } AlterTableType;
  
  typedef struct AlterTableCmd  /* one subcommand of an ALTER TABLE */
*************** typedef struct AlterTableCmd    /* one subc
*** 883,888 ****
--- 885,891 ----
        AlterTableType subtype;         /* Type of table alteration to apply */
        char       *name;                       /* column, constraint, or 
trigger to act on,
                                                                 * or new owner 
or tablespace */
+       RangeVar   *parent;                     /* Parent table for add/drop 
inherits */
        Node       *def;                        /* definition of new column, 
column type,
                                                                 * index, or 
constraint */
        Node       *transform;          /* transformation expr for ALTER TYPE */


-- 
greg


---------------------------(end of broadcast)---------------------------
TIP 9: In versions below 8.0, the planner will ignore your desire to
       choose an index scan if your joining column's datatypes do not
       match

Reply via email to