On 06/19/2018 01:19 PM, Tom Lane wrote:
Andres Freund <and...@anarazel.de> writes:
On 2018-06-19 12:37:52 -0400, Tom Lane wrote:
The problem here is that that function does not exist in 11beta1.
Since adding the "incoming" function is certainly going to require
initdb, we have to be able to dump from the server as it now stands,
or we'll be cutting existing beta testers adrift.
It'd probably not be too hard to write a plpgsql replacement for it,
should it come to that. Obviously it'd be nicer to not require users to
create that, but ...
After some thought, I think it's not that hard to get the support function
to accept the anyarray string form.  I was worried about issues like
whether float8 values would restore exactly, but really that's no worse
than a dump/reload today.  Basically, the support function would just need
to extract the target attribute's type and typmod from the pg_attribute
row, then call array_in().


This unfortunately crashes and burns if we use DirectFunctionCall3 to call array_in, because array_in relies on fn_extra and DirectFunctionCall3 does not set up an FmgrInfo for it. There is the CallerFInfoFunctionCall stuff, but it only has 1- and 2-arg variants, and array_in takes 3. In retrospect we should probably have added a 3-arg form - quite a few input functions take 3 args. Anything else is likely to be rather uglier.

Attaching the failing patch. I'll attack this again in the morning.

cheers

andrew



--
Andrew Dunstan                https://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 0c54b02..d9474db 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,14 +11,18 @@
 
 #include "postgres.h"
 
+#include "access/heapam.h"
+#include "access/htup_details.h"
 #include "catalog/binary_upgrade.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
-
+#include "utils/rel.h"
+#include "utils/syscache.h"
 
 #define CHECK_IS_BINARY_UPGRADE									\
 do {															\
@@ -192,3 +196,59 @@ binary_upgrade_set_record_init_privs(PG_FUNCTION_ARGS)
 
 	PG_RETURN_VOID();
 }
+
+/*
+ * Set atthasmissing/attmissingval in the pg_attribute row of column 'attname'
+ * of table 'table_id', parsing 'value' as an array literal of the column type.
+ */
+Datum
+binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
+{
+	Oid			table_id = PG_GETARG_OID(0);
+	char	   *cattname = text_to_cstring(PG_GETARG_TEXT_P(1));
+	char	   *cvalue = text_to_cstring(PG_GETARG_TEXT_P(2));
+	Datum		valuesAtt[Natts_pg_attribute];
+	bool		nullsAtt[Natts_pg_attribute];
+	bool		replacesAtt[Natts_pg_attribute];
+	Datum		missingval;
+	Form_pg_attribute attStruct;
+	Relation	attrrel;
+	HeapTuple	atttup;
+	HeapTuple	newtup;
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	/* Open pg_attribute for update and look up the column's row */
+	attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
+	atttup = SearchSysCacheAttName(table_id, cattname);
+	if (!HeapTupleIsValid(atttup))
+		elog(ERROR, "cache lookup failed for attribute %s of relation %u",
+			 cattname, table_id);
+	attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
+
+	/* array_in uses fn_extra, so call it via fmgr, not DirectFunctionCall3 */
+	missingval = OidFunctionCall3(fmgr_internal_function("array_in"),
+								  CStringGetDatum(cvalue),
+								  ObjectIdGetDatum(attStruct->atttypid),
+								  Int32GetDatum(attStruct->atttypmod));
+
+	/* update the tuple - set atthasmissing and attmissingval */
+	MemSet(valuesAtt, 0, sizeof(valuesAtt));
+	MemSet(nullsAtt, false, sizeof(nullsAtt));
+	MemSet(replacesAtt, false, sizeof(replacesAtt));
+	valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
+	replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
+	valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
+	replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
+
+	newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
+							   valuesAtt, nullsAtt, replacesAtt);
+	CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
+
+	/* clean up: drop the syscache reference, then free our own copies */
+	ReleaseSysCache(atttup);
+	heap_freetuple(newtup);
+	pfree(DatumGetPointer(missingval));
+	heap_close(attrrel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index ea2f022..22be3c2 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -8095,6 +8095,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 	int			i_typstorage;
 	int			i_attnotnull;
 	int			i_atthasdef;
+	int			i_atthasmissing;
 	int			i_attidentity;
 	int			i_attisdropped;
 	int			i_attlen;
@@ -8103,6 +8104,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 	int			i_attoptions;
 	int			i_attcollation;
 	int			i_attfdwoptions;
+	int			i_attmissingval;
 	PGresult   *res;
 	int			ntups;
 	bool		hasdefaults;
@@ -8132,7 +8134,33 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 
 		resetPQExpBuffer(q);
 
-		if (fout->remoteVersion >= 100000)
+		if (fout->remoteVersion >= 110000)
+		{
+			/* atthasmissing and attmissingval are new in 11 */
+			appendPQExpBuffer(q, "SELECT a.attnum, a.attname, a.atttypmod, "
+							  "a.attstattarget, a.attstorage, t.typstorage, "
+							  "a.attnotnull, a.atthasdef, a.attisdropped, "
+							  "a.attlen, a.attalign, a.attislocal, "
+							  "pg_catalog.format_type(t.oid,a.atttypmod) AS atttypname, "
+							  "array_to_string(a.attoptions, ', ') AS attoptions, "
+							  "CASE WHEN a.attcollation <> t.typcollation "
+							  "THEN a.attcollation ELSE 0 END AS attcollation, "
+							  "a.atthasmissing, a.attidentity, "
+							  "pg_catalog.array_to_string(ARRAY("
+							  "SELECT pg_catalog.quote_ident(option_name) || "
+							  "' ' || pg_catalog.quote_literal(option_value) "
+							  "FROM pg_catalog.pg_options_to_table(attfdwoptions) "
+							  "ORDER BY option_name"
+							  "), E',\n    ') AS attfdwoptions ,"
+							  "a.attmissingval "
+							  "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
+							  "ON a.atttypid = t.oid "
+							  "WHERE a.attrelid = '%u'::pg_catalog.oid "
+							  "AND a.attnum > 0::pg_catalog.int2 "
+							  "ORDER BY a.attnum",
+							  tbinfo->dobj.catId.oid);
+		}
+		else if (fout->remoteVersion >= 100000)
 		{
 			/*
 			 * attidentity is new in version 10.
@@ -8258,6 +8286,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 		i_typstorage = PQfnumber(res, "typstorage");
 		i_attnotnull = PQfnumber(res, "attnotnull");
 		i_atthasdef = PQfnumber(res, "atthasdef");
+		i_atthasmissing = PQfnumber(res, "atthasmissing");
 		i_attidentity = PQfnumber(res, "attidentity");
 		i_attisdropped = PQfnumber(res, "attisdropped");
 		i_attlen = PQfnumber(res, "attlen");
@@ -8266,6 +8295,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 		i_attoptions = PQfnumber(res, "attoptions");
 		i_attcollation = PQfnumber(res, "attcollation");
 		i_attfdwoptions = PQfnumber(res, "attfdwoptions");
+		i_attmissingval = PQfnumber(res, "attmissingval");
 
 		tbinfo->numatts = ntups;
 		tbinfo->attnames = (char **) pg_malloc(ntups * sizeof(char *));
@@ -8274,6 +8304,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 		tbinfo->attstattarget = (int *) pg_malloc(ntups * sizeof(int));
 		tbinfo->attstorage = (char *) pg_malloc(ntups * sizeof(char));
 		tbinfo->typstorage = (char *) pg_malloc(ntups * sizeof(char));
+		tbinfo->atthasmissing = (bool *) pg_malloc(ntups * sizeof(bool));
 		tbinfo->attidentity = (char *) pg_malloc(ntups * sizeof(char));
 		tbinfo->attisdropped = (bool *) pg_malloc(ntups * sizeof(bool));
 		tbinfo->attlen = (int *) pg_malloc(ntups * sizeof(int));
@@ -8282,6 +8313,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 		tbinfo->attoptions = (char **) pg_malloc(ntups * sizeof(char *));
 		tbinfo->attcollation = (Oid *) pg_malloc(ntups * sizeof(Oid));
 		tbinfo->attfdwoptions = (char **) pg_malloc(ntups * sizeof(char *));
+		tbinfo->attmissingval = (char **) pg_malloc(ntups * sizeof(char *));
 		tbinfo->notnull = (bool *) pg_malloc(ntups * sizeof(bool));
 		tbinfo->inhNotNull = (bool *) pg_malloc(ntups * sizeof(bool));
 		tbinfo->attrdefs = (AttrDefInfo **) pg_malloc(ntups * sizeof(AttrDefInfo *));
@@ -8299,6 +8331,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 			tbinfo->attstattarget[j] = atoi(PQgetvalue(res, j, i_attstattarget));
 			tbinfo->attstorage[j] = *(PQgetvalue(res, j, i_attstorage));
 			tbinfo->typstorage[j] = *(PQgetvalue(res, j, i_typstorage));
+			tbinfo->atthasmissing[j] = (i_atthasmissing >= 0 ? (PQgetvalue(res, j, i_atthasmissing)[0] == 't') : false);
 			tbinfo->attidentity[j] = (i_attidentity >= 0 ? *(PQgetvalue(res, j, i_attidentity)) : '\0');
 			tbinfo->needs_override = tbinfo->needs_override || (tbinfo->attidentity[j] == ATTRIBUTE_IDENTITY_ALWAYS);
 			tbinfo->attisdropped[j] = (PQgetvalue(res, j, i_attisdropped)[0] == 't');
@@ -8309,6 +8342,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 			tbinfo->attoptions[j] = pg_strdup(PQgetvalue(res, j, i_attoptions));
 			tbinfo->attcollation[j] = atooid(PQgetvalue(res, j, i_attcollation));
 			tbinfo->attfdwoptions[j] = pg_strdup(PQgetvalue(res, j, i_attfdwoptions));
+			tbinfo->attmissingval[j] = (tbinfo->atthasmissing[j] ? pg_strdup(PQgetvalue(res, j, i_attmissingval)) : NULL );
 			tbinfo->attrdefs[j] = NULL; /* fix below */
 			if (PQgetvalue(res, j, i_atthasdef)[0] == 't')
 				hasdefaults = true;
@@ -15659,6 +15693,29 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
 			appendPQExpBufferStr(q, ";\n");
 
 		/*
+		 * in binary upgrade mode, update the catalog with any missing values
+		 * that might be present.
+		 */
+		if (dopt->binary_upgrade)
+		{
+			for (j = 0; j < tbinfo->numatts; j++)
+			{
+				if (tbinfo->atthasmissing[j])
+				{
+					appendPQExpBufferStr(q, "\n-- set missing value.\n");
+					appendPQExpBufferStr(q,
+										 "SELECT pg_catalog.binary_upgrade_set_missing_value(");
+					appendStringLiteralAH(q,qualrelname, fout);
+					appendPQExpBufferStr(q, "::pg_catalog.regclass,");
+					appendStringLiteralAH(q, tbinfo->attnames[j], fout);
+					appendPQExpBufferStr(q,",");
+					appendStringLiteralAH(q, tbinfo->attmissingval[j], fout);
+					appendPQExpBufferStr(q,");\n\n");
+				}
+			}
+		}
+
+	/*
 		 * To create binary-compatible heap files, we have to ensure the same
 		 * physical column order, including dropped columns, as in the
 		 * original.  Therefore, we create dropped columns above and drop them
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e96c662..c1681b3 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -309,6 +309,7 @@ typedef struct _tableInfo
 	char	   *attstorage;		/* attribute storage scheme */
 	char	   *typstorage;		/* type storage scheme */
 	bool	   *attisdropped;	/* true if attr is dropped; don't dump it */
+	bool       *atthasmissing;  /* true if the attribute has a missing value */
 	char	   *attidentity;
 	int		   *attlen;			/* attribute length, used by binary_upgrade */
 	char	   *attalign;		/* attribute align, used by binary_upgrade */
@@ -316,6 +317,7 @@ typedef struct _tableInfo
 	char	  **attoptions;		/* per-attribute options */
 	Oid		   *attcollation;	/* per-attribute collation selection */
 	char	  **attfdwoptions;	/* per-attribute fdw options */
+	char      **attmissingval;  /* per attribute missing value */
 	bool	   *notnull;		/* NOT NULL constraints on attributes */
 	bool	   *inhNotNull;		/* true if NOT NULL is inherited */
 	struct _attrDefInfo **attrdefs; /* DEFAULT expressions */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 66c6c22..9834e38 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10037,6 +10037,10 @@
   proname => 'binary_upgrade_set_record_init_privs', provolatile => 'v',
   proparallel => 'r', prorettype => 'void', proargtypes => 'bool',
   prosrc => 'binary_upgrade_set_record_init_privs' },
+{ oid => '4101', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_set_missing_value', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => 'oid text text',
+  prosrc => 'binary_upgrade_set_missing_value' },
 
 # replication/origin.h
 { oid => '6003', descr => 'create a replication origin',

Reply via email to