On 06/20/2018 01:46 PM, Andrew Dunstan wrote:
Attached deals with all those issues except locking.
This version adds a lock on the table owning the attribute. cheers andrew -- Andrew Dunstan https://www.2ndQuadrant.com PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index 0c54b02..2480fa0 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -11,13 +11,20 @@ #include "postgres.h" +#include "access/heapam.h" +#include "access/htup_details.h" #include "catalog/binary_upgrade.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_type.h" #include "commands/extension.h" #include "miscadmin.h" #include "utils/array.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/syscache.h" #define CHECK_IS_BINARY_UPGRADE \ @@ -192,3 +199,63 @@ binary_upgrade_set_record_init_privs(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +Datum +binary_upgrade_set_missing_value(PG_FUNCTION_ARGS) +{ + Oid table_id = PG_GETARG_OID(0); + text *attname = PG_GETARG_TEXT_P(1); + text *value = PG_GETARG_TEXT_P(2); + Datum valuesAtt[Natts_pg_attribute]; + bool nullsAtt[Natts_pg_attribute]; + bool replacesAtt[Natts_pg_attribute]; + Datum missingval; + Form_pg_attribute attStruct; + Relation attrrel, tablerel; + HeapTuple atttup, newtup; + char *cattname = text_to_cstring(attname); + char *cvalue = text_to_cstring(value); + + CHECK_IS_BINARY_UPGRADE; + + /* lock the table the attribute belongs to */ + tablerel = heap_open(table_id, AccessExclusiveLock); + + /* Lock the attribute row and get the data */ + attrrel = heap_open(AttributeRelationId, RowExclusiveLock); + atttup = SearchSysCacheAttName(table_id, cattname); + if (!HeapTupleIsValid(atttup)) + elog(ERROR, "cache lookup failed for attribute %s of relation %u", + cattname, table_id); + attStruct = (Form_pg_attribute) GETSTRUCT(atttup); + + /* get an array value from the value string */ + + /* find the io info for an arbitrary array type to get array_in Oid */ + missingval = OidFunctionCall3( + F_ARRAY_IN, + CStringGetDatum(cvalue), + ObjectIdGetDatum(attStruct->atttypid), + Int32GetDatum(attStruct->atttypmod)); + + /* update the tuple - set atthasmissing and attmissingval */ + MemSet(valuesAtt, 0, sizeof(valuesAtt)); + MemSet(nullsAtt, false, sizeof(nullsAtt)); + MemSet(replacesAtt, false, sizeof(replacesAtt)); + + valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true); + replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true; + valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval; + replacesAtt[Anum_pg_attribute_attmissingval - 1] = true; + + newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel), + valuesAtt, nullsAtt, replacesAtt); + CatalogTupleUpdate(attrrel, &newtup->t_self, newtup); + + /* clean up */ + ReleaseSysCache(atttup); + heap_close(attrrel, RowExclusiveLock); + heap_close(tablerel, AccessExclusiveLock); + + PG_RETURN_VOID(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ea2f022..22be3c2 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -8095,6 +8095,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) int i_typstorage; int i_attnotnull; int i_atthasdef; + int i_atthasmissing; int i_attidentity; int i_attisdropped; int i_attlen; @@ -8103,6 +8104,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) int i_attoptions; int i_attcollation; int i_attfdwoptions; + int i_attmissingval; PGresult *res; int ntups; bool hasdefaults; @@ -8132,7 +8134,33 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) resetPQExpBuffer(q); - if (fout->remoteVersion >= 100000) + if (fout->remoteVersion >= 110000) + { + /* atthasmissing and attmissingval are new in 11 */ + appendPQExpBuffer(q, "SELECT a.attnum, a.attname, a.atttypmod, " + "a.attstattarget, a.attstorage, t.typstorage, " + "a.attnotnull, a.atthasdef, a.attisdropped, " + "a.attlen, a.attalign, a.attislocal, " + "pg_catalog.format_type(t.oid,a.atttypmod) AS atttypname, " + "array_to_string(a.attoptions, ', ') AS attoptions, " + "CASE WHEN a.attcollation <> t.typcollation " + "THEN a.attcollation ELSE 0 END AS attcollation, " + "a.atthasmissing, a.attidentity, " + "pg_catalog.array_to_string(ARRAY(" + "SELECT pg_catalog.quote_ident(option_name) || " + "' ' || pg_catalog.quote_literal(option_value) " + "FROM pg_catalog.pg_options_to_table(attfdwoptions) " + "ORDER BY option_name" + "), E',\n ') AS attfdwoptions ," + "a.attmissingval " + "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t " + "ON a.atttypid = t.oid " + "WHERE a.attrelid = '%u'::pg_catalog.oid " + "AND a.attnum > 0::pg_catalog.int2 " + "ORDER BY a.attnum", + tbinfo->dobj.catId.oid); + } + else if (fout->remoteVersion >= 100000) { /* * attidentity is new in version 10. @@ -8258,6 +8286,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) i_typstorage = PQfnumber(res, "typstorage"); i_attnotnull = PQfnumber(res, "attnotnull"); i_atthasdef = PQfnumber(res, "atthasdef"); + i_atthasmissing = PQfnumber(res, "atthasmissing"); i_attidentity = PQfnumber(res, "attidentity"); i_attisdropped = PQfnumber(res, "attisdropped"); i_attlen = PQfnumber(res, "attlen"); @@ -8266,6 +8295,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) i_attoptions = PQfnumber(res, "attoptions"); i_attcollation = PQfnumber(res, "attcollation"); i_attfdwoptions = PQfnumber(res, "attfdwoptions"); + i_attmissingval = PQfnumber(res, "attmissingval"); tbinfo->numatts = ntups; tbinfo->attnames = (char **) pg_malloc(ntups * sizeof(char *)); @@ -8274,6 +8304,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) tbinfo->attstattarget = (int *) pg_malloc(ntups * sizeof(int)); tbinfo->attstorage = (char *) pg_malloc(ntups * sizeof(char)); tbinfo->typstorage = (char *) pg_malloc(ntups * sizeof(char)); + tbinfo->atthasmissing = (bool *) pg_malloc(ntups * sizeof(bool)); tbinfo->attidentity = (char *) pg_malloc(ntups * sizeof(char)); tbinfo->attisdropped = (bool *) pg_malloc(ntups * sizeof(bool)); tbinfo->attlen = (int *) pg_malloc(ntups * sizeof(int)); @@ -8282,6 +8313,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) tbinfo->attoptions = (char **) pg_malloc(ntups * sizeof(char *)); tbinfo->attcollation = (Oid *) pg_malloc(ntups * sizeof(Oid)); tbinfo->attfdwoptions = (char **) pg_malloc(ntups * sizeof(char *)); + tbinfo->attmissingval = (char **) pg_malloc(ntups * sizeof(char *)); tbinfo->notnull = (bool *) pg_malloc(ntups * sizeof(bool)); tbinfo->inhNotNull = (bool *) pg_malloc(ntups * sizeof(bool)); tbinfo->attrdefs = (AttrDefInfo **) pg_malloc(ntups * sizeof(AttrDefInfo *)); @@ -8299,6 +8331,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) tbinfo->attstattarget[j] = atoi(PQgetvalue(res, j, i_attstattarget)); tbinfo->attstorage[j] = *(PQgetvalue(res, j, i_attstorage)); tbinfo->typstorage[j] = *(PQgetvalue(res, j, i_typstorage)); + tbinfo->atthasmissing[j] = (i_atthasmissing >= 0 ? (PQgetvalue(res, j, i_atthasmissing)[0] == 't') : false); tbinfo->attidentity[j] = (i_attidentity >= 0 ? *(PQgetvalue(res, j, i_attidentity)) : '\0'); tbinfo->needs_override = tbinfo->needs_override || (tbinfo->attidentity[j] == ATTRIBUTE_IDENTITY_ALWAYS); tbinfo->attisdropped[j] = (PQgetvalue(res, j, i_attisdropped)[0] == 't'); @@ -8309,6 +8342,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) tbinfo->attoptions[j] = pg_strdup(PQgetvalue(res, j, i_attoptions)); tbinfo->attcollation[j] = atooid(PQgetvalue(res, j, i_attcollation)); tbinfo->attfdwoptions[j] = pg_strdup(PQgetvalue(res, j, i_attfdwoptions)); + tbinfo->attmissingval[j] = (tbinfo->atthasmissing[j] ? pg_strdup(PQgetvalue(res, j, i_attmissingval)) : NULL ); tbinfo->attrdefs[j] = NULL; /* fix below */ if (PQgetvalue(res, j, i_atthasdef)[0] == 't') hasdefaults = true; @@ -15659,6 +15693,29 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo) appendPQExpBufferStr(q, ";\n"); /* + * in binary upgrade mode, update the catalog with any missing values + * that might be present. + */ + if (dopt->binary_upgrade) + { + for (j = 0; j < tbinfo->numatts; j++) + { + if (tbinfo->atthasmissing[j]) + { + appendPQExpBufferStr(q, "\n-- set missing value.\n"); + appendPQExpBufferStr(q, + "SELECT pg_catalog.binary_upgrade_set_missing_value("); + appendStringLiteralAH(q,qualrelname, fout); + appendPQExpBufferStr(q, "::pg_catalog.regclass,"); + appendStringLiteralAH(q, tbinfo->attnames[j], fout); + appendPQExpBufferStr(q,","); + appendStringLiteralAH(q, tbinfo->attmissingval[j], fout); + appendPQExpBufferStr(q,");\n\n"); + } + } + } + + /* * To create binary-compatible heap files, we have to ensure the same * physical column order, including dropped columns, as in the * original. Therefore, we create dropped columns above and drop them diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index e96c662..c1681b3 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -309,6 +309,7 @@ typedef struct _tableInfo char *attstorage; /* attribute storage scheme */ char *typstorage; /* type storage scheme */ bool *attisdropped; /* true if attr is dropped; don't dump it */ + bool *atthasmissing; /* true if the attribute has a missing value */ char *attidentity; int *attlen; /* attribute length, used by binary_upgrade */ char *attalign; /* attribute align, used by binary_upgrade */ @@ -316,6 +317,7 @@ typedef struct _tableInfo char **attoptions; /* per-attribute options */ Oid *attcollation; /* per-attribute collation selection */ char **attfdwoptions; /* per-attribute fdw options */ + char **attmissingval; /* per attribute missing value */ bool *notnull; /* NOT NULL constraints on attributes */ bool *inhNotNull; /* true if NOT NULL is inherited */ struct _attrDefInfo **attrdefs; /* DEFAULT expressions */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 66c6c22..9834e38 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10037,6 +10037,10 @@ proname => 'binary_upgrade_set_record_init_privs', provolatile => 'v', proparallel => 'r', prorettype => 'void', proargtypes => 'bool', prosrc => 'binary_upgrade_set_record_init_privs' }, +{ oid => '4101', descr => 'for use by pg_upgrade', + proname => 'binary_upgrade_set_missing_value', provolatile => 'v', + proparallel => 'r', prorettype => 'void', proargtypes => 'oid text text', + prosrc => 'binary_upgrade_set_missing_value' }, # replication/origin.h { oid => '6003', descr => 'create a replication origin',