From 78b8e3e959d83e898ad5befaf55fe46c6fdd6f20 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <HouZhijie@foxmail.com>
Date: Wed, 4 Aug 2021 15:33:15 +0800
Subject: [PATCH] fix-ALTER-SUB-ADD-DROP-PUBLICATION

---
 src/backend/commands/subscriptioncmds.c    | 76 +++++++++++++++-------
 src/test/subscription/t/001_rep_changes.pl | 21 +++++-
 2 files changed, 71 insertions(+), 26 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5157f44058..68a128ced0 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -606,7 +606,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data,
+						  AlterSubscriptionType type)
 {
 	char	   *err;
 	List	   *pubrel_names;
@@ -617,6 +618,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	int			off;
 	int			remove_rel_len;
 	Relation	rel = NULL;
+	int			ntables_to_drop = 0;
 	typedef struct SubRemoveRels
 	{
 		Oid			relid;
@@ -625,6 +627,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	SubRemoveRels *sub_remove_rels;
 	WalReceiverConn *wrconn;
 
+	Assert(type == ALTER_SUBSCRIPTION_SET_PUBLICATION ||
+		   type == ALTER_SUBSCRIPTION_ADD_PUBLICATION ||
+		   type == ALTER_SUBSCRIPTION_DROP_PUBLICATION ||
+		   type == ALTER_SUBSCRIPTION_REFRESH);
+
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
@@ -656,8 +663,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 			subrel_local_oids[off++] = relstate->relid;
 		}
-		qsort(subrel_local_oids, list_length(subrel_states),
-			  sizeof(Oid), oid_cmp);
+
+		if (type != ALTER_SUBSCRIPTION_DROP_PUBLICATION)
+			qsort(subrel_local_oids, list_length(subrel_states),
+				sizeof(Oid), oid_cmp);
 
 		/*
 		 * Rels that we want to remove from subscription and drop any slots
@@ -681,22 +690,25 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 			Oid			relid;
 
 			relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-			/* Check for supported relkind. */
-			CheckSubscriptionRelkind(get_rel_relkind(relid),
-									 rv->schemaname, rv->relname);
-
 			pubrel_local_oids[off++] = relid;
 
-			if (!bsearch(&relid, subrel_local_oids,
-						 list_length(subrel_states), sizeof(Oid), oid_cmp))
+			if (type != ALTER_SUBSCRIPTION_DROP_PUBLICATION)
 			{
-				AddSubscriptionRelState(sub->oid, relid,
-										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-										InvalidXLogRecPtr);
-				ereport(DEBUG1,
-						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
-										 rv->schemaname, rv->relname, sub->name)));
+				/* Check for supported relkind. */
+				CheckSubscriptionRelkind(get_rel_relkind(relid),
+										rv->schemaname, rv->relname);
+
+				if (!bsearch(&relid, subrel_local_oids,
+							 list_length(subrel_states), sizeof(Oid), oid_cmp))
+				{
+					AddSubscriptionRelState(sub->oid, relid,
+											copy_data ? SUBREL_STATE_INIT :
+														SUBREL_STATE_READY,
+											InvalidXLogRecPtr);
+					ereport(DEBUG1,
+							(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
+											rv->schemaname, rv->relname, sub->name)));
+				}
 			}
 		}
 
@@ -704,16 +716,30 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		 * Next remove state for tables we should not care about anymore using
 		 * the data we collected above
 		 */
-		qsort(pubrel_local_oids, list_length(pubrel_names),
-			  sizeof(Oid), oid_cmp);
+		if (type != ALTER_SUBSCRIPTION_ADD_PUBLICATION)
+		{
+			qsort(pubrel_local_oids, list_length(pubrel_names),
+				  sizeof(Oid), oid_cmp);
+			ntables_to_drop = list_length(subrel_states);
+		}
 
 		remove_rel_len = 0;
-		for (off = 0; off < list_length(subrel_states); off++)
+		for (off = 0; off < ntables_to_drop; off++)
 		{
 			Oid			relid = subrel_local_oids[off];
-
-			if (!bsearch(&relid, pubrel_local_oids,
-						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+			bool		drop_table = false;
+
+			if (type == ALTER_SUBSCRIPTION_SET_PUBLICATION ||
+				type == ALTER_SUBSCRIPTION_REFRESH)
+				drop_table = !bsearch(&relid, pubrel_local_oids,
+									  list_length(pubrel_names),
+									  sizeof(Oid), oid_cmp);
+			else if (type == ALTER_SUBSCRIPTION_DROP_PUBLICATION)
+				drop_table = bsearch(&relid, pubrel_local_oids,
+									 list_length(pubrel_names),
+									 sizeof(Oid), oid_cmp);
+
+			if (drop_table)
 			{
 				char		state;
 				XLogRecPtr	statelsn;
@@ -994,7 +1020,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, opts.copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data, stmt->kind);
 				}
 
 				break;
@@ -1045,7 +1071,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Only refresh the added/dropped list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, opts.copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data, stmt->kind);
 				}
 
 				break;
@@ -1087,7 +1113,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, opts.copy_data);
+				AlterSubscription_refresh(sub, opts.copy_data, stmt->kind);
 
 				break;
 			}
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 0c84d87873..f04df8b8d9 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 32;
+use Test::More tests => 34;
 
 # Initialize publisher node
 my $node_publisher = PostgresNode->new('publisher');
@@ -262,6 +262,25 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM temp1");
 is($result, qq(1), 'check rows on subscriber with multiple publications');
 
+# Test changing the list of subscribed publications
+# Removes publications from the list of publications
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_temp1 DROP PUBLICATION tap_pub_temp1");
+
+# pg_subscription_rel should only have one row
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel t1, pg_subscription t2 WHERE t1.srsubid = t2.oid AND t2.subname = 'tap_sub_temp1'");
+is($result, qq(1),
+	'check one relation was removed from subscribed list');
+
+# Add additional publications
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_temp1 ADD PUBLICATION tap_pub_temp1");
+
+# pg_subscription_rel should only have two rows
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel t1, pg_subscription t2 WHERE t1.srsubid = t2.oid AND t2.subname = 'tap_sub_temp1'");
+is($result, qq(2),
+	'check one more relation was subscribed');
+
 # Drop subscription as we don't need it anymore
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_temp1");
 
-- 
2.27.0

