From 37a504cd37a500c38e63a0a118c2e5407148ed37 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Wed, 13 Jan 2021 16:35:00 +0530
Subject: [PATCH v1 1/2] Fix ALTER PUBLICATION...DROP TABLE behaviour

Currently, in logical replication, publisher/walsender publishes
the tables even though they aren't part of the publication i.e
they are dropped from the publication. Because of this ALTER
PUBLICATION...DROP TABLE doesn't work as expected.

This patch adds a new function is_relation_part_of_publication,
which looks up in the pg_publication_rel system catalogue, to check
whether the given table is part of the publication. If the table
is not part of the publication, the publisher/walsender can skip
sending changes related to it.
---
 src/backend/replication/pgoutput/pgoutput.c | 33 +++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 2f01137b42..d4ff953dc0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -66,6 +66,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx);
+static bool is_relation_part_of_publication(Oid relid, Oid puboid);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
@@ -1055,6 +1056,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
+			else if (!is_relation_part_of_publication(relid, pub->oid))
+			{
+				/*
+				 * Relation is not associated with the publication anymore i.e
+				 * it would have been dropped from the publication. So no need
+				 * to publish the changes for it.
+				 */
+				entry->pubactions.pubinsert = false;
+				entry->pubactions.pubupdate = false;
+				entry->pubactions.pubdelete = false;
+				entry->pubactions.pubtruncate = false;
+			}
 
 			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
 				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
@@ -1181,3 +1194,23 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 		entry->replicate_valid = false;
 }
+
+/*
+ * Check whether the relation is associated with the given publication. If yes,
+ * return true, otherwise false.
+ */
+static bool
+is_relation_part_of_publication(Oid relid, Oid puboid)
+{
+	HeapTuple	tup;
+
+	tup = SearchSysCache2(PUBLICATIONRELMAP,
+						  ObjectIdGetDatum(relid), ObjectIdGetDatum(puboid));
+
+	if (!HeapTupleIsValid(tup))
+		return false;
+
+	ReleaseSysCache(tup);
+
+	return true;
+}
-- 
2.25.1

