From 1b838710fbbe16d412c5eff8a38d1aeb5590dd23 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v7] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, leading to
persistent start of apply worker and reporting errors.

This patch skips loading the publication if the publication does not exist
and loads the publication and updates the relation entry when the publication
gets created.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 26 ++++++++++++++++--
 src/test/subscription/t/024_add_drop_pub.pl | 29 ++++++++++++++++++++-
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1df..991aa6f7282 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1762,6 +1762,8 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet.
  */
 static List *
 LoadPublications(List *pubnames)
@@ -1772,9 +1774,29 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+		{
+			/*
+			 * When executing 'ALTER SUBSCRIPTION ... SET PUBLICATION', the
+			 * apply worker continues using the existing replication slot and
+			 * origin after restarting. If the replication origin is not
+			 * updated before the restart, the WAL start location may point to
+			 * a position before the specified publication exists, causing
+			 * persistent apply worker restarts and errors.
+			 *
+			 * This ensures that the publication is skipped if it does not
+			 * exist and is loaded when the corresponding WAL record is
+			 * encountered.
+			 */
+			ereport(WARNING,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("skipped loading publication: %s", pubname),
+					errhint("If the publication already exists, ignore it as it will be loaded upon reaching the corresponding WAL record; otherwise, create it."));
+		}
 	}
 
 	return result;
diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl
index 4428a3413db..dfdc9835e0c 100644
--- a/src/test/subscription/t/024_add_drop_pub.pl
+++ b/src/test/subscription/t/024_add_drop_pub.pl
@@ -1,7 +1,9 @@
 
 # Copyright (c) 2021-2025, PostgreSQL Global Development Group
 
-# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
+# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and
+# ensures that dropping a publication associated with a subscription does not
+# disrupt existing logical replication.
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
@@ -80,6 +82,31 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_1");
 is($result, qq(20|1|10), 'check initial data is copied to subscriber');
 
+# Ensure that dropping a publication associated with a subscription does not
+# disrupt existing logical replication. Instead, it should log a warning
+# while allowing replication to continue. Additionally, verify that replication
+# of the dropped publication resumes once the publication is recreated.
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_1;");
+
+my $offset = -s $node_publisher->logfile;
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_1 values(50)");
+
+# Verify that a warning is logged.
+$node_publisher->wait_for_log(
+	qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_1/, $offset);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_1");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_1 values(11)");
+
+# Verify that the insert operation gets replicated to subscriber after
+# re-createion.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_1");
+is($result, qq(21|1|11), 'check that the incremental data is replicated after the dropped publication is re-created');
+
 # shutdown
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
-- 
2.43.0

