I wrote: > change_varattnos_of_a_node(), which is used to adjust expressions > referencing a parent relation or LIKE source relation to refer to the > child relation, ignores whole-row Vars (those with attnum zero). > ... > My inclination, especially in the back branches, is to just throw error > if we find a whole-row Var. > ... > There are a bunch of other things I don't like about > change_varattnos_of_a_node, starting with the name, but those gripes are > in the nature of cosmetics or inadequate error checking and wouldn't > have any user-visible impact if changed.
Attached is a draft patch that adds error reports for whole-row Vars and replaces change_varattnos_of_a_node with a more robust function. The latter makes it kind of a large patch, but it's difficult to make it a whole lot simpler unless we are willing to have the error messages say only "cannot convert whole-row table reference", without any indication of where the problem is. That seems a tad unfriendly to me. Comments? regards, tom lane
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 70753e33e430237a66991bac0740b8761c1b4e4e..d69809a2f866de511e7ce278b8dca9bf9db33d40 100644 *** a/src/backend/commands/tablecmds.c --- b/src/backend/commands/tablecmds.c *************** *** 68,73 **** --- 68,74 ---- #include "parser/parser.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" + #include "rewrite/rewriteManip.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/lock.h" *************** static void truncate_check_rel(Relation *** 253,259 **** static List *MergeAttributes(List *schema, List *supers, char relpersistence, List **supOids, List **supconstr, int *supOidCount); static bool MergeCheckConstraint(List *constraints, char *name, Node *expr); - static bool change_varattnos_walker(Node *node, const AttrNumber *newattno); static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); static void StoreCatalogInheritance(Oid relationId, List *supers); --- 254,259 ---- *************** MergeAttributes(List *schema, List *supe *** 1496,1502 **** * parents after the first one, nor if we have dropped columns.) */ newattno = (AttrNumber *) ! palloc(tupleDesc->natts * sizeof(AttrNumber)); for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) --- 1496,1502 ---- * parents after the first one, nor if we have dropped columns.) */ newattno = (AttrNumber *) ! palloc0(tupleDesc->natts * sizeof(AttrNumber)); for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) *************** MergeAttributes(List *schema, List *supe *** 1510,1524 **** * Ignore dropped columns in the parent. */ if (attribute->attisdropped) ! { ! /* ! * change_varattnos_of_a_node asserts that this is greater ! * than zero, so if anything tries to use it, we should find ! * out. ! */ ! newattno[parent_attno - 1] = 0; ! continue; ! } /* * Does it conflict with some previously inherited column? --- 1510,1516 ---- * Ignore dropped columns in the parent. */ if (attribute->attisdropped) ! continue; /* leave newattno entry as zero */ /* * Does it conflict with some previously inherited column? *************** MergeAttributes(List *schema, List *supe *** 1656,1669 **** { char *name = check[i].ccname; Node *expr; /* ignore if the constraint is non-inheritable */ if (check[i].ccnoinherit) continue; ! /* adjust varattnos of ccbin here */ ! expr = stringToNode(check[i].ccbin); ! change_varattnos_of_a_node(expr, newattno); /* check for duplicate */ if (!MergeCheckConstraint(constraints, name, expr)) --- 1648,1677 ---- { char *name = check[i].ccname; Node *expr; + bool found_whole_row; /* ignore if the constraint is non-inheritable */ if (check[i].ccnoinherit) continue; ! /* Adjust Vars to match new table's column numbering */ ! expr = map_variable_attnos(stringToNode(check[i].ccbin), ! 1, 0, ! newattno, tupleDesc->natts, ! &found_whole_row); ! ! /* ! * For the moment we have to reject whole-row variables. ! * We could convert them, if we knew the new table's rowtype ! * OID, but that hasn't been assigned yet. ! */ ! if (found_whole_row) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot convert whole-row table reference"), ! errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".", ! name, ! RelationGetRelationName(relation)))); /* check for duplicate */ if (!MergeCheckConstraint(constraints, name, expr)) *************** MergeCheckConstraint(List *constraints, *** 1867,1967 **** /* - * Replace varattno values in an expression tree according to the given - * map array, that is, varattno N is replaced by newattno[N-1]. It is - * caller's responsibility to ensure that the array is long enough to - * define values for all user varattnos present in the tree. System column - * attnos remain unchanged. - * - * Note that the passed node tree is modified in-place! - */ - void - change_varattnos_of_a_node(Node *node, const AttrNumber *newattno) - { - /* no setup needed, so away we go */ - (void) change_varattnos_walker(node, newattno); - } - - static bool - change_varattnos_walker(Node *node, const AttrNumber *newattno) - { - if (node == NULL) - return false; - if (IsA(node, Var)) - { - Var *var = (Var *) node; - - if (var->varlevelsup == 0 && var->varno == 1 && - var->varattno > 0) - { - /* - * ??? the following may be a problem when the node is multiply - * referenced though stringToNode() doesn't create such a node - * currently. - */ - Assert(newattno[var->varattno - 1] > 0); - var->varattno = var->varoattno = newattno[var->varattno - 1]; - } - return false; - } - return expression_tree_walker(node, change_varattnos_walker, - (void *) newattno); - } - - /* - * Generate a map for change_varattnos_of_a_node from old and new TupleDesc's, - * matching according to column name. - */ - AttrNumber * - varattnos_map(TupleDesc olddesc, TupleDesc newdesc) - { - AttrNumber *attmap; - int i, - j; - - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * olddesc->natts); - for (i = 1; i <= olddesc->natts; i++) - { - if (olddesc->attrs[i - 1]->attisdropped) - continue; /* leave the entry as zero */ - - for (j = 1; j <= newdesc->natts; j++) - { - if (strcmp(NameStr(olddesc->attrs[i - 1]->attname), - NameStr(newdesc->attrs[j - 1]->attname)) == 0) - { - attmap[i - 1] = j; - break; - } - } - } - return attmap; - } - - /* - * Generate a map for change_varattnos_of_a_node from a TupleDesc and a list - * of ColumnDefs - */ - AttrNumber * - varattnos_map_schema(TupleDesc old, List *schema) - { - AttrNumber *attmap; - int i; - - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts); - for (i = 1; i <= old->natts; i++) - { - if (old->attrs[i - 1]->attisdropped) - continue; /* leave the entry as zero */ - - attmap[i - 1] = findAttrByName(NameStr(old->attrs[i - 1]->attname), - schema); - } - return attmap; - } - - - /* * StoreCatalogInheritance * Updates the system catalogs with proper inheritance information. * --- 1875,1880 ---- diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index adc0d5b775e9754941d880722ea2fb7f4bf704fd..55a74eeaf448b825a091e50d8717082c89a71c5f 100644 *** a/src/backend/parser/parse_utilcmd.c --- b/src/backend/parser/parse_utilcmd.c *************** static void transformOfType(CreateStmtCo *** 108,114 **** TypeName *ofTypename); static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt); static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, ! Relation parent_index, AttrNumber *attmap); static List *get_collation(Oid collation, Oid actual_datatype); static List *get_opclass(Oid opclass, Oid actual_datatype); static void transformIndexConstraints(CreateStmtContext *cxt); --- 108,115 ---- TypeName *ofTypename); static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt); static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, ! Relation source_idx, ! const AttrNumber *attmap, int attmap_length); static List *get_collation(Oid collation, Oid actual_datatype); static List *get_opclass(Oid opclass, Oid actual_datatype); static void transformIndexConstraints(CreateStmtContext *cxt); *************** transformTableLikeClause(CreateStmtConte *** 634,639 **** --- 635,641 ---- Relation relation; TupleDesc tupleDesc; TupleConstr *constr; + AttrNumber *attmap; AclResult aclresult; char *comment; ParseCallbackState pcbstate; *************** transformTableLikeClause(CreateStmtConte *** 677,682 **** --- 679,691 ---- constr = tupleDesc->constr; /* + * Initialize column number map for map_variable_attnos(). We need this + * since dropped columns in the source table aren't copied, so the new + * table can have different column numbers. + */ + attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * tupleDesc->natts); + + /* * Insert the copied attributes into the cxt for the new table definition. */ for (parent_attno = 1; parent_attno <= tupleDesc->natts; *************** transformTableLikeClause(CreateStmtConte *** 687,693 **** ColumnDef *def; /* ! * Ignore dropped columns in the parent. */ if (attribute->attisdropped) continue; --- 696,702 ---- ColumnDef *def; /* ! * Ignore dropped columns in the parent. attmap entry is left zero. */ if (attribute->attisdropped) continue; *************** transformTableLikeClause(CreateStmtConte *** 718,723 **** --- 727,734 ---- */ cxt->columns = lappend(cxt->columns, def); + attmap[parent_attno - 1] = list_length(cxt->columns); + /* * Copy default, if present and the default has been requested */ *************** transformTableLikeClause(CreateStmtConte *** 776,797 **** /* * Copy CHECK constraints if requested, being careful to adjust attribute ! * numbers */ if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) && tupleDesc->constr) { - AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); int ccnum; for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++) { char *ccname = tupleDesc->constr->check[ccnum].ccname; char *ccbin = tupleDesc->constr->check[ccnum].ccbin; - Node *ccbin_node = stringToNode(ccbin); Constraint *n = makeNode(Constraint); ! change_varattnos_of_a_node(ccbin_node, attmap); n->contype = CONSTR_CHECK; n->location = -1; --- 787,825 ---- /* * Copy CHECK constraints if requested, being careful to adjust attribute ! * numbers so they match the child. */ if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) && tupleDesc->constr) { int ccnum; for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++) { char *ccname = tupleDesc->constr->check[ccnum].ccname; char *ccbin = tupleDesc->constr->check[ccnum].ccbin; Constraint *n = makeNode(Constraint); + Node *ccbin_node; + bool found_whole_row; ! ccbin_node = map_variable_attnos(stringToNode(ccbin), ! 1, 0, ! attmap, tupleDesc->natts, ! &found_whole_row); ! ! /* ! * We reject whole-row variables because the whole point of LIKE ! * is that the new table's rowtype might later diverge from the ! * parent's. So, while translation might be possible right now, ! * it wouldn't be possible to guarantee it would work in future. ! */ ! if (found_whole_row) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot convert whole-row table reference"), ! errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".", ! ccname, ! RelationGetRelationName(relation)))); n->contype = CONSTR_CHECK; n->location = -1; *************** transformTableLikeClause(CreateStmtConte *** 827,833 **** if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) && relation->rd_rel->relhasindex) { - AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); List *parent_indexes; ListCell *l; --- 855,860 ---- *************** transformTableLikeClause(CreateStmtConte *** 842,848 **** parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ ! index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap); /* Copy comment on index */ if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) --- 869,876 ---- parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ ! index_stmt = generateClonedIndexStmt(cxt, parent_index, ! attmap, tupleDesc->natts); /* Copy comment on index */ if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) *************** chooseIndexName(const RangeVar *relation *** 961,967 **** */ static IndexStmt * generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, ! AttrNumber *attmap) { Oid source_relid = RelationGetRelid(source_idx); Form_pg_attribute *attrs = RelationGetDescr(source_idx)->attrs; --- 989,995 ---- */ static IndexStmt * generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, ! const AttrNumber *attmap, int attmap_length) { Oid source_relid = RelationGetRelid(source_idx); Form_pg_attribute *attrs = RelationGetDescr(source_idx)->attrs; *************** generateClonedIndexStmt(CreateStmtContex *** 1149,1162 **** { /* Expressional index */ Node *indexkey; if (indexpr_item == NULL) elog(ERROR, "too few entries in indexprs list"); indexkey = (Node *) lfirst(indexpr_item); indexpr_item = lnext(indexpr_item); ! /* OK to modify indexkey since we are working on a private copy */ ! change_varattnos_of_a_node(indexkey, attmap); iparam->name = NULL; iparam->expr = indexkey; --- 1177,1202 ---- { /* Expressional index */ Node *indexkey; + bool found_whole_row; if (indexpr_item == NULL) elog(ERROR, "too few entries in indexprs list"); indexkey = (Node *) lfirst(indexpr_item); indexpr_item = lnext(indexpr_item); ! /* Adjust Vars to match new table's column numbering */ ! indexkey = map_variable_attnos(indexkey, ! 1, 0, ! attmap, attmap_length, ! &found_whole_row); ! ! /* As in transformTableLikeClause, reject whole-row variables */ ! if (found_whole_row) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot convert whole-row table reference"), ! errdetail("Index \"%s\" contains a whole-row table reference.", ! RelationGetRelationName(source_idx)))); iparam->name = NULL; iparam->expr = indexkey; *************** generateClonedIndexStmt(CreateStmtContex *** 1213,1224 **** if (!isnull) { char *pred_str; /* Convert text string to node tree */ pred_str = TextDatumGetCString(datum); ! index->whereClause = (Node *) stringToNode(pred_str); ! /* Adjust attribute numbers */ ! change_varattnos_of_a_node(index->whereClause, attmap); } /* Clean up */ --- 1253,1280 ---- if (!isnull) { char *pred_str; + Node *pred_tree; + bool found_whole_row; /* Convert text string to node tree */ pred_str = TextDatumGetCString(datum); ! pred_tree = (Node *) stringToNode(pred_str); ! ! /* Adjust Vars to match new table's column numbering */ ! pred_tree = map_variable_attnos(pred_tree, ! 1, 0, ! attmap, attmap_length, ! &found_whole_row); ! ! /* As in transformTableLikeClause, reject whole-row variables */ ! if (found_whole_row) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot convert whole-row table reference"), ! errdetail("Index \"%s\" contains a whole-row table reference.", ! RelationGetRelationName(source_idx)))); ! ! index->whereClause = pred_tree; } /* Clean up */ diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c index ec6b5bf5a924a8655227be1eeaffeefa88927611..9c778efd1c39849726c775aab55e3bdf3d97487f 100644 *** a/src/backend/rewrite/rewriteManip.c --- b/src/backend/rewrite/rewriteManip.c *************** replace_rte_variables_mutator(Node *node *** 1218,1223 **** --- 1218,1336 ---- /* + * map_variable_attnos() finds all user-column Vars in an expression tree + * that reference a particular RTE, and adjusts their varattnos according + * to the given mapping array (varattno n is replaced by attno_map[n-1]). + * Vars for system columns are not modified. + * + * A zero in the mapping array represents a dropped column, which should not + * appear in the expression. + * + * If the expression tree contains a whole-row Var for the target RTE, + * the Var is not changed but *found_whole_row is returned as TRUE. + * For most callers this is an error condition, but we leave it to the caller + * to report the error so that useful context can be provided. (In some + * usages it would be appropriate to modify the Var's vartype and insert a + * ConvertRowtypeExpr node to map back to the original vartype. We might + * someday extend this function's API to support that. For now, the only + * concession to that future need is that this function is a tree mutator + * not just a walker.) + * + * This could be built using replace_rte_variables and a callback function, + * but since we don't ever need to insert sublinks, replace_rte_variables is + * overly complicated. + */ + + typedef struct + { + int target_varno; /* RTE index to search for */ + int sublevels_up; /* (current) nesting depth */ + const AttrNumber *attno_map; /* map array for user attnos */ + int map_length; /* number of entries in attno_map[] */ + bool *found_whole_row; /* output flag */ + } map_variable_attnos_context; + + static Node * + map_variable_attnos_mutator(Node *node, + map_variable_attnos_context *context) + { + if (node == NULL) + return NULL; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + if (var->varno == context->target_varno && + var->varlevelsup == context->sublevels_up) + { + /* Found a matching variable, make the substitution */ + Var *newvar = (Var *) palloc(sizeof(Var)); + int attno = var->varattno; + + *newvar = *var; + if (attno > 0) + { + /* user-defined column, replace attno */ + if (attno > context->map_length || + context->attno_map[attno - 1] == 0) + elog(ERROR, "unexpected varattno %d in expression to be mapped", + attno); + newvar->varattno = newvar->varoattno = context->attno_map[attno - 1]; + } + else if (attno == 0) + { + /* whole-row variable, warn caller */ + *(context->found_whole_row) = true; + } + return (Node *) newvar; + } + /* otherwise fall through to copy the var normally */ + } + else if (IsA(node, Query)) + { + /* Recurse into RTE subquery or not-yet-planned sublink subquery */ + Query *newnode; + + context->sublevels_up++; + newnode = query_tree_mutator((Query *) node, + map_variable_attnos_mutator, + (void *) context, + 0); + context->sublevels_up--; + return (Node *) newnode; + } + return expression_tree_mutator(node, map_variable_attnos_mutator, + (void *) context); + } + + Node * + map_variable_attnos(Node *node, + int target_varno, int sublevels_up, + const AttrNumber *attno_map, int map_length, + bool *found_whole_row) + { + map_variable_attnos_context context; + + context.target_varno = target_varno; + context.sublevels_up = sublevels_up; + context.attno_map = attno_map; + context.map_length = map_length; + context.found_whole_row = found_whole_row; + + *found_whole_row = false; + + /* + * Must be prepared to start with a Query or a bare expression tree; if + * it's a Query, we don't want to increment sublevels_up. + */ + return query_or_expression_tree_mutator(node, + map_variable_attnos_mutator, + (void *) &context, + 0); + } + + + /* * ResolveNew - replace Vars with corresponding items from a targetlist * * Vars matching target_varno and sublevels_up are replaced by the diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 9ceb086f681420be255ae8790988fb074b6495ff..15d4713cec16ff9e2c4366deedf6aca807516cf5 100644 *** a/src/include/commands/tablecmds.h --- b/src/include/commands/tablecmds.h *************** extern void find_composite_type_dependen *** 61,70 **** extern void check_of_type(HeapTuple typetuple); - extern AttrNumber *varattnos_map(TupleDesc olddesc, TupleDesc newdesc); - extern AttrNumber *varattnos_map_schema(TupleDesc old, List *schema); - extern void change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); - extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid); --- 61,66 ---- diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h index 7226187849232caf9a6c1dc30f2334c7e2656283..6f57b37b81443ba1a5b7a450b2cb005b99831187 100644 *** a/src/include/rewrite/rewriteManip.h --- b/src/include/rewrite/rewriteManip.h *************** extern Node *replace_rte_variables(Node *** 65,70 **** --- 65,75 ---- extern Node *replace_rte_variables_mutator(Node *node, replace_rte_variables_context *context); + extern Node *map_variable_attnos(Node *node, + int target_varno, int sublevels_up, + const AttrNumber *attno_map, int map_length, + bool *found_whole_row); + extern Node *ResolveNew(Node *node, int target_varno, int sublevels_up, RangeTblEntry *target_rte, List *targetlist, int event, int update_varno,
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers