On Wed, 19 Jul 2023 at 12:47, Michael Paquier <mich...@paquier.xyz> wrote:
>
> On Wed, May 10, 2023 at 05:59:24PM +1000, Peter Smith wrote:
> > 1. ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y'])
> >
> > I was a bit confused by this relation 'state' mentioned in multiple
> > places. IIUC the pg_upgrade logic is going to reject anything with a
> > non-READY (not 'r') state anyhow, so what is the point of having all
> > the extra grammar/parse_subscription_options etc to handle setting the
> > state when the only possible value must be 'r'?
>
> We are just talking about the handling of an extra DefElem in an
> extensible grammar pattern, so adding the state field does not
> represent much maintenance work.  I'm OK with the addition of this
> field in the data set dumped, FWIW, on the ground that it can be
> useful for debugging purposes when looking at --binary-upgrade dumps,
> and because we aim at copying catalog contents from one cluster to
> another.
>
> Anyway, I am not convinced that we have any need for a parse-able
> grammar at all, because anything that's presented on this thread is
> aimed at being used only for the internal purpose of an upgrade in a
> --binary-upgrade dump with a direct catalog copy in mind, and having a
> grammar would encourage abuses of it outside of this context.  I think
> that we should aim for simpler than what's proposed by the patch,
> actually, with a single SQL function à-la-binary_upgrade() that
> adds the contents of a relation.  Or we can be crazier and just create
> INSERT queries for pg_subscription_rel to provide an exact copy of the
> catalog contents.  A SQL function would be more consistent with other
> object types that use similar tricks, see
> binary_upgrade_create_empty_extension() that does something similar
> for some pg_extension records.  So, this function would require in
> input 4 arguments:
> - The subscription name or OID.
> - The relation OID.
> - Its LSN.
> - Its sync state.

I have added a SQL function, binary_upgrade_create_sub_rel_state(), to
handle the insertion and removed the "ALTER SUBSCRIPTION ... ADD TABLE"
command that the earlier patch versions introduced.
The attached v6 patch has these changes.
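
For reference, the per-relation statements that pg_dump emits in binary
upgrade mode can be sketched roughly as below.  This is a standalone
illustration and not code from the patch: it mirrors the dumpSubscription()
hunk but uses snprintf() in place of PQExpBuffer, the names SubRelSketch and
format_sub_rel_state_call are hypothetical, and it does no quoting of the
subscription name.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for the patch's SubRelInfo; not the real struct. */
typedef struct SubRelSketch
{
	unsigned int relid;			/* relation OID */
	char		state;			/* srsubstate, e.g. 'r' for ready */
	const char *lsn;			/* srsublsn as text, "" if NULL in the catalog */
} SubRelSketch;

/*
 * Format one call to binary_upgrade_create_sub_rel_state() into buf.
 * Returns the number of characters written, or -1 if buf is too small.
 * Assumption: subname needs no escaping (real pg_dump code must quote it).
 */
int
format_sub_rel_state_call(char *buf, size_t buflen, const char *subname,
						  const SubRelSketch *rel)
{
	int			n;

	assert(buf != NULL && subname != NULL && rel != NULL);

	if (rel->lsn[0] != '\0')
		/* LSN known: pass it as the optional fourth argument */
		n = snprintf(buf, buflen,
					 "SELECT binary_upgrade_create_sub_rel_state('%s', %u, '%c', '%s');",
					 subname, rel->relid, rel->state, rel->lsn);
	else
		/* No LSN recorded: omit the fourth argument, as the patch does */
		n = snprintf(buf, buflen,
					 "SELECT binary_upgrade_create_sub_rel_state('%s', %u, '%c');",
					 subname, rel->relid, rel->state);

	return (n < 0 || (size_t) n >= buflen) ? -1 : n;
}
```

With subname "sub", relid 16384, state 'r' and LSN "0/15D5A78" (sample
values, not taken from a real cluster), the sketch produces
SELECT binary_upgrade_create_sub_rel_state('sub', 16384, 'r', '0/15D5A78');
and when the LSN string is empty the fourth argument is left out.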

Regards,
Vignesh
From ce0e041bf120f3615ec7a02187ce27e9922688d2 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 6 Sep 2023 10:07:42 +0530
Subject: [PATCH v6] Optionally preserve the full subscription's state during
 pg_upgrade

Previously, only the subscription metadata information was preserved.  Without
the list of relations and their state it's impossible to re-enable the
subscriptions without missing some records as the list of relations can only be
refreshed after enabling the subscription (and therefore starting the apply
worker).  Even if we added a way to refresh the subscription while enabling a
publication, we still wouldn't know which relations are new on the publication
side, and therefore should be fully synced, and which shouldn't.

Similarly, the subscription's replication origin is needed to ensure
that we don't replicate anything twice.

To fix this problem, this patch teaches pg_dump in binary upgrade mode to
restore the content of pg_subscription_rel from the old cluster by using
binary_upgrade_create_sub_rel_state SQL function, and also provides an
additional LSN parameter for CREATE SUBSCRIPTION to restore the underlying
replication origin remote LSN.  The new binary_upgrade_create_sub_rel_state
SQL function and the new LSN parameter are not exposed to users and only
accepted in binary upgrade mode.

The new binary_upgrade_create_sub_rel_state SQL function has the following
syntax:
SELECT binary_upgrade_create_sub_rel_state(subname text, relid oid, state char [,sublsn pg_lsn])

In the above, subname is the subscription name, relid is the relation
identifier, and state is the state of the relation; sublsn is optional and
defaults to NULL/InvalidXLogRecPtr if not provided. pg_dump will retrieve these
values (subname, relid, state and sublsn) from the old cluster.

This mode is optional and not enabled by default.  A new
--preserve-subscription-state option is added to pg_upgrade to use it.  For
now, pg_upgrade will check that all the subscriptions have a valid replication
origin remote_lsn, and that all underlying relations are in 'r' (ready) state,
and will error out if that's not the case, logging the reason for the failure.

Author: Julien Rouhaud
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
 doc/src/sgml/ref/pgupgrade.sgml          |  23 +++
 src/backend/catalog/pg_subscription.c    |  64 +++++++
 src/backend/commands/subscriptioncmds.c  |  10 +-
 src/bin/pg_dump/common.c                 |  22 +++
 src/bin/pg_dump/pg_backup.h              |   2 +
 src/bin/pg_dump/pg_dump.c                | 128 ++++++++++++-
 src/bin/pg_dump/pg_dump.h                |  15 ++
 src/bin/pg_upgrade/check.c               |  79 ++++++++
 src/bin/pg_upgrade/dump.c                |   3 +-
 src/bin/pg_upgrade/meson.build           |   1 +
 src/bin/pg_upgrade/option.c              |   6 +
 src/bin/pg_upgrade/pg_upgrade.h          |   1 +
 src/bin/pg_upgrade/t/003_subscription.pl | 220 +++++++++++++++++++++++
 src/include/catalog/pg_proc.dat          |   7 +
 src/tools/pgindent/typedefs.list         |   1 +
 15 files changed, 578 insertions(+), 4 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_subscription.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..6af790c986 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -240,6 +240,29 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--preserve-subscription-state</option></term>
+      <listitem>
+       <para>
+        Fully preserve the logical subscription state, if any.  That includes
+        the underlying replication origins with their remote LSN and the list of
+        relations in each subscription so that replication can be simply
+        resumed if the subscriptions are reactivated.
+       </para>
+       <para>
+        If this option isn't used, it is up to the user to reactivate the
+        subscriptions in a suitable way; see the subscription part in <xref
+        linkend="pg-dump-notes"/> for more information.
+       </para>
+       <para>
+        If this option is used and any of the subscriptions on the old cluster
+        has an unknown <varname>remote_lsn</varname> (0/0), or has any relation
+        in a state different from <literal>r</literal> (ready), the
+        <application>pg_upgrade</application> run will error.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-?</option></term>
       <term><option>--help</option></term>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..fedf838d04 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -269,6 +269,70 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/*
+ * binary_upgrade_create_sub_rel_state
+ *
+ * Add the relation with the specified state to the pg_subscription_rel
+ * catalog.
+ */
+Datum
+binary_upgrade_create_sub_rel_state(PG_FUNCTION_ARGS)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription form;
+	char	   *subname;
+	Oid			relid;
+	char		state;
+	XLogRecPtr	sublsn;
+
+	if (!IsBinaryUpgrade)
+		ereport(ERROR,
+				errcode(ERRCODE_CANT_CHANGE_RUNTIME_PARAM),
+				errmsg("binary_upgrade_create_sub_rel_state can only be called when server is in binary upgrade mode"));
+
+	/* We must check these things before dereferencing the arguments */
+	if (PG_ARGISNULL(0) ||
+		PG_ARGISNULL(1) ||
+		PG_ARGISNULL(2))
+		elog(ERROR, "null argument to binary_upgrade_create_sub_rel_state is not allowed");
+
+	subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	relid = PG_GETARG_OID(1);
+	state = PG_GETARG_CHAR(2);
+
+	if (PG_ARGISNULL(3))
+		sublsn = InvalidXLogRecPtr;
+	else
+		sublsn = PG_GETARG_LSN(3);
+
+	if (!OidIsValid(relid))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("invalid relation identifier used: %u", relid));
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("subscription \"%s\" does not exist", subname));
+
+	form = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = form->oid;
+
+	AddSubscriptionRelState(subid, relid, state, sublsn);
+
+	heap_freetuple(tup);
+	table_close(rel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
 /*
  * Update the state of a subscription table.
  */
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 34d881fd94..aa89581010 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -580,6 +580,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	bits32		supported_opts;
 	SubOpts		opts = {0};
 	AclResult	aclresult;
+	RepOriginId originid;
 
 	/*
 	 * Parse and check options.
@@ -592,6 +593,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+	if (IsBinaryUpgrade)
+		supported_opts |= SUBOPT_LSN;
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -720,7 +723,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
-	replorigin_create(originname);
+	originid = replorigin_create(originname);
+
+	if (IsBinaryUpgrade && IsSet(opts.specified_opts, SUBOPT_LSN))
+		replorigin_advance(originid, opts.lsn, InvalidXLogRecPtr,
+						   false /* backward */ ,
+						   false /* WAL log */ );
 
 	/*
 	 * Connect to remote side to execute requested commands and fetch table
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index 8b0c1e7b53..764a39fcb9 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -24,6 +24,7 @@
 #include "catalog/pg_operator_d.h"
 #include "catalog/pg_proc_d.h"
 #include "catalog/pg_publication_d.h"
+#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_type_d.h"
 #include "common/hashfn.h"
 #include "fe_utils/string_utils.h"
@@ -265,6 +266,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 	pg_log_info("reading subscriptions");
 	getSubscriptions(fout);
 
+	pg_log_info("reading subscription membership of tables");
+	getSubscriptionTables(fout);
+
 	free(inhinfo);				/* not needed any longer */
 
 	*numTablesPtr = numTables;
@@ -978,6 +982,24 @@ findPublicationByOid(Oid oid)
 	return (PublicationInfo *) dobj;
 }
 
+/*
+ * findSubscriptionByOid
+ *	  finds the DumpableObject for the subscription with the given oid
+ *	  returns NULL if not found
+ */
+SubscriptionInfo *
+findSubscriptionByOid(Oid oid)
+{
+	CatalogId	catId;
+	DumpableObject *dobj;
+
+	catId.tableoid = SubscriptionRelationId;
+	catId.oid = oid;
+	dobj = findObjectByCatalogId(catId);
+	Assert(dobj == NULL || dobj->objType == DO_SUBSCRIPTION);
+	return (SubscriptionInfo *) dobj;
+}
+
 
 /*
  * recordExtensionMembership
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..8c82657e76 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -200,6 +200,8 @@ typedef struct _dumpOptions
 
 	int			sequence_data;	/* dump sequence data even in schema-only mode */
 	int			do_nothing;
+
+	int			preserve_subscriptions;
 } DumpOptions;
 
 /*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index cebd2400fd..181a070a3e 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -431,6 +431,7 @@ main(int argc, char **argv)
 		{"table-and-children", required_argument, NULL, 12},
 		{"exclude-table-and-children", required_argument, NULL, 13},
 		{"exclude-table-data-and-children", required_argument, NULL, 14},
+		{"preserve-subscription-state", no_argument, &dopt.preserve_subscriptions, 1},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -714,6 +715,10 @@ main(int argc, char **argv)
 	if (dopt.do_nothing && dopt.dump_inserts == 0)
 		pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
 
+	/* --preserve-subscription-state requires --binary-upgrade */
+	if (dopt.preserve_subscriptions && !dopt.binary_upgrade)
+		pg_fatal("option --preserve-subscription-state requires option --binary-upgrade");
+
 	/* Identify archive format to emit */
 	archiveFormat = parseArchiveFormat(format, &archiveMode);
 
@@ -4568,6 +4573,86 @@ is_superuser(Archive *fout)
 	return false;
 }
 
+/*
+ * getSubscriptionTables
+ *	  get information about the given subscription's relations
+ */
+void
+getSubscriptionTables(Archive *fout)
+{
+	SubscriptionInfo *subinfo;
+	SubRelInfo *rels = NULL;
+	PQExpBuffer query;
+	PGresult   *res;
+	int			i_srsubid;
+	int			i_srrelid;
+	int			i_srsubstate;
+	int			i_srsublsn;
+	int			i_nrels;
+	int			i,
+				cur_rel = 0,
+				ntups;
+	Oid			last_srsubid = InvalidOid;
+
+	if (!fout->dopt->binary_upgrade || !fout->dopt->preserve_subscriptions ||
+		fout->remoteVersion < 100000)
+		return;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "SELECT srsubid, srrelid, srsubstate, srsublsn,"
+					  " count(*) OVER (PARTITION BY srsubid) AS nrels"
+					  " FROM pg_subscription_rel"
+					  " ORDER BY srsubid");
+
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+	if (ntups == 0)
+		goto cleanup;
+
+	/* Get subscription relation fields */
+	i_srsubid = PQfnumber(res, "srsubid");
+	i_srrelid = PQfnumber(res, "srrelid");
+	i_srsubstate = PQfnumber(res, "srsubstate");
+	i_srsublsn = PQfnumber(res, "srsublsn");
+	i_nrels = PQfnumber(res, "nrels");
+
+	for (i = 0; i < ntups; i++)
+	{
+		Oid			cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid));
+
+		/*
+		 * If we switched to a new subscription, setup the necessary fields in
+		 * the SubscriptionInfo and reset the cur_rel counter.
+		 */
+		if (cur_srsubid != last_srsubid)
+		{
+			int			nrels;
+
+			subinfo = findSubscriptionByOid(cur_srsubid);
+
+			nrels = atoi(PQgetvalue(res, i, i_nrels));
+			rels = pg_malloc(nrels * sizeof(SubRelInfo));
+
+			subinfo->subrels = rels;
+			subinfo->nrels = nrels;
+
+			last_srsubid = cur_srsubid;
+			cur_rel = 0;
+		}
+
+		rels[cur_rel].srrelid = atooid(PQgetvalue(res, i, i_srrelid));
+		rels[cur_rel].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+		rels[cur_rel].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+
+		cur_rel++;
+	}
+
+cleanup:
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * getSubscriptions
  *	  get information about subscriptions
@@ -4593,6 +4678,7 @@ getSubscriptions(Archive *fout)
 	int			i_subpublications;
 	int			i_subbinary;
 	int			i_subpasswordrequired;
+	int			i_suboriginremotelsn;
 	int			i,
 				ntups;
 
@@ -4647,15 +4733,19 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 160000)
 		appendPQExpBufferStr(query,
 							 " s.suborigin,\n"
-							 " s.subpasswordrequired\n");
+							 " s.subpasswordrequired,\n");
 	else
 		appendPQExpBuffer(query,
 						  " '%s' AS suborigin,\n"
-						  " 't' AS subpasswordrequired\n",
+						  " 't' AS subpasswordrequired,\n",
 						  LOGICALREP_ORIGIN_ANY);
 
+	appendPQExpBufferStr(query, "o.remote_lsn\n");
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
+						 "LEFT JOIN pg_replication_origin_status o \n"
+						 "    ON o.external_id = 'pg_' || s.oid::text \n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
 						 "                   WHERE datname = current_database())");
 
@@ -4681,6 +4771,7 @@ getSubscriptions(Archive *fout)
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
+	i_suboriginremotelsn = PQfnumber(res, "remote_lsn");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4713,6 +4804,18 @@ getSubscriptions(Archive *fout)
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
 		subinfo[i].subpasswordrequired =
 			pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
+		if (PQgetisnull(res, i, i_suboriginremotelsn))
+			subinfo[i].suboriginremotelsn = NULL;
+		else
+			subinfo[i].suboriginremotelsn =
+				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+
+		/*
+		 * For now assume there's no relation associated with the
+		 * subscription. Later code might update this field and allocate
+		 * subrels as needed.
+		 */
+		subinfo[i].nrels = 0;
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4797,9 +4900,29 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subpasswordrequired, "t") != 0)
 		appendPQExpBuffer(query, ", password_required = false");
 
+	if (dopt->binary_upgrade && dopt->preserve_subscriptions &&
+		subinfo->suboriginremotelsn)
+		appendPQExpBuffer(query, ", lsn = '%s'", subinfo->suboriginremotelsn);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		for (i = 0; i < subinfo->nrels; i++)
+		{
+			appendPQExpBuffer(query,
+							  "SELECT binary_upgrade_create_sub_rel_state('%s', %u, '%c'",
+							  subinfo->dobj.name,
+							  subinfo->subrels[i].srrelid,
+							  subinfo->subrels[i].srsubstate);
+
+			if (subinfo->subrels[i].srsublsn[0] != '\0')
+				appendPQExpBuffer(query, ", '%s'",
+								  subinfo->subrels[i].srsublsn);
+
+			appendPQExpBufferStr(query, ");\n");
+		}
+
 		ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
 					 ARCHIVE_OPTS(.tag = subinfo->dobj.name,
 								  .owner = subinfo->rolname,
@@ -4807,6 +4930,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 								  .section = SECTION_POST_DATA,
 								  .createStmt = query->data,
 								  .dropStmt = delq->data));
+	}
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
 		dumpComment(fout, "SUBSCRIPTION", qsubname,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 9036b13f6a..6718397dfa 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -653,6 +653,16 @@ typedef struct _PublicationSchemaInfo
 	NamespaceInfo *pubschema;
 } PublicationSchemaInfo;
 
+/*
+ * The SubRelInfo struct is used to represent a subscription relation.
+ */
+typedef struct _SubRelInfo
+{
+	Oid			srrelid;
+	char		srsubstate;
+	char	   *srsublsn;
+} SubRelInfo;
+
 /*
  * The SubscriptionInfo struct is used to represent subscription.
  */
@@ -670,6 +680,9 @@ typedef struct _SubscriptionInfo
 	char	   *subsynccommit;
 	char	   *subpublications;
 	char	   *subpasswordrequired;
+	char	   *suboriginremotelsn;
+	int			nrels;
+	SubRelInfo *subrels;
 } SubscriptionInfo;
 
 /*
@@ -696,6 +709,7 @@ extern CollInfo *findCollationByOid(Oid oid);
 extern NamespaceInfo *findNamespaceByOid(Oid oid);
 extern ExtensionInfo *findExtensionByOid(Oid oid);
 extern PublicationInfo *findPublicationByOid(Oid oid);
+extern SubscriptionInfo *findSubscriptionByOid(Oid oid);
 
 extern void recordExtensionMembership(CatalogId catId, ExtensionInfo *ext);
 extern ExtensionInfo *findOwningExtension(CatalogId catalogId);
@@ -755,5 +769,6 @@ extern void getPublicationNamespaces(Archive *fout);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
 								 int numTables);
 extern void getSubscriptions(Archive *fout);
+extern void getSubscriptionTables(Archive *fout);
 
 #endif							/* PG_DUMP_H */
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 56e313f562..6d2d272fac 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -20,6 +20,7 @@ static void check_is_install_user(ClusterInfo *cluster);
 static void check_proper_datallowconn(ClusterInfo *cluster);
 static void check_for_prepared_transactions(ClusterInfo *cluster);
 static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
+static void check_for_subscription_state(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
 static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
 static void check_for_tables_with_oids(ClusterInfo *cluster);
@@ -104,6 +105,11 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
+	/* PG 10 introduced subscriptions. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1000 &&
+		user_opts.preserve_subscriptions)
+		check_for_subscription_state(&old_cluster);
+
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
 	 * on-disk format for existing data.
@@ -785,6 +791,79 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 		check_ok();
 }
 
+/*
+ * check_for_subscription_state()
+ *
+ * Verify that all subscriptions have a valid remote_lsn and don't contain
+ * any table whose srsubstate is not ready ('r').
+ */
+static void
+check_for_subscription_state(ClusterInfo *cluster)
+{
+	int			dbnum;
+	bool		is_error = false;
+
+	Assert(user_opts.preserve_subscriptions);
+
+	prep_status("Checking for subscription state");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		PGresult   *res;
+		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
+		PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+		/* We need to check for pg_replication_origin_status only once. */
+		if (dbnum == 0)
+		{
+			int			ntup;
+
+			res = executeQueryOrDie(conn,
+									"SELECT s.subname "
+									"FROM pg_catalog.pg_subscription s "
+									"LEFT JOIN pg_catalog.pg_replication_origin_status os"
+									"  ON os.external_id = 'pg_' || s.oid "
+									"WHERE coalesce(remote_lsn, '0/0') = '0/0'");
+
+			ntup = PQntuples(res);
+			for (int i = 0; i < ntup; i++)
+			{
+				is_error = true;
+				pg_log(PG_WARNING,
+					   "\nWARNING:  subscription \"%s\" has an invalid remote_lsn",
+					   PQgetvalue(res, i, 0));
+			}
+			PQclear(res);
+		}
+
+		res = executeQueryOrDie(conn,
+								"SELECT count(0) "
+								"FROM pg_catalog.pg_subscription_rel "
+								"WHERE srsubstate != 'r'");
+
+		if (PQntuples(res) != 1)
+			pg_fatal("could not determine the number of non-ready subscription relations");
+
+		if (strcmp(PQgetvalue(res, 0, 0), "0") != 0)
+		{
+			is_error = true;
+			pg_log(PG_WARNING,
+				   "\nWARNING: database \"%s\" has %s subscription "
+				   "relation(s) in non-ready state", active_db->db_name,
+				   PQgetvalue(res, 0, 0));
+		}
+
+		PQclear(res);
+		PQfinish(conn);
+	}
+
+	if (is_error)
+		pg_fatal("--preserve-subscription-state is incompatible with subscriptions "
+				 "that have an invalid remote_lsn or relations in non-ready state");
+
+	check_ok();
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..9284576af7 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -53,9 +53,10 @@ generate_old_dump(void)
 
 		parallel_exec_prog(log_file_name, NULL,
 						   "\"%s/pg_dump\" %s --schema-only --quote-all-identifiers "
-						   "--binary-upgrade --format=custom %s --file=\"%s/%s\" %s",
+						   "--binary-upgrade --format=custom %s %s --file=\"%s/%s\" %s",
 						   new_cluster.bindir, cluster_conn_opts(&old_cluster),
 						   log_opts.verbose ? "--verbose" : "",
+						   user_opts.preserve_subscriptions ? "--preserve-subscription-state" : "",
 						   log_opts.dumpdir,
 						   sql_file_name, escaped_connstr.data);
 
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..9ea25dec70 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_subscription.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 640361009e..a42c6defc2 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[])
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
 		{"copy", no_argument, NULL, 2},
+		{"preserve-subscription-state", no_argument, NULL, 3},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -199,6 +200,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.transfer_mode = TRANSFER_MODE_COPY;
 				break;
 
+			case 3:
+				user_opts.preserve_subscriptions = true;
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 						os_info.progname);
@@ -289,6 +294,7 @@ usage(void)
 	printf(_("  -V, --version                 display version information, then exit\n"));
 	printf(_("  --clone                       clone instead of copying files to new cluster\n"));
 	printf(_("  --copy                        copy files to new cluster (default)\n"));
+	printf(_("  --preserve-subscription-state preserve the subscription state fully\n"));
 	printf(_("  -?, --help                    show this help, then exit\n"));
 	printf(_("\n"
 			 "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 7afa96716e..f2cae91f69 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -304,6 +304,7 @@ typedef struct
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
 	char	   *socketdir;		/* directory to use for Unix sockets */
+	bool		preserve_subscriptions; /* fully transfer subscription state */
 } UserOpts;
 
 typedef struct
diff --git a/src/bin/pg_upgrade/t/003_subscription.pl b/src/bin/pg_upgrade/t/003_subscription.pl
new file mode 100644
index 0000000000..053077150c
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_subscription.pl
@@ -0,0 +1,220 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+# Test for pg_upgrade of logical subscription
+use strict;
+use warnings;
+
+use Cwd qw(abs_path);
+use File::Basename qw(dirname);
+use File::Compare;
+use File::Find qw(find);
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::AdjustUpgrade;
+use Test::More;
+
+# Can be changed to test the other modes.
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize publisher node
+my $publisher = PostgreSQL::Test::Cluster->new('publisher');
+$publisher->init(allows_streaming => 'logical');
+$publisher->start;
+
+# Initialize the old subscriber node
+my $old_sub = PostgreSQL::Test::Cluster->new('old_sub');
+$old_sub->init;
+$old_sub->start;
+
+# Initialize the new subscriber
+my $new_sub = PostgreSQL::Test::Cluster->new('new_sub');
+$new_sub->init;
+my $bindir = $new_sub->config_data('--bindir');
+
+sub insert_line
+{
+	my $payload = shift;
+
+	foreach("t1", "t2")
+	{
+		$publisher->safe_psql('postgres',
+			"INSERT INTO " . $_ . " (val) VALUES('$payload')");
+	}
+}
+
+# Initial setup
+foreach ("t1", "t2")
+{
+	$publisher->safe_psql('postgres',
+		"CREATE TABLE " . $_ . " (id serial, val text)");
+	$old_sub->safe_psql('postgres',
+		"CREATE TABLE " . $_ . " (id serial, val text)");
+}
+insert_line('before initial sync');
+
+# Set up logical replication, replicating only 1 table
+my $connstr = $publisher->connstr . ' dbname=postgres';
+
+$publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR TABLE t1");
+
+$old_sub->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$connstr' PUBLICATION pub");
+
+# Wait for the catchup, as we need the subscription rel in ready state
+$old_sub->wait_for_subscription_sync($publisher, 'sub');
+
+# Check that pg_upgrade refuses to run if there's a subscription without a valid
+# remote_lsn.
+#
+# Replication origin's remote_lsn isn't set if no data is replicated after the
+# initial sync.
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,
+		'--preserve-subscription-state',
+		'--check',
+	],
+	'run of pg_upgrade --check for old instance with invalid remote_lsn');
+ok(-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
+
+# Make sure the replication origin is set
+insert_line('after initial sync');
+$old_sub->wait_for_subscription_sync($publisher, 'sub');
+
+my $result = $old_sub->safe_psql('postgres',
+    "SELECT COUNT(*) FROM pg_subscription_rel WHERE srsubstate != 'r'");
+is ($result, qq(0), "All tables in pg_subscription_rel should be in ready state");
+
+# Check the number of rows for each table on each server
+$result = $publisher->safe_psql('postgres',
+	"SELECT count(*) FROM t1");
+is ($result, qq(2), "Table t1 should have 2 rows on the publisher");
+$result = $publisher->safe_psql('postgres',
+	"SELECT count(*) FROM t2");
+is ($result, qq(2), "Table t2 should have 2 rows on the publisher");
+$result = $old_sub->safe_psql('postgres',
+	"SELECT count(*) FROM t1");
+is ($result, qq(2), "Table t1 should have 2 rows on the old subscriber");
+$result = $old_sub->safe_psql('postgres',
+	"SELECT count(*) FROM t2");
+is ($result, qq(0), "Table t2 should have 0 rows on the old subscriber");
+
+# Check that pg_upgrade refuses to run if there's a subscription with tables in
+# a state different than 'r' (ready).
+$old_sub->safe_psql('postgres',
+    "ALTER SUBSCRIPTION sub DISABLE");
+$old_sub->safe_psql('postgres',
+	"UPDATE pg_subscription_rel
+		SET srsubstate = 'i' WHERE srsubstate = 'r'");
+
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,
+		'--preserve-subscription-state',
+		'--check',
+	],
+	'run of pg_upgrade --check for old instance with incorrect sub rel');
+ok(-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
+
+# Check that pg_upgrade doesn't detect any problem once all the subscription's
+# relations are in 'r' (ready) state.
+$old_sub->safe_psql('postgres',
+	"UPDATE pg_subscription_rel
+		SET srsubstate = 'r' WHERE srsubstate = 'i'");
+
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,
+		'--preserve-subscription-state',
+		'--check',
+	],
+	'run of pg_upgrade --check for old instance with correct sub rel');
+
+# Stop the old subscriber, insert a row in each table while it's down and add
+# t2 to the publication
+my $remote_lsn = $old_sub->safe_psql('postgres',
+	"SELECT remote_lsn FROM pg_replication_origin_status");
+$old_sub->stop;
+
+insert_line('while old_sub is down');
+
+# Run pg_upgrade
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,
+		'--preserve-subscription-state',
+	],
+	'run of pg_upgrade for new sub');
+ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+$publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub ADD TABLE t2");
+
+$new_sub->start;
+
+# Subscription relations and replication origin remote_lsn should be preserved
+$result = $new_sub->safe_psql('postgres',
+    "SELECT count(*) FROM pg_subscription_rel");
+is ($result, qq(1), "There should be 1 row in pg_subscription_rel");
+
+$result = $new_sub->safe_psql('postgres',
+    "SELECT remote_lsn FROM pg_replication_origin_status");
+is ($result, qq($remote_lsn), "remote_lsn should have been preserved");
+
+# There should be no new replicated rows before enabling the subscription
+$result = $new_sub->safe_psql('postgres',
+	"SELECT count(*) FROM t1");
+is ($result, qq(2), "Table t1 should still have 2 rows on the new subscriber");
+$result = $new_sub->safe_psql('postgres',
+	"SELECT count(*) FROM t2");
+is ($result, qq(0), "Table t2 should still have 0 rows on the new subscriber");
+
+# Enable the subscription
+$new_sub->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub ENABLE");
+
+$publisher->wait_for_catchup('sub');
+
+# Rows on t1 should have been replicated, while nothing should happen for t2
+$result = $new_sub->safe_psql('postgres',
+	"SELECT count(*) FROM t1");
+is ($result, qq(3), "Table t1 should now have 3 rows on the new subscriber");
+$result = $new_sub->safe_psql('postgres',
+	"SELECT count(*) FROM t2");
+is ($result, qq(0), "Table t2 should still have 0 rows on the new subscriber");
+
+# Refresh the subscription; t2 should now be synced, with all of its rows copied
+$new_sub->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub REFRESH PUBLICATION");
+$new_sub->wait_for_subscription_sync($publisher, 'sub');
+$result = $new_sub->safe_psql('postgres',
+	"SELECT count(*) FROM t1");
+is ($result, qq(3), "Table t1 should still have 3 rows on the new subscriber");
+$result = $new_sub->safe_psql('postgres',
+	"SELECT count(*) FROM t2");
+is ($result, qq(3), "Table t2 should now have 3 rows on the new subscriber");
+
+done_testing();
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..ac7ec5df31 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5488,6 +5488,13 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
   proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
   prosrc => 'pg_stat_get_subscription' },
+{ oid => '6108', descr => 'add a relation with the specified state to pg_subscription_rel table',
+  proname => 'binary_upgrade_create_sub_rel_state', prorettype => 'void',
+  proargtypes => 'text oid char pg_lsn',
+  proallargtypes => '{text,oid,char,pg_lsn}',
+  proargmodes => '{i,i,i,i}',
+  proargnames => '{subname,relid,state,sublsn}',
+  prosrc => 'binary_upgrade_create_sub_rel_state' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
   prorettype => 'int4', proargtypes => '', prosrc => 'pg_backend_pid' },
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 0656c94416..190ee73809 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2649,6 +2649,7 @@ SubLinkType
 SubOpts
 SubPlan
 SubPlanState
+SubRelInfo
 SubRemoveRels
 SubTransactionId
 SubXactCallback
-- 
2.34.1
