On Thu, 21 Sept 2023 at 11:27, Michael Paquier <mich...@paquier.xyz> wrote:
>
> On Fri, Sep 15, 2023 at 03:08:21PM +0530, vignesh C wrote:
> > On Tue, 12 Sept 2023 at 14:25, Hayato Kuroda (Fujitsu)
> > <kuroda.hay...@fujitsu.com> wrote:
> >> Is there a possibility that apply worker on old cluster connects to the
> >> publisher during the upgrade? Regarding the pg_upgrade on publisher, we
> >> refuse TCP/IP connections from remotes and port number is also changed, so
> >> we can assume that subscriber does not connect to. But IIUC such settings
> >> may not affect to the connection source, so that the apply worker may try
> >> to connect to the publisher. Also, is there any hazards if it happens?
> >
> > Yes, there is a possibility that the apply worker gets started and new
> > transaction data is being synced from the publisher. I have made a fix
> > not to start the launcher process in binary upgrade mode as we don't
> > want the launcher to start apply worker during upgrade.
>
> Hmm.  I was wondering if 0001 is the right way to handle this case,
> but at the end I'm OK to paint one extra isBinaryUpgrade in the code
> path where apply launchers are registered.  I don't think that the
> patch is complete, though.  A comment should be added in pg_upgrade's
> server.c, exactly start_postmaster(), to tell that -b also stops apply
> workers.  I am attaching a version updated as of the attached, that
> I'd be OK to apply.

I have added the comments.

> I don't really think that we need to worry about a subscriber
> connecting back to a publisher in this case, though?  I mean, each
> postmaster instance started by pg_upgrade restricts the access to the
> instance with unix_socket_directories set to a custom path and
> permissions at 0700, and a subscription's connection string does not
> know the unix path used by pg_upgrade.  I certainly agree that
> stopping these processes could lead to inconsistencies in the data the
> subscribers have been holding though, if we are not careful, so
> preventing them from running is a good practice anyway.

I have made the fix similar to how the publisher upgrade handles it, to
keep things consistent.

> I have also reviewed 0002.  As a whole, I think that I'm OK with the
> main approach of the patch in pg_dump to use a new type of dumpable
> object for subscription relations that are dumped with their upgrade
> functions after.  This still needs more work, and more documentation.

Added documentation

> Also, perhaps we should really have an option to control if this part
> of the copy happens or not.  With a --no-subscription-relations for
> pg_dump at least?

Currently this is done by default in binary upgrade mode. I will post a
separate patch a little later to skip the dump of subscription relations
in both upgrade and dump.

>
> +{ oid => '4551', descr => 'add a relation with the specified relation state to pg_subscription_rel table',
>
> During a development cycle, any new function added needs to use an OID
> in range 8000-9999.  Running unused_oids will suggest new random OIDs.

Modified

> FWIW, I am not convinced that there is a need for two functions to add
> an entry to pg_subscription_rel, with sole difference between both the
> handling of a valid or invalid LSN.  We should have only one function
> that's able to handle NULL for the LSN.  So let's remove rel_state_a
> and rel_state_b, and have a single rel_state().  The description of
> the SQL functions is inconsistent with the other binary upgrade ones,
> I would suggest for the two functions
> "for use by pg_upgrade (relation for pg_subscription_rel)"
> "for use by pg_upgrade (remote_lsn for origin)"

Removed rel_state_a and rel_state_b, and updated the descriptions accordingly.

> +   i_srsublsn = PQfnumber(res, "srsublsn");
> [...]
> +       subrinfo[cur_rel].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
>
> In getSubscriptionTables(), this should check for PQgetisnull()
> because we would have a NULL value for InvalidXLogRecPtr in the
> catalog.  Using a char* for srsublsn is OK, but just assign NULL to
> it, then just pass a hardcoded NULL value to the function as we do in
> other places.  So I don't quite get why this is not the same handling
> as suboriginremotelsn.

Modified
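
For reference, here is a minimal standalone sketch of the handling I have in
mind. It is not the patch code: format_sub_rel_state() is a hypothetical
helper written only for illustration, it uses plain snprintf() instead of
PQExpBuffer, and it assumes the subscription name and LSN need no escaping
(pg_dump itself goes through appendStringLiteralAH() for the name). The point
is only that a missing srsublsn is emitted as a hardcoded NULL.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper, not part of the patch: format the restore command
 * for one pg_subscription_rel row.  A missing LSN (NULL pointer or empty
 * string) is emitted as an SQL NULL rather than as a quoted value.
 *
 * Assumption: subname and lsn contain nothing that needs escaping.
 *
 * Returns the number of characters written, or -1 if buf is too small.
 */
static int
format_sub_rel_state(char *buf, size_t len, const char *subname,
                     unsigned int relid, char state, const char *lsn)
{
    int         n;

    assert(buf != NULL && subname != NULL);

    if (lsn != NULL && strlen(lsn) > 0)
        n = snprintf(buf, len,
                     "SELECT pg_catalog.binary_upgrade_create_sub_rel_state('%s', %u, '%c', '%s');",
                     subname, relid, state, lsn);
    else
        n = snprintf(buf, len,
                     "SELECT pg_catalog.binary_upgrade_create_sub_rel_state('%s', %u, '%c', NULL);",
                     subname, relid, state);

    /* snprintf reports the length it wanted; treat truncation as failure */
    return (n < 0 || (size_t) n >= len) ? -1 : n;
}
```

Calling it with a NULL lsn produces a statement whose last argument is the
bare keyword NULL, which the SQL function then maps to InvalidXLogRecPtr.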

>
> getSubscriptionTables() is entirely skipped if we don't want any
> subscriptions, if we deal with a server of 9.6 or older or if we don't
> do binary upgrades, which is OK.
>
> +/*
> + * getSubscriptionTables
> + *       get information about subscription membership for dumpable tables.
> + */
> This comment is slightly misleading and should mention that this is an
> upgrade-only path?

Modified

>
> The code for dumpSubscriptionTable() is a copy-paste of
> dumpPublicationTable(), but a lot of what you are doing here is
> actually pointless if we are not in binary mode?  Why should this code
> path not taken only under dataOnly?  I mean, this is a code path we
> should never take except if we are in binary mode.  This should have
> at least a cross-check to make sure that we never have a
> DO_SUBSCRIPTION_REL in this code path if we are in non-binary mode.

I have added an assert for this case, as this code path is not expected
to be reached in non-binary mode.

> +    if (dopt->binary_upgrade && subinfo->suboriginremotelsn)
> +    {
> +        appendPQExpBufferStr(query,
> +                             "SELECT pg_catalog.binary_upgrade_replorigin_advance(");
> +        appendStringLiteralAH(query, subinfo->dobj.name, fout);
> +        appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
> +    }
>
> Hmm..  Could it be actually useful even for debugging to still have
> this query if suboriginremotelsn is an InvalidXLogRecPtr?  I think
> that this should have a comment of the kind "\n-- For binary upgrade,
> blah".  At least it would not be a bad thing to enforce a correct
> state from the start, removing the NULL check for the second argument
> in binary_upgrade_replorigin_advance().

Modified

> +    /* We need to check for pg_replication_origin_status only once. */
> Perhaps it would be better to explain why?

This remote_lsn code change is actually not required, I have removed this now.

>
> +                       "WHERE coalesce(remote_lsn, '0/0') = '0/0'"
> Why a COALESCE here?  Cannot this stuff just use NULL?

This remote_lsn code change is actually not required, I have removed this now.

> +    fprintf(script, "database:%s subscription:%s relation:%s in non-ready state\n",
> Could it be possible to include the schema of the relation in this log?

Modified

> +static void check_for_subscription_state(ClusterInfo *cluster);
> I'd be tempted to move that into a patch on its own, actually, for a
> cleaner history.

For now I have kept it together; I will split it out later based on
more feedback from others.

> +# Copyright (c) 2022-2023, PostgreSQL Global Development Group
> New as of 2023.

Modified

> +# Check that after upgradation of the subscriber server, the incremental
> +# changes added to the publisher are replicated.
> [..]
> +   For upgradation of the subscriptions, all the subscriptions on the old
> +   cluster must have a valid <varname>remote_lsn</varname>, and all the
>
> Upgradation?  I think that this should be reworded:
> "All the subscriptions of an old cluster require a valid remote_lsn
> during an upgrade."

This remote_lsn code change is actually not required, I have removed this now.

>
> A CI run is reporting the following compilation warnings:
> [04:21:15.290] pg_dump.c: In function ‘getSubscriptionTables’:
> [04:21:15.290] pg_dump.c:4655:29: error: ‘subinfo’ may be used
> uninitialized in this function [-Werror=maybe-uninitialized]
> [04:21:15.290]  4655 |   subrinfo[cur_rel].subinfo = subinfo;

I have initialized the variable and checked with
-Werror=maybe-uninitialized; I will verify in the next cfbot run.


> +ok(-d $new_sub->data_dir . "/pg_upgrade_output.d",
> +       "pg_upgrade_output.d/ not removed after pg_upgrade failure");
> Not sure that there's a need for this check.  Okay, that's cheap.

Modified

> And, err.  We are going to need an option to control if the slot data
> is copied, and a bit more documentation in pg_upgrade to explain how
> things happen when the copy happens.

Added documentation for this. We will copy the slot data by default, and
will add a separate patch a little later to skip the dump of subscription
relations/replication slots in both upgrade and dump.

The attached v9 patch includes these changes.

Apart from this, the patch still checks that the old cluster's subscription
relations are all in READY state. It is possible that SYNCDONE or
FINISHEDCOPY could also work, but this needs more thought before
concluding which states are correct to accept. Let's handle this in the
upcoming version.
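
To make the current rule concrete, here is a small standalone sketch of the
check as it stands today. The enum and function names are made up for this
mail (the backend defines the state codes as SUBREL_STATE_* macros), and this
is not the code in check.c; it only shows that 'r' is accepted and that
SYNCDONE ('s') and FINISHEDCOPY ('f') are currently rejected.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * State codes stored in pg_subscription_rel.srsubstate.  The enum name is
 * hypothetical and exists only for this sketch.
 */
typedef enum SketchSubRelState
{
    SKETCH_STATE_INIT = 'i',
    SKETCH_STATE_DATASYNC = 'd',
    SKETCH_STATE_FINISHEDCOPY = 'f',
    SKETCH_STATE_SYNCDONE = 's',
    SKETCH_STATE_READY = 'r'
} SketchSubRelState;

/* Current rule: only relations in READY state may be upgraded. */
static bool
sub_rel_state_allows_upgrade(char state)
{
    return state == SKETCH_STATE_READY;
}

/*
 * Count the relations that would block the upgrade, given one state code
 * per relation.  A non-zero result corresponds to the error case.
 */
static size_t
count_blocking_relations(const char *states, size_t nstates)
{
    size_t      blocked = 0;

    assert(states != NULL || nstates == 0);

    for (size_t i = 0; i < nstates; i++)
    {
        if (!sub_rel_state_allows_upgrade(states[i]))
            blocked++;
    }
    return blocked;
}
```

If we later decide that SYNCDONE or FINISHEDCOPY are safe, only the predicate
would need to change in a sketch like this.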

Regards,
Vignesh
From 40cff73c7bd5d78eb05609986e25b1718fdbea0c Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 27 Oct 2023 11:18:28 +0530
Subject: [PATCH v9 1/2] Prevent startup of logical replication launcher in
 binary upgrade mode

The logical replication launcher may start apply workers during an
upgrade, which could corrupt the new cluster if these workers are able
to apply changes before the physical files are copied over.

The chance of this happening should be small, as pg_upgrade uses its own
port and Unix-domain socket directory (customizable with --socketdir),
but preventing the launcher from starting is safer in the end, because
we are then sure that no changes will ever be applied.

Author: Vignesh C
Discussion: https://postgr.es/m/CALDaNm2g9ZKf=y8X6z6MsLCuh8WwU-=q6plj35nfi2m5bzn...@mail.gmail.com
---
 src/bin/pg_upgrade/server.c | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/bin/pg_upgrade/server.c b/src/bin/pg_upgrade/server.c
index d7f6c268ef..9dedf63a87 100644
--- a/src/bin/pg_upgrade/server.c
+++ b/src/bin/pg_upgrade/server.c
@@ -248,9 +248,14 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
 	 * invalidation of slots during the upgrade. We set this option when
 	 * cluster is PG17 or later because logical replication slots can only be
 	 * migrated since then. Besides, max_slot_wal_keep_size is added in PG13.
+	 * We don't want the launcher to run while upgrading because it may start
+	 * apply workers which could start receiving changes from the publisher
+	 * before the physical files are put in place, causing corruption on the
+	 * new cluster being upgraded to.  Set max_logical_replication_workers=0
+	 * to disable the launcher.
 	 */
 	if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
-		appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1");
+		appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1 -c max_logical_replication_workers=0");
 
 	/* Use -b to disable autovacuum. */
 	snprintf(cmd, sizeof(cmd),
-- 
2.34.1

From d4313513a3af89461f3b00e81761ab001fb8764a Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 27 Oct 2023 10:58:04 +0530
Subject: [PATCH v9 2/2] Preserve the full subscription's state during
 pg_upgrade

Previously, only the subscription metadata information was preserved.  Without
the list of relations and their state it's impossible to re-enable the
subscriptions without missing some records as the list of relations can only be
refreshed after enabling the subscription (and therefore starting the apply
worker).  Even if we added a way to refresh the subscription while enabling a
publication, we still wouldn't know which relations are new on the publication
side, and therefore should be fully synced, and which shouldn't.

To fix this problem, this patch teaches pg_dump to restore the content of
pg_subscription_rel from the old cluster by using
binary_upgrade_create_sub_rel_state SQL function. This is supported only
in binary upgrade mode.

The new SQL binary_upgrade_create_sub_rel_state function has the following
syntax:
SELECT binary_upgrade_create_sub_rel_state(subname text, relid oid, state char [,sublsn pg_lsn])

In the above, subname is the subscription name, relid is the relation
identifier, state is the state of the relation, and sublsn is the
subscription LSN, which is optional and defaults to NULL/InvalidXLogRecPtr
if not provided. pg_dump retrieves these values (subname, relid, state and
sublsn) from the old cluster.

The subscription's replication origin is needed to ensure
that we don't replicate anything twice.

To preserve it, this patch teaches pg_dump to update the replication
origin along with CREATE SUBSCRIPTION by using the
binary_upgrade_replorigin_advance SQL function to restore the
underlying replication origin remote LSN. This is supported only in
binary upgrade mode.

The new SQL binary_upgrade_replorigin_advance function has the following
syntax:
SELECT binary_upgrade_replorigin_advance(subname text, sublsn pg_lsn)

In the above, subname is the subscription name and sublsn is the subscription
LSN. pg_dump retrieves these values (subname and sublsn) from the old cluster.

pg_upgrade will check that all the subscriptions have a valid replication origin
remote_lsn, and that all underlying relations are in 'r' (ready) state, and
will error out if that's not the case, logging the reason for the failure.

Author: Julien Rouhaud
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
 doc/src/sgml/ref/pgupgrade.sgml            |  45 +++++
 src/backend/catalog/pg_subscription.c      |   2 +
 src/backend/utils/adt/pg_upgrade_support.c | 126 +++++++++++++
 src/bin/pg_dump/common.c                   |  22 +++
 src/bin/pg_dump/pg_dump.c                  | 198 +++++++++++++++++++-
 src/bin/pg_dump/pg_dump.h                  |  16 ++
 src/bin/pg_dump/pg_dump_sort.c             |  11 +-
 src/bin/pg_upgrade/check.c                 |  76 ++++++++
 src/bin/pg_upgrade/meson.build             |   1 +
 src/bin/pg_upgrade/t/004_subscription.pl   | 200 +++++++++++++++++++++
 src/include/catalog/pg_proc.dat            |  10 ++
 src/tools/pgindent/typedefs.list           |   1 +
 12 files changed, 704 insertions(+), 4 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/004_subscription.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 46e8a0b746..280621389d 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -456,6 +456,45 @@ make prefix=/usr/local/pgsql.new install
 
    </step>
 
+   <step>
+    <title>Prepare for subscriber upgrades</title>
+
+    <para>
+     Verify that all the subscription tables in the old subscriber are in
+     <literal>r</literal> (ready) state. Set up the
+     <link linkend="logical-replication-config-subscriber"> subscriber
+     configurations</link> in the new subscriber.
+     <application>pg_upgrade</application> attempts to migrate subscription
+     dependencies, which include the subscription table information in the
+     <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>
+     system table and the subscription replication origin which
+     will help in continuing logical replication from where the old subscriber
+     was replicating. This helps in avoiding the need for setting up the
+     subscription objects manually which requires truncating all the
+     subscription tables and setting the logical replication slots. Migration
+     of subscriber dependencies is only supported when the old cluster is
+     version 17.0 or later. Subscriber dependencies on clusters before version
+     17.0 will silently be ignored.
+    </para>
+
+    <para>
+     As a prerequisite, all the subscription tables should be in
+     <literal>r</literal> (ready) state for
+     <application>pg_upgrade</application> to be able to upgrade the
+     subscriber. If this is not met an error will be reported.
+    </para>
+
+    <para>
+     Enable the subscriptions by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... ENABLE</command></link>.
+    </para>
+    <para>
+     Create all the new tables that were created in the publication and
+     refresh the publication by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command></link>.
+    </para>
+   </step>
+
    <step>
     <title>Stop both servers</title>
 
@@ -928,6 +967,12 @@ psql --username=postgres --file=script.sql postgres
    (<type>regclass</type>, <type>regrole</type>, and <type>regtype</type> can be upgraded.)
   </para>
 
+  <para>
+   To upgrade subscriptions, all the subscription tables should be
+   in <literal>r</literal> (ready) state, or else the
+   <application>pg_upgrade</application> run will error.
+  </para>
+
   <para>
    If you want to use link mode and you do not want your old cluster
    to be modified when the new cluster is started, consider using the clone mode.
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d6a978f136..492b34ff12 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,8 @@
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "replication/origin.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 2f6fc86c3d..e8b12adb3c 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,15 +11,21 @@
 
 #include "postgres.h"
 
+#include "access/table.h"
 #include "catalog/binary_upgrade.h"
 #include "catalog/heap.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
 #include "replication/logical.h"
+#include "replication/origin.h"
+#include "replication/worker_internal.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"
 
 
 #define CHECK_IS_BINARY_UPGRADE									\
@@ -305,3 +311,123 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 
 	PG_RETURN_BOOL(!found_pending_wal);
 }
+
+/*
+ * binary_upgrade_create_sub_rel_state
+ *
+ * Add the relation with the specified relation state to pg_subscription_rel
+ * table.
+ */
+Datum
+binary_upgrade_create_sub_rel_state(PG_FUNCTION_ARGS)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription form;
+	char	   *subname;
+	Oid			relid;
+	char		relstate;
+	XLogRecPtr	sublsn;
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	/* We must check these things before dereferencing the arguments */
+	if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
+		elog(ERROR, "null argument to binary_upgrade_create_sub_rel_state is not allowed");
+
+	subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	relid = PG_GETARG_OID(1);
+	relstate = PG_GETARG_CHAR(2);
+
+	if (PG_ARGISNULL(3))
+		sublsn = InvalidXLogRecPtr;
+	else
+		sublsn = PG_GETARG_LSN(3);
+
+	if (!OidIsValid(relid))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("invalid relation identifier used: %u", relid));
+
+	tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("relation %u does not exist", relid));
+	ReleaseSysCache(tup);
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
+						  CStringGetDatum(subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("subscription \"%s\" does not exist", subname));
+
+	form = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = form->oid;
+
+	AddSubscriptionRelState(subid, relid, relstate, sublsn);
+
+	ReleaseSysCache(tup);
+	table_close(rel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * binary_upgrade_replorigin_advance
+ *
+ * Update the remote_lsn for the subscriber's replication origin.
+ */
+Datum
+binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription form;
+	char	   *subname;
+	XLogRecPtr	sublsn;
+	char		originname[NAMEDATALEN];
+	RepOriginId originid;
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	/* We must check these things before dereferencing the arguments */
+	if (PG_ARGISNULL(0))
+		elog(ERROR, "null argument to binary_upgrade_replorigin_advance is not allowed");
+
+	subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+	if (PG_ARGISNULL(1))
+		sublsn = InvalidXLogRecPtr;
+	else
+		sublsn = PG_GETARG_LSN(1);
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("subscription \"%s\" does not exist", subname));
+
+	form = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = form->oid;
+
+	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
+	originid = replorigin_by_name(originname, false);
+	replorigin_advance(originid, sublsn, InvalidXLogRecPtr,
+					   false /* backward */ ,
+					   false /* WAL log */ );
+	heap_freetuple(tup);
+	table_close(rel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index 8b0c1e7b53..764a39fcb9 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -24,6 +24,7 @@
 #include "catalog/pg_operator_d.h"
 #include "catalog/pg_proc_d.h"
 #include "catalog/pg_publication_d.h"
+#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_type_d.h"
 #include "common/hashfn.h"
 #include "fe_utils/string_utils.h"
@@ -265,6 +266,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 	pg_log_info("reading subscriptions");
 	getSubscriptions(fout);
 
+	pg_log_info("reading subscription membership of tables");
+	getSubscriptionTables(fout);
+
 	free(inhinfo);				/* not needed any longer */
 
 	*numTablesPtr = numTables;
@@ -978,6 +982,24 @@ findPublicationByOid(Oid oid)
 	return (PublicationInfo *) dobj;
 }
 
+/*
+ * findSubscriptionByOid
+ *	  finds the DumpableObject for the subscription with the given oid
+ *	  returns NULL if not found
+ */
+SubscriptionInfo *
+findSubscriptionByOid(Oid oid)
+{
+	CatalogId	catId;
+	DumpableObject *dobj;
+
+	catId.tableoid = SubscriptionRelationId;
+	catId.oid = oid;
+	dobj = findObjectByCatalogId(catId);
+	Assert(dobj == NULL || dobj->objType == DO_SUBSCRIPTION);
+	return (SubscriptionInfo *) dobj;
+}
+
 
 /*
  * recordExtensionMembership
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7afdbf4d9d..900ddef064 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -296,6 +296,7 @@ static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
+static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo);
 static void dumpDatabase(Archive *fout);
 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
 							   const char *dbname, Oid dboid);
@@ -4581,6 +4582,99 @@ is_superuser(Archive *fout)
 	return false;
 }
 
+/*
+ * getSubscriptionTables
+ *	  get information about subscription membership for dumpable tables, this
+ *    will be used only in binary-upgrade mode.
+ */
+void
+getSubscriptionTables(Archive *fout)
+{
+	DumpOptions *dopt = fout->dopt;
+	SubscriptionInfo *subinfo = NULL;
+	SubRelInfo *subrinfo;
+	PQExpBuffer query;
+	PGresult   *res;
+	int			i_srsubid;
+	int			i_srrelid;
+	int			i_srsubstate;
+	int			i_srsublsn;
+	int			i;
+	int			cur_rel = 0;
+	int			ntups;
+	Oid			last_srsubid = InvalidOid;
+
+	if (dopt->no_subscriptions || !dopt->binary_upgrade ||
+		fout->remoteVersion < 170000)
+		return;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "SELECT srsubid, srrelid, srsubstate, srsublsn"
+					  " FROM pg_catalog.pg_subscription_rel"
+					  " ORDER BY srsubid");
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+	if (ntups == 0)
+		goto cleanup;
+
+	/* Get subscription relation fields */
+	i_srsubid = PQfnumber(res, "srsubid");
+	i_srrelid = PQfnumber(res, "srrelid");
+	i_srsubstate = PQfnumber(res, "srsubstate");
+	i_srsublsn = PQfnumber(res, "srsublsn");
+
+	subrinfo = pg_malloc(ntups * sizeof(SubRelInfo));
+	for (i = 0; i < ntups; i++)
+	{
+		Oid			cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid));
+		Oid			relid = atooid(PQgetvalue(res, i, i_srrelid));
+		TableInfo  *tblinfo;
+
+		/*
+		 * If we switched to a new subscription, check if the subscription
+		 * exists.
+		 */
+		if (cur_srsubid != last_srsubid)
+		{
+			subinfo = findSubscriptionByOid(cur_srsubid);
+			if (subinfo == NULL)
+				pg_fatal("subscription with OID %u does not exist", cur_srsubid);
+
+			last_srsubid = cur_srsubid;
+		}
+
+		tblinfo = findTableByOid(relid);
+		if (tblinfo == NULL)
+			pg_fatal("failed sanity check, table with OID %u not found",
+					 relid);
+
+		/* OK, make a DumpableObject for this relationship */
+		subrinfo[cur_rel].dobj.objType = DO_SUBSCRIPTION_REL;
+		subrinfo[cur_rel].dobj.catId.tableoid = relid;
+		subrinfo[cur_rel].dobj.catId.oid = cur_srsubid;
+		AssignDumpId(&subrinfo[cur_rel].dobj);
+		subrinfo[cur_rel].dobj.name = pg_strdup(subinfo->dobj.name);
+		subrinfo[cur_rel].tblinfo = tblinfo;
+		subrinfo[cur_rel].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+		if (PQgetisnull(res, i, i_srsublsn))
+			subrinfo[cur_rel].srsublsn = NULL;
+		else
+			subrinfo[cur_rel].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+
+		subrinfo[cur_rel].subinfo = subinfo;
+
+		/* Decide whether we want to dump it */
+		selectDumpableObject(&(subrinfo[cur_rel].dobj), fout);
+
+		cur_rel++;
+	}
+
+cleanup:
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * getSubscriptions
  *	  get information about subscriptions
@@ -4606,6 +4700,7 @@ getSubscriptions(Archive *fout)
 	int			i_subpublications;
 	int			i_subbinary;
 	int			i_subpasswordrequired;
+	int			i_suboriginremotelsn;
 	int			i,
 				ntups;
 
@@ -4660,15 +4755,19 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 160000)
 		appendPQExpBufferStr(query,
 							 " s.suborigin,\n"
-							 " s.subpasswordrequired\n");
+							 " s.subpasswordrequired,\n");
 	else
 		appendPQExpBuffer(query,
 						  " '%s' AS suborigin,\n"
-						  " 't' AS subpasswordrequired\n",
+						  " 't' AS subpasswordrequired,\n",
 						  LOGICALREP_ORIGIN_ANY);
 
+	appendPQExpBufferStr(query, "o.remote_lsn\n");
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
+						 "LEFT JOIN pg_replication_origin_status o \n"
+						 "    ON o.external_id = 'pg_' || s.oid::text \n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
 						 "                   WHERE datname = current_database())");
 
@@ -4694,6 +4793,7 @@ getSubscriptions(Archive *fout)
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
+	i_suboriginremotelsn = PQfnumber(res, "remote_lsn");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4726,6 +4826,11 @@ getSubscriptions(Archive *fout)
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
 		subinfo[i].subpasswordrequired =
 			pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
+		if (PQgetisnull(res, i, i_suboriginremotelsn))
+			subinfo[i].suboriginremotelsn = NULL;
+		else
+			subinfo[i].suboriginremotelsn =
+				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4735,6 +4840,80 @@ getSubscriptions(Archive *fout)
 	destroyPQExpBuffer(query);
 }
 
+/*
+ * dumpSubscriptionTable
+ *	  dump the definition of the given subscription table mapping, this will be
+ *    used only for upgrade operation.
+ */
+static void
+dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo)
+{
+	DumpOptions *dopt = fout->dopt;
+	SubscriptionInfo *subinfo = subrinfo->subinfo;
+	PQExpBuffer query;
+	char	   *tag;
+
+	/* Do nothing in data-only dump */
+	if (dopt->dataOnly)
+		return;
+
+	Assert(fout->dopt->binary_upgrade);
+
+	tag = psprintf("%s %s", subinfo->dobj.name, subrinfo->dobj.name);
+
+	query = createPQExpBuffer();
+
+	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		/*
+		 * binary_upgrade_create_sub_rel_state will add the subscription
+		 * relation to pg_subscription_rel table, this is supported only for
+		 * upgrade operation.
+		 */
+		if (fout->remoteVersion >= 170000)
+		{
+			appendPQExpBufferStr(query,
+								 "\n-- For binary upgrade, must preserve the subscriber table.\n");
+			appendPQExpBufferStr(query,
+								 "SELECT pg_catalog.binary_upgrade_create_sub_rel_state(");
+			appendStringLiteralAH(query, subrinfo->dobj.name, fout);
+			appendPQExpBuffer(query,
+							  ", %u, '%c'",
+							  subrinfo->tblinfo->dobj.catId.oid,
+							  subrinfo->srsubstate);
+
+			if (subrinfo->srsublsn && subrinfo->srsublsn[0] != '\0')
+				appendPQExpBuffer(query, ", '%s'",
+								  subrinfo->srsublsn);
+			else
+				appendPQExpBuffer(query, ", NULL");
+
+			appendPQExpBufferStr(query, ");\n");
+		}
+	}
+
+	/*
+	 * There is no point in creating a drop query as the drop is done by table
+	 * drop.  (If you think to change this, see also _printTocEntry().)
+	 * Although this object doesn't really have ownership as such, set the
+	 * owner field anyway to ensure that the command is run by the correct
+	 * role at restore time.
+	 */
+	if (subrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+		ArchiveEntry(fout, subrinfo->dobj.catId, subrinfo->dobj.dumpId,
+					 ARCHIVE_OPTS(.tag = tag,
+								  .namespace = subrinfo->tblinfo->dobj.namespace->dobj.name,
+								  .owner = subinfo->rolname,
+								  .description = "SUBSCRIPTION TABLE",
+								  .section = SECTION_POST_DATA,
+								  .createStmt = query->data));
+
+	/* These objects can't currently have comments or seclabels */
+
+	free(tag);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * dumpSubscription
  *	  dump the definition of the given subscription
@@ -4812,6 +4991,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 
 	appendPQExpBufferStr(query, ");\n");
 
+	if (dopt->binary_upgrade && fout->remoteVersion >= 170000 &&
+		subinfo->suboriginremotelsn)
+	{
+		appendPQExpBufferStr(query,
+							 "\n-- For binary upgrade, must preserve the remote_lsn for the subscriber's replication origin.\n");
+		appendPQExpBufferStr(query,
+							 "SELECT pg_catalog.binary_upgrade_replorigin_advance(");
+		appendStringLiteralAH(query, subinfo->dobj.name, fout);
+		appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
+	}
+
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
 		ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
 					 ARCHIVE_OPTS(.tag = subinfo->dobj.name,
@@ -10430,6 +10620,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
+		case DO_SUBSCRIPTION_REL:
+			dumpSubscriptionTable(fout, (const SubRelInfo *) dobj);
+			break;
 		case DO_PRE_DATA_BOUNDARY:
 		case DO_POST_DATA_BOUNDARY:
 			/* never dumped, nothing to do */
@@ -18496,6 +18689,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_PUBLICATION_REL:
 			case DO_PUBLICATION_TABLE_IN_SCHEMA:
 			case DO_SUBSCRIPTION:
+			case DO_SUBSCRIPTION_REL:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
 				break;
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index d8f27f187c..efc942283c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -83,6 +83,7 @@ typedef enum
 	DO_PUBLICATION_REL,
 	DO_PUBLICATION_TABLE_IN_SCHEMA,
 	DO_SUBSCRIPTION,
+	DO_SUBSCRIPTION_REL,
 } DumpableObjectType;
 
 /*
@@ -670,8 +671,21 @@ typedef struct _SubscriptionInfo
 	char	   *subsynccommit;
 	char	   *subpublications;
 	char	   *subpasswordrequired;
+	char	   *suboriginremotelsn;
 } SubscriptionInfo;
 
+/*
+ * The SubRelInfo struct is used to represent a subscription relation.
+ */
+typedef struct _SubRelInfo
+{
+	DumpableObject dobj;
+	SubscriptionInfo *subinfo;
+	TableInfo  *tblinfo;
+	char		srsubstate;
+	char	   *srsublsn;
+} SubRelInfo;
+
 /*
  *	common utility functions
  */
@@ -696,6 +710,7 @@ extern CollInfo *findCollationByOid(Oid oid);
 extern NamespaceInfo *findNamespaceByOid(Oid oid);
 extern ExtensionInfo *findExtensionByOid(Oid oid);
 extern PublicationInfo *findPublicationByOid(Oid oid);
+extern SubscriptionInfo *findSubscriptionByOid(Oid oid);
 
 extern void recordExtensionMembership(CatalogId catId, ExtensionInfo *ext);
 extern ExtensionInfo *findOwningExtension(CatalogId catalogId);
@@ -755,5 +770,6 @@ extern void getPublicationNamespaces(Archive *fout);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
 								 int numTables);
 extern void getSubscriptions(Archive *fout);
+extern void getSubscriptionTables(Archive *fout);
 
 #endif							/* PG_DUMP_H */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index abfea15c09..4a4b91224d 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -94,6 +94,7 @@ enum dbObjectTypePriorities
 	PRIO_PUBLICATION_REL,
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,
 	PRIO_SUBSCRIPTION,
+	PRIO_SUBSCRIPTION_REL,
 	PRIO_DEFAULT_ACL,			/* done in ACL pass */
 	PRIO_EVENT_TRIGGER,			/* must be next to last! */
 	PRIO_REFRESH_MATVIEW		/* must be last! */
@@ -147,10 +148,11 @@ static const int dbObjectTypePriority[] =
 	PRIO_PUBLICATION,			/* DO_PUBLICATION */
 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,	/* DO_PUBLICATION_TABLE_IN_SCHEMA */
-	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION,			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION_REL		/* DO_SUBSCRIPTION_REL */
 };
 
-StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION + 1),
+StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION_REL + 1),
 				 "array length mismatch");
 
 static DumpId preDataBoundId;
@@ -1472,6 +1474,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "SUBSCRIPTION (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
 			return;
+		case DO_SUBSCRIPTION_REL:
+			snprintf(buf, bufsize,
+					 "SUBSCRIPTION TABLE (ID %d)",
+					 obj->dumpId);
+			return;
 		case DO_PRE_DATA_BOUNDARY:
 			snprintf(buf, bufsize,
 					 "PRE-DATA BOUNDARY  (ID %d)",
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 179f85ae8a..e5a3112dce 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -20,6 +20,7 @@ static void check_is_install_user(ClusterInfo *cluster);
 static void check_proper_datallowconn(ClusterInfo *cluster);
 static void check_for_prepared_transactions(ClusterInfo *cluster);
 static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
+static void check_for_subscription_state(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
 static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
 static void check_for_tables_with_oids(ClusterInfo *cluster);
@@ -112,6 +113,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
+	check_for_subscription_state(&old_cluster);
+
 	/*
 	 * Logical replication slots can be migrated since PG17. See comments atop
 	 * get_old_cluster_logical_slot_infos().
@@ -812,6 +815,79 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 		check_ok();
 }
 
+/*
+ * check_for_subscription_state()
+ *
+ * Verify that each subscription has all of its corresponding tables in
+ * ready state.
+ */
+static void
+check_for_subscription_state(ClusterInfo *cluster)
+{
+	int			dbnum;
+	FILE	   *script = NULL;
+	char		output_path[MAXPGPATH];
+	int			ntup;
+
+	/* Subscription relations state can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) < 1700)
+		return;
+
+	prep_status("Checking for subscription state");
+
+	snprintf(output_path, sizeof(output_path), "%s/%s",
+			 log_opts.basedir,
+			 "subscription_state.txt");
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		PGresult   *res;
+		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
+		PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+		res = executeQueryOrDie(conn,
+								"SELECT s.subname, c.relname, n.nspname "
+								"FROM pg_catalog.pg_subscription_rel r "
+								"LEFT JOIN pg_catalog.pg_subscription s"
+								"	ON r.srsubid = s.oid "
+								"LEFT JOIN pg_catalog.pg_class c"
+								"	ON r.srrelid = c.oid "
+								"LEFT JOIN pg_catalog.pg_namespace n"
+								"	ON c.relnamespace = n.oid "
+								"WHERE srsubstate != 'r' "
+								"ORDER BY s.subname");
+
+		ntup = PQntuples(res);
+		for (int i = 0; i < ntup; i++)
+		{
+			if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
+				pg_fatal("could not open file \"%s\": %s",
+							output_path, strerror(errno));
+
+			fprintf(script, "database:%s subscription:%s schema:%s relation:%s in non-ready state\n",
+					active_db->db_name,
+					PQgetvalue(res, i, 0),	/* s.subname */
+					PQgetvalue(res, i, 2),	/* n.nspname */
+					PQgetvalue(res, i, 1)); /* c.relname */
+		}
+
+		PQclear(res);
+		PQfinish(conn);
+	}
+
+	if (script)
+	{
+		fclose(script);
+		pg_log(PG_REPORT, "fatal");
+		pg_fatal("Your installation contains subscription(s) with\n"
+				 "subscription relation(s) not in ready state.\n"
+				 "A list of subscriptions with relations not in ready state\n"
+				 "is in the file: %s",
+				 output_path);
+	}
+	else
+		check_ok();
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 2c4f38d865..9bd6e5cbe1 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -43,6 +43,7 @@ tests += {
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
       't/003_upgrade_logical_replication_slots.pl',
+      't/004_subscription.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl
new file mode 100644
index 0000000000..b495de96e3
--- /dev/null
+++ b/src/bin/pg_upgrade/t/004_subscription.pl
@@ -0,0 +1,200 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test for pg_upgrade of logical subscription
+use strict;
+use warnings;
+
+use Cwd qw(abs_path);
+use File::Basename qw(dirname);
+use File::Compare;
+use File::Find qw(find);
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::AdjustUpgrade;
+use Test::More;
+
+# Can be changed to test the other modes.
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize publisher node
+my $publisher = PostgreSQL::Test::Cluster->new('publisher');
+$publisher->init(allows_streaming => 'logical');
+$publisher->start;
+
+# Initialize the old subscriber node
+my $old_sub = PostgreSQL::Test::Cluster->new('old_sub');
+$old_sub->init;
+$old_sub->start;
+
+# Initialize the new subscriber
+my $new_sub = PostgreSQL::Test::Cluster->new('new_sub');
+$new_sub->init;
+my $bindir = $new_sub->config_data('--bindir');
+
+sub insert_line
+{
+	my $payload = shift;
+
+	foreach ("t1", "t2")
+	{
+		$publisher->safe_psql('postgres',
+			"INSERT INTO " . $_ . " (val) VALUES('$payload')");
+	}
+}
+
+# Initial setup
+foreach ("t1", "t2")
+{
+	$publisher->safe_psql('postgres',
+		"CREATE TABLE " . $_ . " (id serial, val text)");
+	$old_sub->safe_psql('postgres',
+		"CREATE TABLE " . $_ . " (id serial, val text)");
+}
+insert_line('before initial sync');
+
+# Setup logical replication, replicating only 1 table
+my $connstr = $publisher->connstr . ' dbname=postgres';
+
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR TABLE t1");
+
+$old_sub->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$connstr' PUBLICATION pub");
+
+# Wait for the catchup, as we need the subscription rel in ready state
+$old_sub->wait_for_subscription_sync($publisher, 'sub');
+
+# ------------------------------------------------------
+# Check that pg_upgrade is successful when all tables are in ready state.
+# ------------------------------------------------------
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$old_sub->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,        '--check',
+	],
+	'run of pg_upgrade --check for old instance with all tables in ready state');
+ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after successful pg_upgrade");
+
+# Check the number of rows for each table on each server
+my $result = $publisher->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(1), "check initial t1 table data on publisher");
+$result = $publisher->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(1), "check initial t2 table data on publisher");
+$result = $old_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(1), "check initial t1 table data on the old subscriber");
+$result = $old_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(0), "check initial t2 table data on the old subscriber");
+
+# ------------------------------------------------------
+# Check that pg_upgrade refuses to run if there's a subscription with tables in
+# a state different than 'r' (ready).
+# ------------------------------------------------------
+
+$old_sub->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+
+# Set tables to 'i' state
+$old_sub->safe_psql(
+	'postgres',
+	"UPDATE pg_subscription_rel
+		SET srsubstate = 'i' WHERE srsubstate = 'r'");
+
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,        '--check',
+	],
+	'run of pg_upgrade --check for old instance with incorrect sub rel');
+rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------------------------------
+# Check that pg_upgrade doesn't detect any problem once all the subscription's
+# relations are in 'r' (ready) state.
+# ------------------------------------------------------
+
+$old_sub->safe_psql(
+	'postgres',
+	"UPDATE pg_subscription_rel
+		SET srsubstate = 'r' WHERE srsubstate = 'i'");
+
+# ------------------------------------------------------
+# The incremental changes added to the publisher are replicated after upgrade.
+# ------------------------------------------------------
+
+# Stop the old subscriber, insert a row in each table while it's down and add
+# t2 to the publication
+my $remote_lsn = $old_sub->safe_psql('postgres',
+	"SELECT remote_lsn FROM pg_replication_origin_status");
+$old_sub->stop;
+
+insert_line('while old_sub is down');
+
+# Run pg_upgrade
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,
+	],
+	'run of pg_upgrade for new sub');
+ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+$publisher->safe_psql('postgres', "ALTER PUBLICATION pub ADD TABLE t2");
+
+$new_sub->start;
+
+# Subscription relations and replication origin remote_lsn should be preserved
+$result =
+  $new_sub->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(1), "There should be 1 row in pg_subscription_rel");
+
+$result = $new_sub->safe_psql('postgres',
+	"SELECT remote_lsn FROM pg_replication_origin_status");
+is($result, qq($remote_lsn), "remote_lsn should have been preserved");
+
+# There should be no new replicated rows before enabling the subscription
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(1),
+	"t1 table has no new replicated rows before enabling the subscription");
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(0),
+	"no change in t2 table which is not part of the publication");
+
+# Enable the subscription
+$new_sub->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+$publisher->wait_for_catchup('sub');
+
+# Rows on t1 should have been replicated, while nothing should happen for t2
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(2), "check replicated inserts on new subscriber");
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(0),
+	"no change in table t2 after enabling the subscription, as it is not part of the publication"
+);
+
+# Refresh the subscription, only the missing row on t2 should be replicated
+$new_sub->safe_psql('postgres', "ALTER SUBSCRIPTION sub REFRESH PUBLICATION");
+$new_sub->wait_for_subscription_sync($publisher, 'sub');
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(2),
+	"check there is no change in t1 when no new changes were replicated");
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(2),
+	"check replicated inserts on new subscriber after refreshing");
+
+done_testing();
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bc41e92677..380ff107d3 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11375,6 +11375,16 @@
   provolatile => 'v', proparallel => 'u', prorettype => 'bool',
   proargtypes => 'name',
   prosrc => 'binary_upgrade_logical_slot_has_caught_up' },
+{ oid => '8404', descr => 'for use by pg_upgrade (relation for pg_subscription_rel)',
+  proname => 'binary_upgrade_create_sub_rel_state', proisstrict => 'f',
+  provolatile => 'v', proparallel => 'u', prorettype => 'void',
+  proargtypes => 'text oid char pg_lsn',
+  prosrc => 'binary_upgrade_create_sub_rel_state' },
+{ oid => '8405', descr => 'for use by pg_upgrade (remote_lsn for origin)',
+  proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f',
+  provolatile => 'v', proparallel => 'u', prorettype => 'void',
+  proargtypes => 'text pg_lsn',
+  prosrc => 'binary_upgrade_replorigin_advance' },
 
 # conversion functions
 { oid => '4302',
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 87c1aee379..90b321945c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2656,6 +2656,7 @@ SubLinkType
 SubOpts
 SubPlan
 SubPlanState
+SubRelInfo
 SubRemoveRels
 SubTransactionId
 SubXactCallback
-- 
2.34.1
