Hi
While working on a big table recently, I noticed that ALTER TABLE does not
check for column existence in operations like SET NOT NULL before starting
working on the table, for instance adding a primary key.
It is thus possible, if a typo has been made, to generate a long lock and a
lot of WAL that will serve no purpose since the whole transaction will be
discarded.
For example :
toto=# alter table test add primary key(i), alter column typo set not null;
ERROR: column "typo" of relation "test" does not exist
Time: 10.794 s
The attached patch fixes this behaviour by adding a small check in the first
pass of alter table to make sure that a column referenced by an alter command
exists first. It also checks if the column is added by another alter sub-
command. It does not handle every scenario (dropping a column and then
altering it for instance), these are left to the exec code to exclude.
The patch has been checked with make check, and I see no documentation change
to do since this does not alter any existing documented behaviour.
Regards
Pierre
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 74e020bffc..48efb6ef5b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -352,6 +352,7 @@ static void ATRewriteTables(AlterTableStmt *parsetree,
List **wqueue, LOCKMODE lockmode);
static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode);
static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
+static void ATColumnExists(Relation rel, const char *colName, AlteredTableInfo *tab);
static void ATSimplePermissions(Relation rel, int allowed_targets);
static void ATWrongRelkindError(Relation rel, int allowed_targets);
static void ATSimpleRecursion(List **wqueue, Relation rel,
@@ -3592,6 +3593,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
*/
ATSimplePermissions(rel, ATT_TABLE | ATT_VIEW | ATT_FOREIGN_TABLE);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+ ATColumnExists(rel, cmd->name, tab);
/* No command-specific prep needed */
pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
break;
@@ -3611,6 +3613,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
ATPrepDropNotNull(rel, recurse, recursing);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+ ATColumnExists(rel, cmd->name, tab);
/* No command-specific prep needed */
pass = AT_PASS_DROP;
break;
@@ -3618,6 +3621,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
ATPrepSetNotNull(rel, recurse, recursing);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+ ATColumnExists(rel, cmd->name, tab);
/* No command-specific prep needed */
pass = AT_PASS_ADD_CONSTR;
break;
@@ -3630,12 +3634,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_SetOptions: /* ALTER COLUMN SET ( options ) */
case AT_ResetOptions: /* ALTER COLUMN RESET ( options ) */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW | ATT_INDEX | ATT_FOREIGN_TABLE);
+ ATColumnExists(rel, cmd->name, tab);
/* This command never recurses */
pass = AT_PASS_MISC;
break;
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW | ATT_FOREIGN_TABLE);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+ ATColumnExists(rel, cmd->name, tab);
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
@@ -3643,6 +3649,10 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATSimplePermissions(rel,
ATT_TABLE | ATT_COMPOSITE_TYPE | ATT_FOREIGN_TABLE);
ATPrepDropColumn(wqueue, rel, recurse, recursing, cmd, lockmode);
+ if (!cmd->missing_ok)
+ {
+ ATColumnExists(rel, cmd->name, tab);
+ }
/* Recursion occurs during execution phase */
pass = AT_PASS_DROP;
break;
@@ -4850,6 +4860,54 @@ ATSimplePermissions(Relation rel, int allowed_targets)
RelationGetRelationName(rel))));
}
+/*
+ * ATColumnExists
+ *
+ * - Ensure that the targeted column exists
+ */
+static void
+ATColumnExists(Relation rel, const char *colName, AlteredTableInfo *tab)
+{
+ HeapTuple tuple;
+ List *subcmds;
+ bool column_found = false;
+ tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
+
+ if (HeapTupleIsValid(tuple))
+ {
+ column_found = true;
+ }
+ else
+ {
+ /* Check that the column is not added in a previous operation */
+ subcmds = tab->subcmds[AT_PASS_ADD_COL];
+ if (subcmds != NULL)
+ {
+ ListCell *lcmd;
+ ColumnDef *coldef;
+
+ foreach(lcmd, subcmds)
+ {
+ AlterTableCmd *cmd = castNode(AlterTableCmd, lfirst(lcmd));
+ coldef = (ColumnDef *) cmd->def;
+ if (strcmp(coldef->colname, colName) == 0)
+ {
+ column_found = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!column_found)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colName, RelationGetRelationName(rel))));
+ }
+}
+
/*
* ATWrongRelkindError
*