On Mon, 2007-21-05 at 12:23 +0530, NikhilS wrote:
> I had spent some time on this earlier so decided to complete and send
> the patch to you for review. This patch supports copying of
> expressions, predicates, opclass, amorder, reloptions etc. The test
> case also contains some more additions with this patch. Please let me
> know if there are any issues. 

Attached is a revised version of this patch. Note that this pattern is
always unsafe:

        ht_am = SearchSysCache(AMOID, ...);
        if (!HeapTupleIsValid(ht_am))
                elog(ERROR, "...");
        amrec = (Form_pg_am) GETSTRUCT(ht_am);
        index->accessMethod = NameStr(amrec->amname);

        /* ... */
        ReleaseSysCache(ht_am);

        return index;

Before calling ReleaseSysCache(), all the data you need from the
syscache entry needs to be deep-copied to allow subsequent access, but
NameStr() doesn't do a deep-copy. Adding "-DFORCE_CATCACHE_RELEASE" is a
useful way to catch these kinds of problems (I wonder if this is worth
adding to the default CFLAGS when assertions are enabled?)

I also made a bunch of editorial changes, including moving the
varattnos_map_schema() call out of the loop in transformInhRelation().

BTW, comments like "This function is based on code from ruleutils.c"
would be helpful for reviewers (whether in the patch itself or just in
the email containing the patch).

There's still a few things I need to fix in the patch, but I'll apply a
revised version of the attached patch to HEAD tomorrow, barring any
objections.

-Neil

Index: doc/src/sgml/ref/create_table.sgml
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/doc/src/sgml/ref/create_table.sgml,v
retrieving revision 1.107
diff -p -c -r1.107 create_table.sgml
*** doc/src/sgml/ref/create_table.sgml	1 Feb 2007 00:28:18 -0000	1.107
--- doc/src/sgml/ref/create_table.sgml	2 Jun 2007 19:52:22 -0000
*************** and <replaceable class="PARAMETER">table
*** 259,275 ****
       </para>
       <para>
        Not-null constraints are always copied to the new table.
!       <literal>CHECK</literal> constraints will only be copied if
!       <literal>INCLUDING CONSTRAINTS</literal> is specified; other types of
!       constraints will never be copied. Also, no distinction is made between
!       column constraints and table constraints &mdash; when constraints are
!       requested, all check constraints are copied.
       </para>
       <para>
        Note also that unlike <literal>INHERITS</literal>, copied columns and
        constraints are not merged with similarly named columns and constraints.
        If the same name is specified explicitly or in another
!       <literal>LIKE</literal> clause an error is signalled.
       </para>
      </listitem>
     </varlistentry>
--- 259,276 ----
       </para>
       <para>
        Not-null constraints are always copied to the new table.
!       <literal>CHECK</literal>, <literal>UNIQUE</literal>, and
!       <literal>PRIMARY KEY</literal> constraints will only be copied
!       if <literal>INCLUDING CONSTRAINTS</literal> is specified. Also,
!       no distinction is made between column constraints and table
!       constraints &mdash; when constraints are requested, all eligible
!       constraints are copied.
       </para>
       <para>
        Note also that unlike <literal>INHERITS</literal>, copied columns and
        constraints are not merged with similarly named columns and constraints.
        If the same name is specified explicitly or in another
!       <literal>LIKE</literal> clause, an error is signalled.
       </para>
      </listitem>
     </varlistentry>
Index: src/backend/bootstrap/bootparse.y
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/bootstrap/bootparse.y,v
retrieving revision 1.88
diff -p -c -r1.88 bootparse.y
*** src/backend/bootstrap/bootparse.y	13 Mar 2007 00:33:39 -0000	1.88
--- src/backend/bootstrap/bootparse.y	2 Jun 2007 19:46:07 -0000
*************** Boot_DeclareIndexStmt:
*** 252,258 ****
  								LexIDStr($8),
  								NULL,
  								$10,
! 								NULL, NIL,
  								false, false, false,
  								false, false, true, false, false);
  					do_end();
--- 252,258 ----
  								LexIDStr($8),
  								NULL,
  								$10,
! 								NULL, NIL, NULL,
  								false, false, false,
  								false, false, true, false, false);
  					do_end();
*************** Boot_DeclareUniqueIndexStmt:
*** 270,276 ****
  								LexIDStr($9),
  								NULL,
  								$11,
! 								NULL, NIL,
  								true, false, false,
  								false, false, true, false, false);
  					do_end();
--- 270,276 ----
  								LexIDStr($9),
  								NULL,
  								$11,
! 								NULL, NIL, NULL,
  								true, false, false,
  								false, false, true, false, false);
  					do_end();
Index: src/backend/commands/indexcmds.c
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/commands/indexcmds.c,v
retrieving revision 1.158
diff -p -c -r1.158 indexcmds.c
*** src/backend/commands/indexcmds.c	2 May 2007 21:08:45 -0000	1.158
--- src/backend/commands/indexcmds.c	2 Jun 2007 19:46:07 -0000
*************** DefineIndex(RangeVar *heapRelation,
*** 100,105 ****
--- 100,106 ----
  			List *attributeList,
  			Expr *predicate,
  			List *options,
+ 			char *inhreloptions,
  			bool unique,
  			bool primary,
  			bool isconstraint,
*************** DefineIndex(RangeVar *heapRelation,
*** 393,400 ****
  
  	/*
  	 * Parse AM-specific options, convert to text array form, validate.
  	 */
! 	reloptions = transformRelOptions((Datum) 0, options, false, false);
  
  	(void) index_reloptions(amoptions, reloptions, true);
  
--- 394,408 ----
  
  	/*
  	 * Parse AM-specific options, convert to text array form, validate.
+ 	 * The inh reloptions introduced due to using unique/primary indexes via
+ 	 * the "CREATE LIKE INCLUDING CONSTRAINTS" statement also need to be merged here
  	 */
! 	if (inhreloptions)
! 		reloptions = deflatten_reloptions(inhreloptions);
! 	else
! 		reloptions = (Datum) 0;
! 
! 	reloptions = transformRelOptions(reloptions, options, false, false);
  
  	(void) index_reloptions(amoptions, reloptions, true);
  
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.225
diff -p -c -r1.225 tablecmds.c
*** src/backend/commands/tablecmds.c	18 May 2007 23:19:41 -0000	1.225
--- src/backend/commands/tablecmds.c	2 Jun 2007 19:46:07 -0000
*************** ATExecAddIndex(AlteredTableInfo *tab, Re
*** 3781,3786 ****
--- 3781,3787 ----
  				stmt->indexParams,		/* parameters */
  				(Expr *) stmt->whereClause,
  				stmt->options,
+ 				stmt->inhreloptions,
  				stmt->unique,
  				stmt->primary,
  				stmt->isconstraint,
Index: src/backend/nodes/copyfuncs.c
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/nodes/copyfuncs.c,v
retrieving revision 1.376
diff -p -c -r1.376 copyfuncs.c
*** src/backend/nodes/copyfuncs.c	22 May 2007 23:23:55 -0000	1.376
--- src/backend/nodes/copyfuncs.c	2 Jun 2007 19:46:07 -0000
*************** _copyIndexStmt(IndexStmt *from)
*** 2162,2167 ****
--- 2162,2168 ----
  	COPY_STRING_FIELD(tableSpace);
  	COPY_NODE_FIELD(indexParams);
  	COPY_NODE_FIELD(options);
+ 	COPY_STRING_FIELD(inhreloptions);
  	COPY_NODE_FIELD(whereClause);
  	COPY_SCALAR_FIELD(unique);
  	COPY_SCALAR_FIELD(primary);
Index: src/backend/parser/analyze.c
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/parser/analyze.c,v
retrieving revision 1.363
diff -p -c -r1.363 analyze.c
*** src/backend/parser/analyze.c	27 Apr 2007 22:05:48 -0000	1.363
--- src/backend/parser/analyze.c	2 Jun 2007 20:47:44 -0000
***************
*** 27,40 ****
--- 27,43 ----
  
  #include "postgres.h"
  
+ #include "access/genam.h"
  #include "access/heapam.h"
  #include "catalog/heap.h"
  #include "catalog/index.h"
  #include "catalog/namespace.h"
+ #include "catalog/pg_opclass.h"
  #include "catalog/pg_type.h"
  #include "commands/defrem.h"
  #include "commands/prepare.h"
  #include "commands/tablecmds.h"
+ #include "commands/tablespace.h"
  #include "miscadmin.h"
  #include "nodes/makefuncs.h"
  #include "optimizer/clauses.h"
***************
*** 54,59 ****
--- 57,63 ----
  #include "utils/acl.h"
  #include "utils/builtins.h"
  #include "utils/lsyscache.h"
+ #include "utils/relcache.h"
  #include "utils/syscache.h"
  
  
*************** static void release_pstate_resources(Par
*** 150,155 ****
--- 154,162 ----
  static FromExpr *makeFromExpr(List *fromlist, Node *quals);
  static bool check_parameter_resolution_walker(Node *node,
  								check_parameter_resolution_context *context);
+ static IndexStmt *generateConstraintIndexStmt(CreateStmtContext *cxt, 
+ 							Relation parent_index, AttrNumber *attmap);
+ static void get_opclass(Oid opclass, Oid actual_datatype, List **name_list);
  
  
  /*
*************** transformInhRelation(ParseState *pstate,
*** 1349,1378 ****
  		}
  	}
  
! 	/*
! 	 * Copy CHECK constraints if requested, being careful to adjust
! 	 * attribute numbers
! 	 */
! 	if (including_constraints && tupleDesc->constr)
  	{
! 		AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
! 		int			ccnum;
  
! 		for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++)
  		{
! 			char	   *ccname = tupleDesc->constr->check[ccnum].ccname;
! 			char	   *ccbin = tupleDesc->constr->check[ccnum].ccbin;
! 			Node	   *ccbin_node = stringToNode(ccbin);
! 			Constraint *n = makeNode(Constraint);
  
! 			change_varattnos_of_a_node(ccbin_node, attmap);
  
! 			n->contype = CONSTR_CHECK;
! 			n->name = pstrdup(ccname);
! 			n->raw_expr = NULL;
! 			n->cooked_expr = nodeToString(ccbin_node);
! 			n->indexspace = NULL;
! 			cxt->ckconstraints = lappend(cxt->ckconstraints, (Node *) n);
  		}
  	}
  
--- 1356,1428 ----
  		}
  	}
  
! 	if (including_constraints)
  	{
! 		AttrNumber *attmap;
! 
! 		attmap = varattnos_map_schema(tupleDesc, cxt->columns);
  
! 		/*
! 		 * Copy CHECK constraints, being careful to adjust attribute
! 		 * numbers.
! 		 */
! 		if (tupleDesc->constr)
  		{
! 			int			ccnum;
  
! 			for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++)
! 			{
! 				char	   *ccname = tupleDesc->constr->check[ccnum].ccname;
! 				char	   *ccbin = tupleDesc->constr->check[ccnum].ccbin;
! 				Node	   *ccbin_node = stringToNode(ccbin);
! 				Constraint *n = makeNode(Constraint);
! 
! 				change_varattnos_of_a_node(ccbin_node, attmap);
! 
! 				n->contype = CONSTR_CHECK;
! 				n->name = pstrdup(ccname);
! 				n->raw_expr = NULL;
! 				n->cooked_expr = nodeToString(ccbin_node);
! 				n->indexspace = NULL;
! 				cxt->ckconstraints = lappend(cxt->ckconstraints, (Node *) n);
! 			}
! 		}
  
! 		/* Copy constraint indexes, if any */
! 		if (relation->rd_rel->relhasindex)
! 		{
! 			List	   *parent_indexes;
! 			ListCell   *l;
! 
! 			parent_indexes = RelationGetIndexList(relation);
! 			foreach(l, parent_indexes)
! 			{
! 				Oid        		 parent_index_oid = lfirst_oid(l);
! 				Relation   		 parent_index;
! 				IndexStmt  		*index_stmt;
! 
! 				parent_index = index_open(parent_index_oid, AccessShareLock);
! 
! 				/*
! 				 * It would be best to generate IndexStmts here and append
! 				 * them to the ixconstraints list. Constraint structure does
! 				 * not contain all the index-relevant fields as well and
! 				 * hence we use IndexStmts here.
! 				 */
! 				index_stmt = generateConstraintIndexStmt(cxt, parent_index,
! 														 attmap);
! 				if (index_stmt == NULL)
! 				{
! 					index_close(parent_index, AccessShareLock);
! 					continue;
! 				}
! 
! 				/* Add the new IndexStmt to the create context */
! 				cxt->ixconstraints = lappend(cxt->ixconstraints, index_stmt);
! 
! 				/* Keep our lock on the index till xact commit */
! 				index_close(parent_index, NoLock);
! 			}
  		}
  	}
  
*************** transformIndexConstraints(ParseState *ps
*** 1396,1568 ****
  	 * Run through the constraints that need to generate an index. For PRIMARY
  	 * KEY, mark each column as NOT NULL and create an index. For UNIQUE,
  	 * create an index as for PRIMARY KEY, but do not insist on NOT NULL.
  	 */
  	foreach(listptr, cxt->ixconstraints)
  	{
! 		Constraint *constraint = lfirst(listptr);
  		ListCell   *keys;
  		IndexElem  *iparam;
  
! 		Assert(IsA(constraint, Constraint));
! 		Assert((constraint->contype == CONSTR_PRIMARY)
! 			   || (constraint->contype == CONSTR_UNIQUE));
! 
! 		index = makeNode(IndexStmt);
  
! 		index->unique = true;
! 		index->primary = (constraint->contype == CONSTR_PRIMARY);
! 		if (index->primary)
  		{
! 			if (cxt->pkey != NULL)
! 				ereport(ERROR,
! 						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
! 						 errmsg("multiple primary keys for table \"%s\" are not allowed",
! 								cxt->relation->relname)));
! 			cxt->pkey = index;
! 
! 			/*
! 			 * In ALTER TABLE case, a primary index might already exist, but
! 			 * DefineIndex will check for it.
! 			 */
  		}
- 		index->isconstraint = true;
- 
- 		if (constraint->name != NULL)
- 			index->idxname = pstrdup(constraint->name);
  		else
! 			index->idxname = NULL;		/* DefineIndex will choose name */
! 
! 		index->relation = cxt->relation;
! 		index->accessMethod = DEFAULT_INDEX_TYPE;
! 		index->options = constraint->options;
! 		index->tableSpace = constraint->indexspace;
! 		index->indexParams = NIL;
! 		index->whereClause = NULL;
! 		index->concurrent = false;
! 
! 		/*
! 		 * Make sure referenced keys exist.  If we are making a PRIMARY KEY
! 		 * index, also make sure they are NOT NULL, if possible. (Although we
! 		 * could leave it to DefineIndex to mark the columns NOT NULL, it's
! 		 * more efficient to get it right the first time.)
! 		 */
! 		foreach(keys, constraint->keys)
! 		{
! 			char	   *key = strVal(lfirst(keys));
! 			bool		found = false;
! 			ColumnDef  *column = NULL;
! 			ListCell   *columns;
  
! 			foreach(columns, cxt->columns)
! 			{
! 				column = (ColumnDef *) lfirst(columns);
! 				Assert(IsA(column, ColumnDef));
! 				if (strcmp(column->colname, key) == 0)
! 				{
! 					found = true;
! 					break;
! 				}
! 			}
! 			if (found)
! 			{
! 				/* found column in the new table; force it to be NOT NULL */
! 				if (constraint->contype == CONSTR_PRIMARY)
! 					column->is_not_null = TRUE;
! 			}
! 			else if (SystemAttributeByName(key, cxt->hasoids) != NULL)
  			{
  				/*
! 				 * column will be a system column in the new table, so accept
! 				 * it.	System columns can't ever be null, so no need to worry
! 				 * about PRIMARY/NOT NULL constraint.
  				 */
- 				found = true;
  			}
! 			else if (cxt->inhRelations)
  			{
! 				/* try inherited tables */
! 				ListCell   *inher;
  
! 				foreach(inher, cxt->inhRelations)
  				{
! 					RangeVar   *inh = (RangeVar *) lfirst(inher);
! 					Relation	rel;
! 					int			count;
! 
! 					Assert(IsA(inh, RangeVar));
! 					rel = heap_openrv(inh, AccessShareLock);
! 					if (rel->rd_rel->relkind != RELKIND_RELATION)
! 						ereport(ERROR,
! 								(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 						   errmsg("inherited relation \"%s\" is not a table",
! 								  inh->relname)));
! 					for (count = 0; count < rel->rd_att->natts; count++)
  					{
! 						Form_pg_attribute inhattr = rel->rd_att->attrs[count];
! 						char	   *inhname = NameStr(inhattr->attname);
  
! 						if (inhattr->attisdropped)
! 							continue;
! 						if (strcmp(key, inhname) == 0)
  						{
! 							found = true;
  
! 							/*
! 							 * We currently have no easy way to force an
! 							 * inherited column to be NOT NULL at creation, if
! 							 * its parent wasn't so already. We leave it to
! 							 * DefineIndex to fix things up in this case.
! 							 */
! 							break;
  						}
  					}
- 					heap_close(rel, NoLock);
- 					if (found)
- 						break;
  				}
- 			}
  
! 			/*
! 			 * In the ALTER TABLE case, don't complain about index keys not
! 			 * created in the command; they may well exist already.
! 			 * DefineIndex will complain about them if not, and will also take
! 			 * care of marking them NOT NULL.
! 			 */
! 			if (!found && !cxt->isalter)
! 				ereport(ERROR,
! 						(errcode(ERRCODE_UNDEFINED_COLUMN),
! 						 errmsg("column \"%s\" named in key does not exist",
! 								key)));
  
! 			/* Check for PRIMARY KEY(foo, foo) */
! 			foreach(columns, index->indexParams)
! 			{
! 				iparam = (IndexElem *) lfirst(columns);
! 				if (iparam->name && strcmp(key, iparam->name) == 0)
  				{
! 					if (index->primary)
! 						ereport(ERROR,
  								(errcode(ERRCODE_DUPLICATE_COLUMN),
  								 errmsg("column \"%s\" appears twice in primary key constraint",
! 										key)));
! 					else
! 						ereport(ERROR,
  								(errcode(ERRCODE_DUPLICATE_COLUMN),
  								 errmsg("column \"%s\" appears twice in unique constraint",
! 										key)));
  				}
  			}
  
- 			/* OK, add it to the index definition */
- 			iparam = makeNode(IndexElem);
- 			iparam->name = pstrdup(key);
- 			iparam->expr = NULL;
- 			iparam->opclass = NIL;
- 			iparam->ordering = SORTBY_DEFAULT;
- 			iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
- 			index->indexParams = lappend(index->indexParams, iparam);
  		}
- 
  		indexlist = lappend(indexlist, index);
  	}
  
--- 1446,1642 ----
  	 * Run through the constraints that need to generate an index. For PRIMARY
  	 * KEY, mark each column as NOT NULL and create an index. For UNIQUE,
  	 * create an index as for PRIMARY KEY, but do not insist on NOT NULL.
+ 	 *
+ 	 * If the table is being created using LIKE INCLUDING CONSTRAINTS, the
+ 	 * ixconstraints list will contain a mix of Constraint and IndexStmt
+ 	 * entries
  	 */
  	foreach(listptr, cxt->ixconstraints)
  	{
! 		Node	   *node = lfirst(listptr);
! 		Constraint *constraint;
  		ListCell   *keys;
  		IndexElem  *iparam;
  
! 		Assert(IsA(node, Constraint) || IsA(node, IndexStmt));
  
! 		if (IsA(node, IndexStmt))
  		{
! 			index = (IndexStmt *)node;	
! 			if (index->primary)
! 			{
! 				if (cxt->pkey != NULL)
! 					ereport(ERROR,
! 							(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
! 							 errmsg("multiple primary keys for table \"%s\" are not allowed",
! 									cxt->relation->relname)));
! 				cxt->pkey = index;
! 			}
  		}
  		else
! 		{
! 			Assert(IsA(node, Constraint));
  
! 			constraint = (Constraint *)node;
! 			Assert(constraint->contype == CONSTR_PRIMARY ||
! 				   constraint->contype == CONSTR_UNIQUE);
! 			index = makeNode(IndexStmt);
! 
! 			index->unique = true;
! 			index->primary = (constraint->contype == CONSTR_PRIMARY);
! 			if (index->primary)
  			{
+ 				if (cxt->pkey != NULL)
+ 					ereport(ERROR,
+ 							(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ 							 errmsg("multiple primary keys for table \"%s\" are not allowed",
+ 									cxt->relation->relname)));
+ 				cxt->pkey = index;
+ 
  				/*
! 				 * In ALTER TABLE case, a primary index might already exist, but
! 				 * DefineIndex will check for it.
  				 */
  			}
! 			index->isconstraint = true;
! 
! 			if (constraint->name != NULL)
! 				index->idxname = pstrdup(constraint->name);
! 			else
! 				index->idxname = NULL;		/* DefineIndex will choose name */
! 
! 			index->relation = cxt->relation;
! 			index->accessMethod = DEFAULT_INDEX_TYPE;
! 			index->options = constraint->options;
! 			index->tableSpace = constraint->indexspace;
! 			index->indexParams = NIL;
! 			index->whereClause = NULL;
! 			index->concurrent = false;
! 
! 			/*
! 			 * Make sure referenced keys exist.  If we are making a PRIMARY KEY
! 			 * index, also make sure they are NOT NULL, if possible. (Although we
! 			 * could leave it to DefineIndex to mark the columns NOT NULL, it's
! 			 * more efficient to get it right the first time.)
! 			 */
! 			foreach(keys, constraint->keys)
  			{
! 				char	   *key = strVal(lfirst(keys));
! 				bool		found = false;
! 				ColumnDef  *column = NULL;
! 				ListCell   *columns;
  
! 				foreach(columns, cxt->columns)
  				{
! 					column = (ColumnDef *) lfirst(columns);
! 					Assert(IsA(column, ColumnDef));
! 					if (strcmp(column->colname, key) == 0)
  					{
! 						found = true;
! 						break;
! 					}
! 				}
! 				if (found)
! 				{
! 					/* found column in the new table; force it to be NOT NULL */
! 					if (constraint->contype == CONSTR_PRIMARY)
! 						column->is_not_null = TRUE;
! 				}
! 				else if (SystemAttributeByName(key, cxt->hasoids) != NULL)
! 				{
! 					/*
! 					 * column will be a system column in the new table, so accept
! 					 * it.	System columns can't ever be null, so no need to worry
! 					 * about PRIMARY/NOT NULL constraint.
! 					 */
! 					found = true;
! 				}
! 				else if (cxt->inhRelations)
! 				{
! 					/* try inherited tables */
! 					ListCell   *inher;
  
! 					foreach(inher, cxt->inhRelations)
! 					{
! 						RangeVar   *inh = (RangeVar *) lfirst(inher);
! 						Relation	rel;
! 						int			count;
! 
! 						Assert(IsA(inh, RangeVar));
! 						rel = heap_openrv(inh, AccessShareLock);
! 						if (rel->rd_rel->relkind != RELKIND_RELATION)
! 							ereport(ERROR,
! 									(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 									 errmsg("inherited relation \"%s\" is not a table",
! 											inh->relname)));
! 						for (count = 0; count < rel->rd_att->natts; count++)
  						{
! 							Form_pg_attribute inhattr = rel->rd_att->attrs[count];
! 							char	   *inhname = NameStr(inhattr->attname);
  
! 							if (inhattr->attisdropped)
! 								continue;
! 							if (strcmp(key, inhname) == 0)
! 							{
! 								found = true;
! 
! 								/*
! 								 * We currently have no easy way to force an
! 								 * inherited column to be NOT NULL at creation, if
! 								 * its parent wasn't so already. We leave it to
! 								 * DefineIndex to fix things up in this case.
! 								 */
! 								break;
! 							}
  						}
+ 						heap_close(rel, NoLock);
+ 						if (found)
+ 							break;
  					}
  				}
  
! 				/*
! 				 * In the ALTER TABLE case, don't complain about index keys not
! 				 * created in the command; they may well exist already.
! 				 * DefineIndex will complain about them if not, and will also take
! 				 * care of marking them NOT NULL.
! 				 */
! 				if (!found && !cxt->isalter)
! 					ereport(ERROR,
! 							(errcode(ERRCODE_UNDEFINED_COLUMN),
! 							 errmsg("column \"%s\" named in key does not exist",
! 									key)));
  
! 				/* Check for PRIMARY KEY(foo, foo) */
! 				foreach(columns, index->indexParams)
  				{
! 					iparam = (IndexElem *) lfirst(columns);
! 					if (iparam->name && strcmp(key, iparam->name) == 0)
! 					{
! 						if (index->primary)
! 							ereport(ERROR,
  								(errcode(ERRCODE_DUPLICATE_COLUMN),
  								 errmsg("column \"%s\" appears twice in primary key constraint",
! 								key)));
! 						else
! 							ereport(ERROR,
  								(errcode(ERRCODE_DUPLICATE_COLUMN),
  								 errmsg("column \"%s\" appears twice in unique constraint",
! 								key)));
! 					}
  				}
+ 
+ 				/* OK, add it to the index definition */
+ 				iparam = makeNode(IndexElem);
+ 				iparam->name = pstrdup(key);
+ 				iparam->expr = NULL;
+ 				iparam->opclass = NIL;
+ 				iparam->ordering = SORTBY_DEFAULT;
+ 				iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
+ 				index->indexParams = lappend(index->indexParams, iparam);
  			}
  
  		}
  		indexlist = lappend(indexlist, index);
  	}
  
*************** check_parameter_resolution_walker(Node *
*** 3735,3737 ****
--- 3809,4047 ----
  	return expression_tree_walker(node, check_parameter_resolution_walker,
  								  (void *) context);
  }
+ 
+ /*
+  * Generate an IndexStmt entry using information from an already
+  * existing index "source_idx". Currently this function only generates
+  * entries for unique/primary indexes, but can be easily generalized
+  * to work for any index entry.
+  */
+ static IndexStmt *
+ generateConstraintIndexStmt(CreateStmtContext *cxt, Relation source_idx,
+ 							AttrNumber *attmap)
+ {
+ 	HeapTuple			 ht_idx;
+ 	HeapTuple			 ht_idxrel;
+ 	HeapTuple			 ht_am;
+ 	Form_pg_index 		 idxrec;
+ 	Form_pg_class 		 idxrelrec;
+ 	Form_pg_am			 amrec;
+ 	List	   			*indexprs = NIL;
+ 	ListCell   			*indexpr_item;
+ 	Oid					 indrelid;
+ 	Oid 				 source_relid;
+ 	int					 keyno;
+ 	Oid					 keycoltype;
+ 	Datum				 indclassDatum;
+ 	Datum				 indoptionDatum;
+ 	bool				 isnull;
+ 	oidvector  			*indclass;
+ 	int2vector 			*indoption;
+ 	IndexStmt  			*index;
+ 	Datum				 reloptions;
+ 
+ 	source_relid = RelationGetRelid(source_idx);
+ 
+ 	/* Fetch pg_index tuple for source index */
+ 	ht_idx = SearchSysCache(INDEXRELID,
+ 							ObjectIdGetDatum(source_relid),
+ 							0, 0, 0);
+ 	if (!HeapTupleIsValid(ht_idx))
+ 		elog(ERROR, "cache lookup failed for index %u", source_relid);
+ 	idxrec = (Form_pg_index) GETSTRUCT(ht_idx);
+ 
+ 	/* If this is not a constraint index (UNIQUE or PK), we're done */
+ 	if (idxrec->indisprimary == false && idxrec->indisunique == false)
+ 	{
+ 		ReleaseSysCache(ht_idx);
+ 		return NULL;
+ 	}
+ 
+ 	Assert(source_relid == idxrec->indexrelid);
+ 	indrelid = idxrec->indrelid;
+ 
+ 	index = makeNode(IndexStmt);
+ 	index->unique = idxrec->indisunique;
+ 	index->concurrent = false;
+ 	index->primary = idxrec->indisprimary;
+ 	index->relation = cxt->relation;
+ 	index->isconstraint = false;
+ 
+ 	/*
+ 	 * We don't try to preserve the name of the source index; instead, just
+ 	 * let DefineIndex() choose a reasonable name.
+ 	 */
+ 	index->idxname = NULL;
+ 
+ 	/* Must get indclass and indoption the hard way */
+ 	indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
+ 									Anum_pg_index_indclass, &isnull);
+ 	Assert(!isnull);
+ 	indclass = (oidvector *) DatumGetPointer(indclassDatum);
+ 	indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
+ 									 Anum_pg_index_indoption, &isnull);
+ 	Assert(!isnull);
+ 	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
+ 
+ 	/* Fetch pg_class tuple of source index */
+ 	ht_idxrel = SearchSysCache(RELOID,
+ 							   ObjectIdGetDatum(source_relid),
+ 							   0, 0, 0);
+ 	if (!HeapTupleIsValid(ht_idxrel))
+ 		elog(ERROR, "cache lookup failed for relation %u", source_relid);
+ 
+ 	/*
+ 	 * Store the reloptions for later use by this new index
+ 	 */
+ 	reloptions = SysCacheGetAttr(RELOID, ht_idxrel,
+ 								 Anum_pg_class_reloptions, &isnull);
+ 	if (!isnull)
+ 		index->inhreloptions = flatten_reloptions(source_relid);
+ 
+ 	idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel);
+ 
+ 	/* Fetch pg_am tuple for the index's access method */
+ 	ht_am = SearchSysCache(AMOID,
+ 						   ObjectIdGetDatum(idxrelrec->relam),
+ 						   0, 0, 0);
+ 	if (!HeapTupleIsValid(ht_am))
+ 		elog(ERROR, "cache lookup failed for access method %u",
+ 			 idxrelrec->relam);
+ 	amrec = (Form_pg_am) GETSTRUCT(ht_am);
+ 	index->accessMethod = pstrdup(NameStr(amrec->amname));
+ 
+ 	/*
+ 	 * Get the index expressions, if any.
+ 	 */
+ 	if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs))
+ 	{
+ 		Datum		exprsDatum;
+ 		bool		isnull;
+ 		char	   *exprsString;
+ 
+ 		exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
+ 									 Anum_pg_index_indexprs, &isnull);
+ 		exprsString = DatumGetCString(DirectFunctionCall1(textout,
+ 														  exprsDatum));
+ 		Assert(!isnull);
+ 		indexprs = (List *) stringToNode(exprsString);
+ 	}
+ 
+ 	indexpr_item = list_head(indexprs);
+ 
+ 	for (keyno = 0; keyno < idxrec->indnatts; keyno++)
+ 	{
+ 		IndexElem	*iparam;
+ 		AttrNumber	attnum = idxrec->indkey.values[keyno];
+ 		int16		opt = indoption->values[keyno];
+ 
+ 		iparam = makeNode(IndexElem);
+ 
+ 		if (attnum != 0)
+ 		{
+ 			/* Simple index column */
+ 			char	   *attname;
+ 
+ 			attname = get_relid_attribute_name(indrelid, attnum);
+ 			keycoltype = get_atttype(indrelid, attnum);
+ 
+ 			iparam->name = attname;
+ 			iparam->expr = NULL;
+ 		}
+ 		else
+ 		{
+ 			/* Expressional index */
+ 			Node	   *indexkey;
+ 
+ 			if (indexpr_item == NULL)
+ 				elog(ERROR, "too few entries in indexprs list");
+ 			indexkey = (Node *) lfirst(indexpr_item);
+ 			change_varattnos_of_a_node(indexkey, attmap);
+ 			iparam->name = NULL;
+ 			iparam->expr = indexkey;
+ 
+ 			indexpr_item = lnext(indexpr_item);
+ 			keycoltype = exprType(indexkey);
+ 		}
+ 
+ 		/* Add the operator class name */
+ 		get_opclass(indclass->values[keyno], keycoltype, &(iparam->opclass));
+ 
+ 		iparam->ordering = SORTBY_DEFAULT;
+ 		iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
+ 
+ 		/* Adjust options if relevant */
+ 		if (amrec->amcanorder)
+ 		{
+ 			/* If it supports sort ordering, report DESC and NULLS opts */
+ 			if (opt & INDOPTION_DESC)
+ 				iparam->ordering = SORTBY_DESC;
+ 			if (opt & INDOPTION_NULLS_FIRST)
+ 				iparam->nulls_ordering = SORTBY_NULLS_FIRST;
+ 		}
+ 
+ 		index->indexParams = lappend(index->indexParams, iparam);
+ 	}
+ 
+ 	/* Use the same tablespace as the source index */
+ 	index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode);
+ 
+ 	/*
+ 	 * If it's a partial index, decompile and append the predicate
+ 	 */
+ 	if (!heap_attisnull(ht_idx, Anum_pg_index_indpred))
+ 	{
+ 		Datum		pred_datum;
+ 		bool		isnull;
+ 		char	   *pred_str;
+ 
+ 		/* Convert text string to node tree */
+ 		pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx,
+ 									 Anum_pg_index_indpred, &isnull);
+ 		Assert(!isnull);
+ 		pred_str = DatumGetCString(DirectFunctionCall1(textout,
+ 													   pred_datum));
+ 		index->whereClause = (Node *) stringToNode(pred_str);
+ 		change_varattnos_of_a_node(index->whereClause, attmap);
+ 	}
+ 
+ 	/* Clean up */
+ 	ReleaseSysCache(ht_idx);
+ 	ReleaseSysCache(ht_idxrel);
+ 	ReleaseSysCache(ht_am);
+ 
+ 	return index;
+ }
+ 
+ /*
+  * get_opclass			- fetch name of an index operator class
+  *
+  * If the opclass is the default for the given actual_datatype, then
+  * name_list is NIL.
+  */
+ static void
+ get_opclass(Oid opclass, Oid actual_datatype, List **name_list)
+ {
+ 	HeapTuple			ht_opc;
+ 	Form_pg_opclass 	opc_rec;
+ 
+ 	*name_list = NIL;
+ 
+ 	ht_opc = SearchSysCache(CLAOID,
+ 							ObjectIdGetDatum(opclass),
+ 							0, 0, 0);
+ 	if (!HeapTupleIsValid(ht_opc))
+ 		elog(ERROR, "cache lookup failed for opclass %u", opclass);
+ 	opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc);
+ 
+ 	if (!OidIsValid(actual_datatype) ||
+ 		GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass)
+ 	{
+ 		char *nsp_name = get_namespace_name(opc_rec->opcnamespace);
+ 		char *opc_name = NameStr(opc_rec->opcname);
+ 
+ 		*name_list = list_make2(makeString(nsp_name), makeString(opc_name));
+ 	}
+ 
+ 	ReleaseSysCache(ht_opc);
+ }
Index: src/backend/tcop/utility.c
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/tcop/utility.c,v
retrieving revision 1.280
diff -p -c -r1.280 utility.c
*** src/backend/tcop/utility.c	30 May 2007 20:12:01 -0000	1.280
--- src/backend/tcop/utility.c	2 Jun 2007 19:46:07 -0000
*************** ProcessUtility(Node *parsetree,
*** 823,828 ****
--- 823,829 ----
  							stmt->indexParams,	/* parameters */
  							(Expr *) stmt->whereClause,
  							stmt->options,
+ 							stmt->inhreloptions,
  							stmt->unique,
  							stmt->primary,
  							stmt->isconstraint,
Index: src/backend/utils/adt/ruleutils.c
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/utils/adt/ruleutils.c,v
retrieving revision 1.258
diff -p -c -r1.258 ruleutils.c
*** src/backend/utils/adt/ruleutils.c	24 May 2007 18:58:42 -0000	1.258
--- src/backend/utils/adt/ruleutils.c	2 Jun 2007 19:46:07 -0000
*************** static char *generate_relation_name(Oid 
*** 193,199 ****
  static char *generate_function_name(Oid funcid, int nargs, Oid *argtypes);
  static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2);
  static text *string_to_text(char *str);
- static char *flatten_reloptions(Oid relid);
  
  #define only_marker(rte)  ((rte)->inh ? "" : "ONLY ")
  
--- 193,198 ----
*************** string_to_text(char *str)
*** 5378,5384 ****
  /*
   * Generate a C string representing a relation's reloptions, or NULL if none.
   */
! static char *
  flatten_reloptions(Oid relid)
  {
  	char	   *result = NULL;
--- 5377,5383 ----
  /*
   * Generate a C string representing a relation's reloptions, or NULL if none.
   */
! char *
  flatten_reloptions(Oid relid)
  {
  	char	   *result = NULL;
*************** flatten_reloptions(Oid relid)
*** 5414,5416 ****
--- 5413,5443 ----
  
  	return result;
  }
+ 
+ /*
+  * Generate an Array Datum representing a relation's reloptions using a char
+  * string
+  */
+ Datum
+ deflatten_reloptions(char *reloptstring)
+ {
+ 	Datum		result = (Datum) 0;
+ 
+ 	if (reloptstring)
+ 	{
+ 		Datum		sep, relopts;
+ 
+ 		/*
+ 		 * We want to use text_to_array(reloptstring, ', ') --- but
+ 		 * DirectFunctionCall2(text_to_array) does not work, because
+ 		 * text_to_array() relies on fcinfo to be valid.  So use
+ 		 * OidFunctionCall2.
+ 		 */
+ 		sep = DirectFunctionCall1(textin, CStringGetDatum(", "));
+ 		relopts = DirectFunctionCall1(textin, CStringGetDatum(reloptstring));
+ 
+ 		result = OidFunctionCall2(F_TEXT_TO_ARRAY, relopts, sep);
+ 	}
+ 
+ 	return result;
+ }
Index: src/include/commands/defrem.h
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/include/commands/defrem.h,v
retrieving revision 1.81
diff -p -c -r1.81 defrem.h
*** src/include/commands/defrem.h	13 Mar 2007 00:33:43 -0000	1.81
--- src/include/commands/defrem.h	2 Jun 2007 19:46:07 -0000
*************** extern void DefineIndex(RangeVar *heapRe
*** 26,31 ****
--- 26,32 ----
  			List *attributeList,
  			Expr *predicate,
  			List *options,
+ 			char *inhreloptions,
  			bool unique,
  			bool primary,
  			bool isconstraint,
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/include/nodes/parsenodes.h,v
retrieving revision 1.348
diff -p -c -r1.348 parsenodes.h
*** src/include/nodes/parsenodes.h	27 Apr 2007 22:05:49 -0000	1.348
--- src/include/nodes/parsenodes.h	2 Jun 2007 19:46:07 -0000
*************** typedef struct IndexStmt
*** 1499,1504 ****
--- 1499,1505 ----
  	char	   *tableSpace;		/* tablespace, or NULL to use parent's */
  	List	   *indexParams;	/* a list of IndexElem */
  	List	   *options;		/* options from WITH clause */
+ 	char	   *inhreloptions;	/* relopts inherited from parent index */
  	Node	   *whereClause;	/* qualification (partial-index predicate) */
  	bool		unique;			/* is index unique? */
  	bool		primary;		/* is index on primary key? */
Index: src/include/utils/builtins.h
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/include/utils/builtins.h,v
retrieving revision 1.294
diff -p -c -r1.294 builtins.h
*** src/include/utils/builtins.h	1 Jun 2007 23:40:19 -0000	1.294
--- src/include/utils/builtins.h	2 Jun 2007 19:46:07 -0000
*************** extern List *deparse_context_for_plan(No
*** 571,576 ****
--- 571,578 ----
  extern const char *quote_identifier(const char *ident);
  extern char *quote_qualified_identifier(const char *namespace,
  						   const char *ident);
+ extern char *flatten_reloptions(Oid relid);
+ extern Datum deflatten_reloptions(char *reloptstring);
  
  /* tid.c */
  extern Datum tidin(PG_FUNCTION_ARGS);
Index: src/test/regress/expected/inherit.out
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/test/regress/expected/inherit.out,v
retrieving revision 1.20
diff -p -c -r1.20 inherit.out
*** src/test/regress/expected/inherit.out	27 Jun 2006 03:43:20 -0000	1.20
--- src/test/regress/expected/inherit.out	2 Jun 2007 19:46:07 -0000
*************** INSERT INTO inhg VALUES ('foo');
*** 622,638 ****
  DROP TABLE inhg;
  CREATE TABLE inhg (x text, LIKE inhx INCLUDING CONSTRAINTS, y text); /* Copies constraints */
  INSERT INTO inhg VALUES ('x', 'text', 'y'); /* Succeeds */
! INSERT INTO inhg VALUES ('x', 'text', 'y'); /* Succeeds -- Unique constraints not copied */
  INSERT INTO inhg VALUES ('x', 'foo',  'y');  /* fails due to constraint */
  ERROR:  new row for relation "inhg" violates check constraint "foo"
  SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y */
   x |  xx  | y 
  ---+------+---
   x | text | y
!  x | text | y
! (2 rows)
  
  DROP TABLE inhg;
  -- Test changing the type of inherited columns
  insert into d values('test','one','two','three');
  alter table a alter column aa type integer using bit_length(aa);
--- 622,650 ----
  DROP TABLE inhg;
  CREATE TABLE inhg (x text, LIKE inhx INCLUDING CONSTRAINTS, y text); /* Copies constraints */
  INSERT INTO inhg VALUES ('x', 'text', 'y'); /* Succeeds */
! INSERT INTO inhg VALUES ('x', 'text', 'y'); /* Fails -- Unique constraints copied */
! ERROR:  duplicate key violates unique constraint "inhg_pkey"
  INSERT INTO inhg VALUES ('x', 'foo',  'y');  /* fails due to constraint */
  ERROR:  new row for relation "inhg" violates check constraint "foo"
  SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y */
   x |  xx  | y 
  ---+------+---
   x | text | y
! (1 row)
  
  DROP TABLE inhg;
+ /* Multiple primary keys creation should fail */
+ CREATE TABLE inhg (x text, LIKE inhx INCLUDING CONSTRAINTS, PRIMARY KEY(x)); /* fails */
+ ERROR:  multiple primary keys for table "inhg" are not allowed
+ CREATE TABLE inhz (xx text DEFAULT 'text', yy int UNIQUE);
+ NOTICE:  CREATE TABLE / UNIQUE will create implicit index "inhz_yy_key" for table "inhz"
+ /* Ok to create multiple unique indexes */
+ CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING CONSTRAINTS);
+ NOTICE:  CREATE TABLE / UNIQUE will create implicit index "inhg_x_key" for table "inhg"
+ DROP TABLE inhg;
+ DROP TABLE inhz;
+ CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* Unimplemented */
+ ERROR:  LIKE INCLUDING INDEXES is not implemented
  -- Test changing the type of inherited columns
  insert into d values('test','one','two','three');
  alter table a alter column aa type integer using bit_length(aa);
Index: src/test/regress/sql/inherit.sql
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/test/regress/sql/inherit.sql,v
retrieving revision 1.10
diff -p -c -r1.10 inherit.sql
*** src/test/regress/sql/inherit.sql	27 Jun 2006 03:43:20 -0000	1.10
--- src/test/regress/sql/inherit.sql	2 Jun 2007 19:46:07 -0000
*************** INSERT INTO inhg VALUES ('foo');
*** 151,161 ****
  DROP TABLE inhg;
  CREATE TABLE inhg (x text, LIKE inhx INCLUDING CONSTRAINTS, y text); /* Copies constraints */
  INSERT INTO inhg VALUES ('x', 'text', 'y'); /* Succeeds */
! INSERT INTO inhg VALUES ('x', 'text', 'y'); /* Succeeds -- Unique constraints not copied */
  INSERT INTO inhg VALUES ('x', 'foo',  'y');  /* fails due to constraint */
  SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y */
  DROP TABLE inhg;
  
  
  -- Test changing the type of inherited columns
  insert into d values('test','one','two','three');
--- 151,169 ----
  DROP TABLE inhg;
  CREATE TABLE inhg (x text, LIKE inhx INCLUDING CONSTRAINTS, y text); /* Copies constraints */
  INSERT INTO inhg VALUES ('x', 'text', 'y'); /* Succeeds */
! INSERT INTO inhg VALUES ('x', 'text', 'y'); /* Fails -- Unique constraints copied */
  INSERT INTO inhg VALUES ('x', 'foo',  'y');  /* fails due to constraint */
  SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y */
  DROP TABLE inhg;
  
+ /* Multiple primary keys creation should fail */
+ CREATE TABLE inhg (x text, LIKE inhx INCLUDING CONSTRAINTS, PRIMARY KEY(x)); /* fails */
+ CREATE TABLE inhz (xx text DEFAULT 'text', yy int UNIQUE);
+ /* Ok to create multiple unique indexes */
+ CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING CONSTRAINTS);
+ DROP TABLE inhg;
+ DROP TABLE inhz;
+ CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* Unimplemented */
  
  -- Test changing the type of inherited columns
  insert into d values('test','one','two','three');
---------------------------(end of broadcast)---------------------------
TIP 7: You can help support the PostgreSQL project by donating at

                http://www.postgresql.org/about/donate

Reply via email to