From a28a7d85778e6a1d66d25c27ba1e6d6489202bbe Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Sun, 24 Apr 2022 14:04:01 +0800
Subject: [PATCH v1] Fix data replicated twice when specifying
 PUBLISH_VIA_PARTITION_ROOT option.

If two publications publish a parent table and its child table separately,
and both specify the option PUBLISH_VIA_PARTITION_ROOT, then subscribing to
both publications with one subscription replicates the data twice during the
initial copy. The data is expected to be copied only once.

To fix this, add a new function pg_get_publications_tables. It works like
pg_get_publication_tables, but takes an array of publications as input so
that, when building the table list, it can exclude any table whose ancestor
is also published by one of the publications in the array.
---
 src/backend/catalog/pg_publication.c       | 147 +++++++++++++++++++++
 src/backend/commands/subscriptioncmds.c    |  17 ++-
 src/include/catalog/pg_proc.dat            |   5 +
 src/test/subscription/t/013_partition.pl   |  20 +--
 src/test/subscription/t/028_row_filter.pl  |   9 +-
 src/test/subscription/t/031_column_list.pl |   5 +-
 6 files changed, 178 insertions(+), 25 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 2631558ff1..2b90e237f5 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1154,3 +1154,169 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*
+ * Return the topmost ancestor of the given relation, or the relation itself
+ * if it is not a partition.
+ */
+static Oid
+pub_tables_get_root_relid(Oid relid)
+{
+	List	   *ancestors;
+	Oid			root;
+
+	if (!get_rel_relispartition(relid))
+		return relid;
+
+	/* only the last entry of the ancestor list is needed; free the list */
+	ancestors = get_partition_ancestors(relid);
+	root = llast_oid(ancestors);
+	list_free(ancestors);
+
+	return root;
+}
+
+/*
+ * Returns Oids of tables in publications array.
+ *
+ * The argument is an array of publication names.  NULL elements and names
+ * for which GetPublicationByName() finds no publication are skipped.  The
+ * table list is built on the first call and returned one Oid per call.
+ */
+Datum
+pg_get_publications_tables(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	List	   *tables = NIL;
+
+	/* stuff done only on the first call of the function */
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext oldcontext;
+		ArrayType  *arr;
+		Datum	   *elems;
+		bool	   *nulls;
+		int			nelems;
+		int			i;
+		List	   *tables_viaroot = NIL;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* switch to memory context appropriate for multiple function calls */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/*
+		 * The array is only needed to build the table list, so deconstruct
+		 * it once here rather than on every call.  deconstruct_array() is
+		 * told the elements are text, so verify that before reading them.
+		 */
+		arr = PG_GETARG_ARRAYTYPE_P(0);
+		if (ARR_ELEMTYPE(arr) != TEXTOID)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("publication names array must have element type text")));
+		deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
+						  &elems, &nulls, &nelems);
+
+		for (i = 0; i < nelems; i++)
+		{
+			Publication *publication;
+			List	   *current_table;
+
+			/* a NULL element cannot name a publication */
+			if (nulls[i])
+				continue;
+
+			publication = GetPublicationByName(TextDatumGetCString(elems[i]), true);
+
+			if (!publication)
+				continue;
+
+			if (publication->alltables)
+				current_table = GetAllTablesPublicationRelations(publication->pubviaroot);
+			else
+			{
+				List	   *relids,
+						   *schemarelids;
+
+				relids = GetPublicationRelations(publication->oid,
+												 publication->pubviaroot ?
+												 PUBLICATION_PART_ROOT :
+												 PUBLICATION_PART_LEAF);
+				schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+																publication->pubviaroot ?
+																PUBLICATION_PART_ROOT :
+																PUBLICATION_PART_LEAF);
+				current_table = list_concat_unique_oid(relids, schemarelids);
+			}
+
+			/*
+			 * Store the published tables separately according to pubviaroot.
+			 * It is convenient for removing tables that do not need to be
+			 * published later.
+			 */
+			if (publication->pubviaroot)
+				tables_viaroot = list_concat_unique_oid(tables_viaroot, current_table);
+			else
+				tables = list_concat_unique_oid(tables, current_table);
+
+			list_free(current_table);
+		}
+
+		/*
+		 * If a partition table is published in one publication with viaroot
+		 * and its parent or child table is published in another publication
+		 * without viaroot, then we need to filter this whole partition tree.
+		 * So, move the related partition table from tables to tables_viaroot.
+		 *
+		 * If option viaroot is the same for all publications, skip this part.
+		 */
+		if (tables_viaroot && tables)
+		{
+			ListCell   *lc;
+			List	   *change_tables = NIL;
+
+			foreach(lc, tables_viaroot)
+			{
+				Oid			root_viaroot = pub_tables_get_root_relid(lfirst_oid(lc));
+				ListCell   *lc2;
+
+				foreach(lc2, tables)
+				{
+					Oid			relid = lfirst_oid(lc2);
+
+					if (pub_tables_get_root_relid(relid) == root_viaroot)
+					{
+						tables = foreach_delete_current(tables, lc2);
+						change_tables = list_append_unique_oid(change_tables,
+															   relid);
+					}
+				}
+			}
+
+			if (change_tables)
+				tables_viaroot = list_concat_unique_oid(tables_viaroot, change_tables);
+		}
+
+		if (tables_viaroot)
+			tables = list_concat_unique_oid(tables, filter_partitions(tables_viaroot));
+
+		funcctx->user_fctx = (void *) tables;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+	tables = (List *) funcctx->user_fctx;
+
+	if (funcctx->call_cntr < list_length(tables))
+	{
+		Oid			relid = list_nth_oid(tables, funcctx->call_cntr);
+
+		SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b94236f74d..3a9e883d4f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1759,17 +1759,22 @@ static List *
 fetch_table_list(WalReceiverConn *wrconn, List *publications)
 {
 	WalRcvExecResult *res;
-	StringInfoData cmd;
+	StringInfoData cmd,
+				pub_names;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {TEXTOID, TEXTOID};
 	List	   *tablelist = NIL;
 
+	initStringInfo(&pub_names);
+	get_publications_str(publications, &pub_names, true);
+
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	get_publications_str(publications, &cmd, true);
-	appendStringInfoChar(&cmd, ')');
+	appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname"
+					 " FROM pg_class c JOIN pg_namespace n"
+					 "   ON n.oid = c.relnamespace,"
+					 " LATERAL pg_get_publications_tables(array[ %s ]) gst"
+					 " WHERE c.oid = gst.relid;",
+					 pub_names.data);
 
 	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
 	pfree(cmd.data);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6d378ff785..7f9fd025d4 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11675,6 +11675,11 @@
   provolatile => 's', prorettype => 'oid', proargtypes => 'text',
   proallargtypes => '{text,oid}', proargmodes => '{i,o}',
   proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+{ oid => '6122', descr => 'get OIDs of tables in publication array',
+  proname => 'pg_get_publications_tables', prorows => '1000', proretset => 't',
+  provolatile => 's', prorettype => 'oid', proargtypes => '_text',
+  proallargtypes => '{_text,oid}', proargmodes => '{i,o}',
+  proargnames => '{pubnames,relid}', prosrc => 'pg_get_publications_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
   proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 66e63e755e..6313d425dc 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -476,14 +476,14 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)"
 );
 
-# Note: We create two separate tables, not a partitioned one, so that we can
-# easily identity through which relation were the changes replicated.
+# Note: We only create one table (tab4) here. Because the publications
+# specify PUBLISH_VIA_PARTITION_ROOT (see pub_all and pub_lower_level
+# above), all data should be replicated to that table, so no separate
+# tab4_1 table is needed on this subscriber.
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)"
 );
-$node_subscriber2->safe_psql('postgres',
-	"CREATE TABLE tab4_1 (a int PRIMARY KEY)"
-);
+
 # Publication that sub2 points to now publishes via root, so must update
 # subscription target relations. We set the list of publications so that
 # the FOR ALL TABLES publication is second (the list order matters).
@@ -559,11 +559,6 @@ $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT a FROM tab4 ORDER BY 1");
 is( $result, qq(0), 'inserts into tab4 replicated');
 
-$result = $node_subscriber2->safe_psql('postgres',
-	"SELECT a FROM tab4_1 ORDER BY 1");
-is( $result, qq(), 'inserts into tab4_1 replicated');
-
-
 # now switch the order of publications in the list, try again, the result
 # should be the same (no dependence on order of pulications)
 $node_subscriber2->safe_psql('postgres',
@@ -588,11 +583,6 @@ $result = $node_subscriber2->safe_psql('postgres',
 is( $result, qq(0
 1), 'inserts into tab4 replicated');
 
-$result = $node_subscriber2->safe_psql('postgres',
-	"SELECT a FROM tab4_1 ORDER BY 1");
-is( $result, qq(), 'inserts into tab4_1 replicated');
-
-
 # update (replicated as update)
 $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
 $node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index 82c4eb6ef6..21800a03b6 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -394,6 +394,10 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rowfilter_child(a, b) VALUES(0,'0'),(30,'30'),(40,'40')"
 );
 
+# insert data into partitioned table.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_viaroot_part(a) VALUES(13), (17)");
+
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
 );
@@ -719,8 +723,9 @@ is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast');
 # - INSERT (16)        YES, 16 > 15
 $result =
   $node_subscriber->safe_psql('postgres',
-	"SELECT a FROM tab_rowfilter_viaroot_part");
-is( $result, qq(16),
+	"SELECT a FROM tab_rowfilter_viaroot_part ORDER BY 1");
+is( $result, qq(16
+17),
 	'check replicated rows to tab_rowfilter_viaroot_part'
 );
 
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index bdcf3e4a24..7c73913f4d 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -948,7 +948,8 @@ $node_publisher->safe_psql('postgres', qq(
 	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
 	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
 
-	CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_1 FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_2 FOR TABLE test_root_1 (a, b) WITH (publish_via_partition_root = true);
 
 	-- initial data
 	INSERT INTO test_root VALUES (1, 2, 3);
@@ -956,7 +957,7 @@ $node_publisher->safe_psql('postgres', qq(
 ));
 
 $node_subscriber->safe_psql('postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true_1, pub_root_true_2;
 ));
 
 wait_for_subscription_sync($node_subscriber);
-- 
2.23.0.windows.1

