From 4eafa47f2751076c6caaa4f16011d1953a869da4 Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Thu, 2 Dec 2021 12:12:51 +0900
Subject: [PATCH] wip: don't add partition to publication if parent present

---
 src/backend/catalog/pg_publication.c      | 77 +++++++++++++++---
 src/backend/commands/publicationcmds.c    | 96 ++++++++++++++++++-----
 src/backend/commands/tablecmds.c          | 48 ++++++++++++
 src/backend/nodes/list.c                  | 26 ++++++
 src/include/commands/publicationcmds.h    |  1 +
 src/include/nodes/pg_list.h               |  3 +-
 src/test/regress/expected/publication.out | 43 +++++++++-
 src/test/regress/sql/publication.sql      | 25 +++++-
 8 files changed, 286 insertions(+), 33 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 63579b2f82..e98f575dcc 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -253,6 +253,42 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
 	return result;
 }
 
+/*
+ * Checks if the relation being added or its ancestor is already present in the
+ * publication.  If so, error out if asked to do so.
+ */
+static bool
+publication_relation_exists(Publication *pub,
+							Oid relid, Oid ancestor_oid,
+							bool error_if_exists)
+{
+	if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(pub->oid)))
+	{
+		if (error_if_exists)
+			ereport(ERROR,
+					(errcode(ERRCODE_DUPLICATE_OBJECT),
+					 errmsg("relation \"%s\" is already member of publication \"%s\"",
+							get_rel_name(relid), pub->name)));
+		return true;
+	}
+	else if (OidIsValid(ancestor_oid) &&
+			 SearchSysCacheExists2(PUBLICATIONRELMAP,
+								   ObjectIdGetDatum(ancestor_oid),
+								   ObjectIdGetDatum(pub->oid)))
+	{
+		if (error_if_exists)
+			ereport(ERROR,
+					(errcode(ERRCODE_DUPLICATE_OBJECT),
+					 errmsg("ancestor \"%s\" of relation \"%s\" is already member of publication \"%s\"",
+							get_rel_name(ancestor_oid), get_rel_name(relid),
+							pub->name)));
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * Insert new publication / relation mapping.
  */
@@ -271,25 +307,42 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 				referenced;
 	List	   *relids = NIL;
 
+	/* If table already present in the publication, don't add it again. */
+	if (publication_relation_exists(pub, relid, InvalidOid, !if_not_exists))
+		return InvalidObjectAddress;
+
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
 	/*
-	 * Check for duplicates. Note that this does not really prevent
-	 * duplicates, it's here just to provide nicer error message in common
-	 * case. The real protection is the unique key on the catalog.
+	 * If a partition's ancestor is already present in the publication, don't
+	 * add it.  Partition will be implicitly considered to be a part of the
+	 * publication via the ancestor.
 	 */
-	if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
-							  ObjectIdGetDatum(pubid)))
+	if (targetrel->relation->rd_rel->relispartition)
 	{
-		table_close(rel, RowExclusiveLock);
+		List   *ancestors = get_partition_ancestors(relid);
+		ListCell *lc;
+		bool	skip_add = false;
 
-		if (if_not_exists)
-			return InvalidObjectAddress;
+		foreach(lc, ancestors)
+		{
+			Oid		ancestor_oid = lfirst_oid(lc);
 
-		ereport(ERROR,
-				(errcode(ERRCODE_DUPLICATE_OBJECT),
-				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel->relation), pub->name)));
+			if (publication_relation_exists(pub, relid, ancestor_oid,
+											!if_not_exists))
+			{
+				skip_add = true;
+				break;
+			}
+		}
+
+		list_free(ancestors);
+
+		if (skip_add)
+		{
+			table_close(rel, RowExclusiveLock);
+			return InvalidObjectAddress;
+		}
 	}
 
 	check_publication_add_relation(targetrel->relation);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 7d4a0e95f6..d1fc307c95 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -899,6 +899,7 @@ OpenTableList(List *tables)
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	bool		contains_partition = false;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
@@ -917,6 +918,9 @@ OpenTableList(List *tables)
 		rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
 		myrelid = RelationGetRelid(rel);
 
+		if (rel->rd_rel->relispartition)
+			contains_partition = true;
+
 		/*
 		 * Filter out duplicates if user specifies "foo, foo".
 		 *
@@ -973,6 +977,52 @@ OpenTableList(List *tables)
 		}
 	}
 
+	/*
+	 * De-duplicate partitions.
+	 *
+	 * This is to handle the case where a user inadvertently specifies
+	 * "foo, partition_of_foo" or "partition_of_foo, foo".  We'd only need to
+	 * add the parent table "foo" in either of those cases.
+	 */
+	if (list_length(relids) > 1 && contains_partition)
+	{
+		List   *new_rels = NIL;
+
+		foreach(lc, rels)
+		{
+			PublicationRelInfo *pubrel = lfirst(lc);
+			Relation	rel = pubrel->relation;
+			bool		skip_rel = false;
+
+			if (rel->rd_rel->relispartition)
+			{
+				List	 *ancestors = get_partition_ancestors(RelationGetRelid(rel));
+				ListCell *l;
+
+				foreach(l, ancestors)
+				{
+					Oid		ancestor_oid = lfirst_oid(l);
+
+					if (list_member_oid(relids, ancestor_oid))
+					{
+						table_close(rel, ShareUpdateExclusiveLock);
+						skip_rel = true;
+					}
+				}
+
+				list_free(ancestors);
+			}
+
+			if (!skip_rel)
+				new_rels = lappend(new_rels, pubrel);
+			else
+				pfree(pubrel);
+		}
+
+		list_free(rels);
+		rels = new_rels;
+	}
+
 	list_free(relids);
 
 	return rels;
@@ -1058,15 +1108,40 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 	}
 }
 
+/*
+ * Removes the relation's membership in given publication by dropping the
+ * corresponding pg_publication_rel entry.
+ */
+void
+RemovePublicationRel(Oid pubid, Oid relid, bool missing_ok)
+{
+	ObjectAddress obj;
+	Oid		pubrelid = GetSysCacheOid2(PUBLICATIONRELMAP,
+									   Anum_pg_publication_rel_oid,
+									   ObjectIdGetDatum(relid),
+									   ObjectIdGetDatum(pubid));
+	if (!OidIsValid(pubrelid))
+	{
+		if (missing_ok)
+			return;
+
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("relation \"%s\" is not part of the publication",
+						get_rel_name(relid))));
+	}
+
+	ObjectAddressSet(obj, PublicationRelRelationId, pubrelid);
+	performDeletion(&obj, DROP_CASCADE, 0);
+}
+
 /*
  * Remove listed tables from the publication.
  */
 static void
 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 {
-	ObjectAddress obj;
 	ListCell   *lc;
-	Oid			prid;
 
 	foreach(lc, rels)
 	{
@@ -1074,22 +1149,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 		Relation	rel = pubrel->relation;
 		Oid			relid = RelationGetRelid(rel);
 
-		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
-							   ObjectIdGetDatum(relid),
-							   ObjectIdGetDatum(pubid));
-		if (!OidIsValid(prid))
-		{
-			if (missing_ok)
-				continue;
-
-			ereport(ERROR,
-					(errcode(ERRCODE_UNDEFINED_OBJECT),
-					 errmsg("relation \"%s\" is not part of the publication",
-							RelationGetRelationName(rel))));
-		}
-
-		ObjectAddressSet(obj, PublicationRelRelationId, prid);
-		performDeletion(&obj, DROP_CASCADE, 0);
+		RemovePublicationRel(pubid, relid, missing_ok);
 	}
 }
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c35f09998c..7117ba7539 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -52,6 +52,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/publicationcmds.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -586,6 +587,8 @@ static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel,
 											   List *partConstraint,
 											   bool validate_default);
 static void CloneRowTriggersToPartition(Relation parent, Relation partition);
+static void AttachPartitionRemoveDuplicatePublications(Relation parentrel,
+										   Relation attachrel);
 static void DetachAddConstraintIfNeeded(List **wqueue, Relation partRel);
 static void DropClonedTriggersFromPartition(Oid partitionId);
 static ObjectAddress ATExecDetachPartition(List **wqueue, AlteredTableInfo *tab,
@@ -17471,6 +17474,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd,
 	 */
 	CloneForeignKeyConstraints(wqueue, rel, attachrel);
 
+	/*
+	 * Check if the partition is in publications that the parent or some
+	 * other higher-level ancestor is also present in.
+	 */
+	AttachPartitionRemoveDuplicatePublications(rel, attachrel);
+
 	/*
 	 * Generate partition constraint from the partition bound specification.
 	 * If the parent itself is a partition, make sure to include its
@@ -17893,6 +17902,45 @@ CloneRowTriggersToPartition(Relation parent, Relation partition)
 	table_close(pg_trigger, RowExclusiveLock);
 }
 
+/*
+ * Removes a table being attached to the parent relation from any publications
+ * in which the parent relation or its ancestor is already present.
+ *
+ * This it to maintain the invariant that a partition is not present
+ * duplicatively in the publications where one of its partition ancestors is
+ * already present.  The partition will be implicitly present in the
+ * publication after the successful ATTACH, because the parent or its ancestor
+ * is present.
+ */
+static void
+AttachPartitionRemoveDuplicatePublications(Relation parentrel,
+										   Relation attachrel)
+{
+	List   *attachrel_pubs;
+	ListCell *lc;
+	Oid		attachrelid = RelationGetRelid(attachrel);
+	Oid		parentid = RelationGetRelid(parentrel);
+	List   *ancestors = list_make1_oid(parentid);
+
+	if (parentrel->rd_rel->relispartition)
+		ancestors = list_concat(ancestors, get_partition_ancestors(parentid));
+
+	attachrel_pubs = GetRelationPublications(RelationGetRelid(attachrel));
+	foreach(lc, attachrel_pubs)
+	{
+		Oid		pubid = lfirst_oid(lc);
+		List   *pub_rels = GetPublicationRelations(pubid,
+												   PUBLICATION_PART_ROOT);
+
+		if (list_intersection_oid(pub_rels, ancestors) != NIL)
+			RemovePublicationRel(pubid, attachrelid, false);
+		list_free(pub_rels);
+	}
+
+	list_free(ancestors);
+	list_free(attachrel_pubs);
+}
+
 /*
  * ALTER TABLE DETACH PARTITION
  *
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index 410ffe0835..27d4be2758 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -1178,6 +1178,32 @@ list_intersection_int(const List *list1, const List *list2)
 	return result;
 }
 
+/*
+ * As list_intersection but operates on lists of oids.
+ */
+List *
+list_intersection_oid(const List *list1, const List *list2)
+{
+	List	   *result;
+	const ListCell *cell;
+
+	if (list1 == NIL || list2 == NIL)
+		return NIL;
+
+	Assert(IsOidList(list1));
+	Assert(IsOidList(list2));
+
+	result = NIL;
+	foreach(cell, list1)
+	{
+		if (list_member_oid(list2, lfirst_oid(cell)))
+			result = lappend_oid(result, lfirst_oid(cell));
+	}
+
+	check_list_invariants(result);
+	return result;
+}
+
 /*
  * Return a list that contains all the cells in list1 that are not in
  * list2. The returned list is freshly allocated via palloc(), but the
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index 4ba68c70ee..397675ba7b 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -25,6 +25,7 @@
 extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
 extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt);
 extern void RemovePublicationById(Oid pubid);
+extern void RemovePublicationRel(Oid pubid, Oid relid, bool missing_ok);
 extern void RemovePublicationRelById(Oid proid);
 extern void RemovePublicationSchemaById(Oid psoid);
 
diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h
index c3f47db888..69d78793a2 100644
--- a/src/include/nodes/pg_list.h
+++ b/src/include/nodes/pg_list.h
@@ -575,8 +575,9 @@ extern List *list_union_oid(const List *list1, const List *list2);
 
 extern List *list_intersection(const List *list1, const List *list2);
 extern List *list_intersection_int(const List *list1, const List *list2);
+extern List *list_intersection_oid(const List *list1, const List *list2);
 
-/* currently, there's no need for list_intersection_ptr etc */
+/* currently, there's no need for list_intersection_ptr */
 
 extern List *list_difference(const List *list1, const List *list2);
 extern List *list_difference_ptr(const List *list1, const List *list2);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 1feb558968..f5a15dfcc1 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -237,8 +237,49 @@ HINT:  To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
 ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
 -- works again, because update is no longer replicated
 UPDATE testpub_parted2 SET a = 2;
-DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
+-- test behavior where a partition is added to publication where the parent
+-- is already present
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_forparted FOR TABLE testpub_parted1, testpub_parted, testpub_parted2;
+RESET client_min_messages;
+-- must show only testpub_parted
+\dRp+ testpub_forparted
+                               Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_parted"
+
+ALTER TABLE testpub_parted DETACH PARTITION testpub_parted2;
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted2;
+-- must show only testpub_parted, testpub_parted2
+\dRp+ testpub_forparted
+                               Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_parted"
+    "public.testpub_parted2"
+
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
+-- must show only testpub_parted
+\dRp+ testpub_forparted
+                               Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_parted"
+
+-- errors because parent already in publication
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted2;
+ERROR:  ancestor "testpub_parted" of relation "testpub_parted2" is already member of publication "testpub_forparted"
+DROP PUBLICATION testpub_forparted;
+DROP TABLE testpub_parted1, testpub_parted2;
 -- Test cache invalidation FOR ALL TABLES publication
 SET client_min_messages = 'ERROR';
 CREATE TABLE testpub_tbl4(a int);
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 8fa0435c32..ca5708335f 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -131,9 +131,32 @@ UPDATE testpub_parted2 SET a = 2;
 ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
 -- works again, because update is no longer replicated
 UPDATE testpub_parted2 SET a = 2;
-DROP TABLE testpub_parted1, testpub_parted2;
+
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
+-- test behavior where a partition is added to publication where the parent
+-- is already present
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_forparted FOR TABLE testpub_parted1, testpub_parted, testpub_parted2;
+RESET client_min_messages;
+-- must show only testpub_parted
+\dRp+ testpub_forparted
+ALTER TABLE testpub_parted DETACH PARTITION testpub_parted2;
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted2;
+-- must show only testpub_parted, testpub_parted2
+\dRp+ testpub_forparted
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
+-- must show only testpub_parted
+\dRp+ testpub_forparted
+-- errors because parent already in publication
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted2;
+
+DROP PUBLICATION testpub_forparted;
+DROP TABLE testpub_parted1, testpub_parted2;
+
+
+
 -- Test cache invalidation FOR ALL TABLES publication
 SET client_min_messages = 'ERROR';
 CREATE TABLE testpub_tbl4(a int);
-- 
2.24.1

