From 6293d73363082ad3517074d8bf4c2459f649e9fb Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Tue, 14 Jun 2022 13:44:09 +0800
Subject: [PATCH v6] Fix data replicated twice when specifying
 PUBLISH_VIA_PARTITION_ROOT option.

If there are two publications that publish the parent table and the child table
separately, and both specify the option PUBLISH_VIA_PARTITION_ROOT, subscribing
to both publications from one subscription causes initial copy twice. What we
expect is to be copied only once.

To fix this, we extend the API of the function pg_get_publication_tables.
Now, the function pg_get_publication_tables could receive the publication list.
And then, if we specify option viaroot, we could exclude the partitioned table
whose ancestor belongs to the publication list when getting the table
informations.
---
 src/backend/catalog/pg_publication.c       | 145 +++++++++++++++------
 src/backend/commands/subscriptioncmds.c    |  65 +++++++--
 src/include/catalog/pg_proc.dat            |  12 +-
 src/test/regress/expected/rules.out        |   2 +-
 src/test/subscription/t/013_partition.pl   |  19 +--
 src/test/subscription/t/028_row_filter.pl  |  11 +-
 src/test/subscription/t/031_column_list.pl |   5 +-
 7 files changed, 186 insertions(+), 73 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index c365de3a72..f095bf40a8 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1077,22 +1077,36 @@ get_publication_name(Oid pubid, bool missing_ok)
 }
 
 /*
- * Returns information of tables in a publication.
+ * Returns information of tables in one or more publications.
  */
 Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
 #define NUM_PUBLICATION_TABLES_ELEM	3
 	FuncCallContext *funcctx;
-	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
-	Publication *publication;
-	List	   *tables;
+	List	   *tables = NIL,
+			   *results = NIL;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
+		typedef struct
+		{
+			Oid			pubid;
+			List	   *table_list;
+		}			pub_info;
+
 		TupleDesc	tupdesc;
 		MemoryContext oldcontext;
+		ArrayType  *arr;
+		Datum	   *elems;
+		int			nelems,
+					i;
+		Publication *publication;
+		bool		viaroot = false;
+		List	   *pub_infos = NIL;
+		ListCell   *lc1,
+				   *lc2;
 
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
@@ -1100,43 +1114,93 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		/* switch to memory context appropriate for multiple function calls */
 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-		publication = GetPublicationByName(pubname, false);
+		arr = PG_GETARG_ARRAYTYPE_P(0);
+		deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
+						  &elems, NULL, &nelems);
 
-		/*
-		 * Publications support partitioned tables, although all changes are
-		 * replicated using leaf partition identity and schema, so we only
-		 * need those.
-		 */
-		if (publication->alltables)
+		/* get Oids of tables from each publication */
+		for (i = 0; i < nelems; i++)
 		{
-			tables = GetAllTablesPublicationRelations(publication->pubviaroot);
-		}
-		else
-		{
-			List	   *relids,
-					   *schemarelids;
-
-			relids = GetPublicationRelations(publication->oid,
-											 publication->pubviaroot ?
-											 PUBLICATION_PART_ROOT :
-											 PUBLICATION_PART_LEAF);
-			schemarelids = GetAllSchemaPublicationRelations(publication->oid,
-															publication->pubviaroot ?
-															PUBLICATION_PART_ROOT :
-															PUBLICATION_PART_LEAF);
-			tables = list_concat_unique_oid(relids, schemarelids);
+			List	   *current_tables = NIL;
+			pub_info   *pubinfo = (pub_info *) malloc(sizeof(pub_info));
+
+			publication = GetPublicationByName(TextDatumGetCString(elems[i]), false);
 
 			/*
-			 * If the publication publishes partition changes via their
-			 * respective root partitioned tables, we must exclude partitions
-			 * in favor of including the root partitioned tables. Otherwise,
-			 * the function could return both the child and parent tables
-			 * which could cause data of the child table to be
-			 * double-published on the subscriber side.
+			 * Publications support partitioned tables, although all changes
+			 * are replicated using leaf partition identity and schema, so we
+			 * only need those.
 			 */
+			if (publication->alltables)
+				current_tables = GetAllTablesPublicationRelations(publication->pubviaroot);
+			else
+			{
+				List	   *relids,
+						   *schemarelids;
+
+				relids = GetPublicationRelations(publication->oid,
+												 publication->pubviaroot ?
+												 PUBLICATION_PART_ROOT :
+												 PUBLICATION_PART_LEAF);
+				schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+																publication->pubviaroot ?
+																PUBLICATION_PART_ROOT :
+																PUBLICATION_PART_LEAF);
+				current_tables = list_concat(relids, schemarelids);
+			}
+
+			/* Record the publication and corresponding table list. */
+			pubinfo->pubid = publication->oid;
+			pubinfo->table_list = list_copy(current_tables);
+			pub_infos = lappend(pub_infos, pubinfo);
+
+			tables = list_concat(tables, current_tables);
+
 			if (publication->pubviaroot)
-				tables = filter_partitions(tables);
+				viaroot = true;
+		}
+
+		pfree(elems);
+
+		/* Now sort and de-duplicate the result list */
+		list_sort(tables, list_oid_cmp);
+		list_deduplicate_oid(tables);
+
+		/*
+		 * If the publication publishes partition changes via their respective
+		 * root partitioned tables, we must exclude partitions in favor of
+		 * including the root partitioned tables. Otherwise, the function
+		 * could return both the child and parent tables which could cause
+		 * data of the child table to be double-published on the subscriber
+		 * side.
+		 */
+		if (viaroot)
+			tables = filter_partitions(tables);
+
+		/*
+		 * Record the published table and the corresponding publication so
+		 * that we can get row filters and column list later.
+		 */
+		foreach(lc1, tables)
+		{
+			Oid			relid = lfirst_oid(lc1);
+
+			foreach(lc2, pub_infos)
+			{
+				pub_info   *pubinfo = (pub_info *) lfirst(lc2);
+
+				if (list_member_oid(pubinfo->table_list, relid))
+				{
+					Oid		   *result = (Oid *) malloc(sizeof(Oid) * 2);
+
+					result[0] = relid;
+					result[1] = pubinfo->pubid;
+
+					results = lappend(results, result);
+				}
+			}
 		}
+		list_free(pub_infos);
 
 		/* Construct a tuple descriptor for the result rows. */
 		tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
@@ -1148,20 +1212,23 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 						   PG_NODE_TREEOID, -1, 0);
 
 		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
-		funcctx->user_fctx = (void *) tables;
+		funcctx->user_fctx = (void *) results;
 
 		MemoryContextSwitchTo(oldcontext);
 	}
 
 	/* stuff done on every call of the function */
 	funcctx = SRF_PERCALL_SETUP();
-	tables = (List *) funcctx->user_fctx;
+	results = (List *) funcctx->user_fctx;
 
-	if (funcctx->call_cntr < list_length(tables))
+	if (funcctx->call_cntr < list_length(results))
 	{
 		HeapTuple	pubtuple = NULL;
 		HeapTuple	rettuple;
-		Oid			relid = list_nth_oid(tables, funcctx->call_cntr);
+		Oid		   *table_info = (Oid *) list_nth(results, funcctx->call_cntr);
+		Oid			relid = table_info[0],
+					pubid = table_info[1];
+		Publication *publication;
 		Datum		values[NUM_PUBLICATION_TABLES_ELEM];
 		bool		nulls[NUM_PUBLICATION_TABLES_ELEM];
 
@@ -1171,7 +1238,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		MemSet(nulls, 0, sizeof(nulls));
 		MemSet(values, 0, sizeof(values));
 
-		publication = GetPublicationByName(pubname, false);
+		publication = GetPublication(pubid);
 
 		values[0] = ObjectIdGetDatum(relid);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 83e6eae855..04aa83da95 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1764,25 +1764,70 @@ static List *
 fetch_table_list(WalReceiverConn *wrconn, List *publications)
 {
 	WalRcvExecResult *res;
-	StringInfoData cmd;
+	StringInfoData cmd,
+				pub_names;
 	TupleTableSlot *slot;
-	Oid			tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID};
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, INT2VECTOROID};
 	List	   *tablelist = NIL;
-	bool		check_columnlist = (walrcv_server_version(wrconn) >= 150000);
+	int			server_version = walrcv_server_version(wrconn);
+	bool		check_columnlist = (server_version >= 150000);
+
+	initStringInfo(&pub_names);
+	get_publications_str(publications, &pub_names, true);
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
 
 	/* Get column lists for each relation if the publisher supports it */
-	if (check_columnlist)
-		appendStringInfoString(&cmd, ", t.attnames\n");
+	if (check_columnlist && server_version >= 160000)
+		appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname,\n"
+						 "              ( CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)\n"
+						 "                     THEN NULL ELSE gpt.attrs END\n"
+						 "              ) AS attnames\n"
+						 " FROM pg_class c\n"
+						 "   JOIN pg_namespace n ON n.oid = c.relnamespace\n"
+						 "   JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
+						 "          FROM pg_publication\n"
+						 "          WHERE pubname IN ( %s )) as gpt\n"
+						 "       ON gpt.relid = c.oid\n",
+						 pub_names.data);
+	else
+	{
+		/*
+		 * Get the list of tables from publisher, the partitioned table whose
+		 * ancestor is also in this list should be ignored, otherwise the
+		 * initial date in the partitioned table would be replicated twice.
+		 */
 
-	appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	get_publications_str(publications, &cmd, true);
-	appendStringInfoChar(&cmd, ')');
+		appendStringInfoString(&cmd, "WITH pub_tabs AS(\n"
+							   " SELECT DISTINCT N.nspname, C.oid, C.relname, C.relispartition\n");
+
+		/* Get column lists for each relation if the publisher supports it */
+		if (check_columnlist)
+			appendStringInfoString(&cmd, ",( CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)\n"
+								   "              THEN NULL ELSE gpt.attrs END\n"
+								   "       ) AS attnames\n");
+
+		appendStringInfo(&cmd, " FROM pg_publication P,\n"
+						 "      LATERAL pg_get_publication_tables(P.pubname) GPT,\n"
+						 "      pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						 "  WHERE C.oid = GPT.relid AND P.pubname IN ( %s )\n"
+						 ")\n"
+						 "SELECT DISTINCT pub_tabs.nspname, pub_tabs.relname\n",
+						 pub_names.data);
+
+		/* Get column lists for each relation if the publisher supports it */
+		if (check_columnlist)
+			appendStringInfoString(&cmd, ", pub_tabs.attnames\n");
+
+		appendStringInfoString(&cmd, "FROM pub_tabs\n"
+							   " WHERE (pub_tabs.relispartition IS FALSE\n"
+							   "  OR NOT EXISTS (SELECT 1 FROM pg_partition_ancestors(pub_tabs.oid) as pa\n"
+							   "                  WHERE pa.relid IN (SELECT pub_tabs.oid FROM pub_tabs)\n"
+							   "                   AND pa.relid != pub_tabs.oid))\n");
+	}
 
 	res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
+	pfree(pub_names.data);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a77b293723..6622401320 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11673,11 +11673,13 @@
   prosrc => 'pg_show_replication_origin_status' },
 
 # publications
-{ oid => '6119', descr => 'get information of tables in a publication',
-  proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
-  provolatile => 's', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,oid,int2vector,pg_node_tree}',
-  proargmodes => '{i,o,o,o}', proargnames => '{pubname,relid,attrs,qual}',
+{ oid => '6119',
+  descr => 'get information of tables in one or more publications',
+  proname => 'pg_get_publication_tables', prorows => '1000',
+  provariadic => 'text', proretset => 't', provolatile => 's',
+  prorettype => 'record', proargtypes => '_text',
+  proallargtypes => '{_text,oid,int2vector,pg_node_tree}',
+  proargmodes => '{v,o,o,o}', proargnames => '{pubname,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index fc3cde3226..2a9a9c9da4 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1448,7 +1448,7 @@ pg_publication_tables| SELECT p.pubname,
              JOIN pg_attribute a ON (((a.attrelid = gpt.relid) AND (a.attnum = k.k))))) AS attnames,
     pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
    FROM pg_publication p,
-    LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid, attrs, qual),
+    LATERAL pg_get_publication_tables(VARIADIC ARRAY[(p.pubname)::text]) gpt(relid, attrs, qual),
     (pg_class c
      JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
   WHERE (c.oid = gpt.relid);
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 0dfbbabc3b..ae6e2057c8 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -477,12 +477,13 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)"
 );
 
-# Note: We create two separate tables, not a partitioned one, so that we can
-# easily identity through which relation were the changes replicated.
+# Note: We only create one table for the partition table (tab4) here.
+# Because we specify option PUBLISH_VIA_PARTITION_ROOT (see pub_all and
+# pub_lower_level above), all data should be replicated to the partition table.
+# So we do not need to create table for the partitioned table.
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)");
-$node_subscriber2->safe_psql('postgres',
-	"CREATE TABLE tab4_1 (a int PRIMARY KEY)");
+
 # Publication that sub2 points to now publishes via root, so must update
 # subscription target relations. We set the list of publications so that
 # the FOR ALL TABLES publication is second (the list order matters).
@@ -556,11 +557,6 @@ $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1");
 is($result, qq(0), 'inserts into tab4 replicated');
 
-$result =
-  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
-is($result, qq(), 'inserts into tab4_1 replicated');
-
-
 # now switch the order of publications in the list, try again, the result
 # should be the same (no dependence on order of pulications)
 $node_subscriber2->safe_psql('postgres',
@@ -584,11 +580,6 @@ $result =
 is( $result, qq(0
 1), 'inserts into tab4 replicated');
 
-$result =
-  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
-is($result, qq(), 'inserts into tab4_1 replicated');
-
-
 # update (replicated as update)
 $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
 $node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index b1fb2d7cae..d7dcecf960 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -393,6 +393,10 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rowfilter_child(a, b) VALUES(0,'0'),(30,'30'),(40,'40')"
 );
 
+# insert data into partitioned table.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_viaroot_part(a) VALUES(13), (17)");
+
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
 );
@@ -723,8 +727,11 @@ is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast');
 # - INSERT (16)        YES, 16 > 15
 $result =
   $node_subscriber->safe_psql('postgres',
-	"SELECT a FROM tab_rowfilter_viaroot_part");
-is($result, qq(16), 'check replicated rows to tab_rowfilter_viaroot_part');
+	"SELECT a FROM tab_rowfilter_viaroot_part ORDER BY 1");
+is($result, qq(16
+17),
+	'check replicated rows to tab_rowfilter_viaroot_part'
+);
 
 # Check there is no data in tab_rowfilter_viaroot_part_1 because rows are
 # replicated via the top most parent table tab_rowfilter_viaroot_part
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 9fa6e0b35f..3d07b456bc 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -973,7 +973,8 @@ $node_publisher->safe_psql(
 	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
 	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
 
-	CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_1 FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_2 FOR TABLE test_root_1 (a, b) WITH (publish_via_partition_root = true);
 
 	-- initial data
 	INSERT INTO test_root VALUES (1, 2, 3);
@@ -982,7 +983,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true_1, pub_root_true_2;
 ));
 
 wait_for_subscription_sync($node_subscriber);
-- 
2.23.0.windows.1

