From d1f860659a02774eea514cfd65095df591407325 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Fri, 13 May 2022 09:27:41 +0800
Subject: [PATCH v3] Fix data replicated twice when specifying
 PUBLISH_VIA_PARTITION_ROOT option.

If there are two publications that publish the parent table and the child table
separately, and both specify the option PUBLISH_VIA_PARTITION_ROOT, subscribing
to both publications from one subscription causes initial copy twice. What we
expect is to be copied only once.

To fix this, we extended the API of the function pg_get_publication_tables.
(Change the parameter type from text to any).
So, the function pg_get_publication_tables could also receive the array of
publications. And then, if we specify option viaroot, we could exclude the
partitioned table whose ancestor belongs to this publications array when
getting the table list.
---
 src/backend/catalog/pg_publication.c       | 98 ++++++++++++++++------
 src/backend/commands/subscriptioncmds.c    | 17 ++--
 src/include/catalog/pg_proc.dat            |  6 +-
 src/test/regress/expected/rules.out        |  2 +-
 src/test/subscription/t/013_partition.pl   | 19 ++---
 src/test/subscription/t/028_row_filter.pl  | 11 ++-
 src/test/subscription/t/031_column_list.pl |  5 +-
 7 files changed, 104 insertions(+), 54 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index e2c8bcb279..b2d91c67a3 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1077,20 +1077,24 @@ get_publication_name(Oid pubid, bool missing_ok)
 }
 
 /*
- * Returns Oids of tables in a publication.
+ * Returns Oids of tables in one or more publications.
  */
 Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
-	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
-	Publication *publication;
-	List	   *tables;
+	List	   *tables = NIL;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
 		MemoryContext oldcontext;
+		Oid			typeid;
+		ArrayType  *arr;
+		Datum	   *elems;
+		int			nelems;
+		Publication *publication;
+		int			i;
 
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
@@ -1098,31 +1102,64 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		/* switch to memory context appropriate for multiple function calls */
 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-		publication = GetPublicationByName(pubname, false);
-
-		/*
-		 * Publications support partitioned tables, although all changes are
-		 * replicated using leaf partition identity and schema, so we only
-		 * need those.
-		 */
-		if (publication->alltables)
+		/* get publication(s) */
+		typeid = get_fn_expr_argtype(fcinfo->flinfo, 0);
+		switch (typeid)
 		{
-			tables = GetAllTablesPublicationRelations(publication->pubviaroot);
+			case TEXTARRAYOID:
+				/* get Oids of tables in publications array */
+				arr = PG_GETARG_ARRAYTYPE_P(0);
+				deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
+								  &elems, NULL, &nelems);
+				break;
+			case TEXTOID:
+			case NAMEOID:
+			case UNKNOWNOID:
+				/* get Oids of tables in a publication */
+				elems = (Datum *) palloc(sizeof(Datum));
+				if (typeid == TEXTOID)
+					elems[0] = PG_GETARG_DATUM(0);
+				else
+					elems[0] = CStringGetTextDatum(PG_GETARG_POINTER(0));
+				nelems = 1;
+				break;
+			default:
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("do not support this argument type")));
 		}
-		else
+
+		/* get Oids of tables from each publication */
+		for (i = 0; i < nelems; i++)
 		{
-			List	   *relids,
-					   *schemarelids;
-
-			relids = GetPublicationRelations(publication->oid,
-											 publication->pubviaroot ?
-											 PUBLICATION_PART_ROOT :
-											 PUBLICATION_PART_LEAF);
-			schemarelids = GetAllSchemaPublicationRelations(publication->oid,
-															publication->pubviaroot ?
-															PUBLICATION_PART_ROOT :
-															PUBLICATION_PART_LEAF);
-			tables = list_concat_unique_oid(relids, schemarelids);
+			publication = GetPublicationByName(TextDatumGetCString(elems[i]), true);
+
+			if (!publication)
+				continue;
+
+			/*
+			 * Publications support partitioned tables, although all changes
+			 * are replicated using leaf partition identity and schema, so we
+			 * only need those.
+			 */
+			if (publication->alltables)
+				tables = GetAllTablesPublicationRelations(publication->pubviaroot);
+			else
+			{
+				List	   *relids,
+						   *schemarelids;
+
+				relids = GetPublicationRelations(publication->oid,
+												 publication->pubviaroot ?
+												 PUBLICATION_PART_ROOT :
+												 PUBLICATION_PART_LEAF);
+				schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+																publication->pubviaroot ?
+																PUBLICATION_PART_ROOT :
+																PUBLICATION_PART_LEAF);
+				tables = list_concat_unique_oid(tables, relids);
+				tables = list_concat_unique_oid(tables, schemarelids);
+			}
 
 			/*
 			 * If the publication publishes partition changes via their
@@ -1135,6 +1172,15 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 			if (publication->pubviaroot)
 				tables = filter_partitions(tables);
 		}
+		pfree(elems);
+
+		/*
+		 * We need an additional filter for this case : A partition table is
+		 * published in a publication with viaroot, and its parent or child
+		 * table is published in another publication without viaroot. In this
+		 * case, we should publish only parent table.
+		 */
+		tables = filter_partitions(tables);
 
 		funcctx->user_fctx = (void *) tables;
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 690cdaa426..62f2178626 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1759,17 +1759,22 @@ static List *
 fetch_table_list(WalReceiverConn *wrconn, List *publications)
 {
 	WalRcvExecResult *res;
-	StringInfoData cmd;
+	StringInfoData cmd,
+				pub_names;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {TEXTOID, TEXTOID};
 	List	   *tablelist = NIL;
 
+	initStringInfo(&pub_names);
+	get_publications_str(publications, &pub_names, true);
+
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	get_publications_str(publications, &cmd, true);
-	appendStringInfoChar(&cmd, ')');
+	appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname"
+					 " FROM pg_class c JOIN pg_namespace n"
+					 "   ON n.oid = c.relnamespace,"
+					 " LATERAL pg_get_publication_tables(array[ %s ]) gst"
+					 " WHERE c.oid = gst.relid;",
+					 pub_names.data);
 
 	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
 	pfree(cmd.data);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index babe16f00a..371f8abe6c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11673,10 +11673,10 @@
   prosrc => 'pg_show_replication_origin_status' },
 
 # publications
-{ oid => '6119', descr => 'get OIDs of tables in a publication',
+{ oid => '6119', descr => 'get OIDs of tables in one or more publications',
   proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
-  provolatile => 's', prorettype => 'oid', proargtypes => 'text',
-  proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+  provolatile => 's', prorettype => 'oid', proargtypes => 'any',
+  proallargtypes => '{any,oid}', proargmodes => '{i,o}',
   proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 21effe8315..35f1f4241d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1439,7 +1439,7 @@ pg_publication_tables| SELECT p.pubname,
     n.nspname AS schemaname,
     c.relname AS tablename
    FROM pg_publication p,
-    LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid),
+    LATERAL pg_get_publication_tables(p.pubname) gpt(relid),
     (pg_class c
      JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
   WHERE (c.oid = gpt.relid);
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index e7f4a94f19..549252971c 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -477,12 +477,13 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)"
 );
 
-# Note: We create two separate tables, not a partitioned one, so that we can
-# easily identity through which relation were the changes replicated.
+# Note: We only create one table for the partition table (tab4) here.
+# Because we specify option PUBLISH_VIA_PARTITION_ROOT (see pub_all and
+# pub_lower_level above), all data should be replicated to the partition table.
+# So we do not need to create table for the partitioned table.
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)");
-$node_subscriber2->safe_psql('postgres',
-	"CREATE TABLE tab4_1 (a int PRIMARY KEY)");
+
 # Publication that sub2 points to now publishes via root, so must update
 # subscription target relations. We set the list of publications so that
 # the FOR ALL TABLES publication is second (the list order matters).
@@ -556,11 +557,6 @@ $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1");
 is($result, qq(0), 'inserts into tab4 replicated');
 
-$result =
-  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
-is($result, qq(), 'inserts into tab4_1 replicated');
-
-
 # now switch the order of publications in the list, try again, the result
 # should be the same (no dependence on order of pulications)
 $node_subscriber2->safe_psql('postgres',
@@ -584,11 +580,6 @@ $result =
 is( $result, qq(0
 1), 'inserts into tab4 replicated');
 
-$result =
-  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
-is($result, qq(), 'inserts into tab4_1 replicated');
-
-
 # update (replicated as update)
 $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
 $node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index 0dc0a6d10f..c72a98bed7 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -393,6 +393,10 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rowfilter_child(a, b) VALUES(0,'0'),(30,'30'),(40,'40')"
 );
 
+# insert data into partitioned table.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_viaroot_part(a) VALUES(13), (17)");
+
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
 );
@@ -718,8 +722,11 @@ is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast');
 # - INSERT (16)        YES, 16 > 15
 $result =
   $node_subscriber->safe_psql('postgres',
-	"SELECT a FROM tab_rowfilter_viaroot_part");
-is($result, qq(16), 'check replicated rows to tab_rowfilter_viaroot_part');
+	"SELECT a FROM tab_rowfilter_viaroot_part ORDER BY 1");
+is($result, qq(16
+17),
+	'check replicated rows to tab_rowfilter_viaroot_part'
+);
 
 # Check there is no data in tab_rowfilter_viaroot_part_1 because rows are
 # replicated via the top most parent table tab_rowfilter_viaroot_part
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 19812e11f3..6e974aa601 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -1040,7 +1040,8 @@ $node_publisher->safe_psql(
 	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
 	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
 
-	CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_1 FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_2 FOR TABLE test_root_1 (a, b) WITH (publish_via_partition_root = true);
 
 	-- initial data
 	INSERT INTO test_root VALUES (1, 2, 3);
@@ -1049,7 +1050,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true_1, pub_root_true_2;
 ));
 
 wait_for_subscription_sync($node_subscriber);
-- 
2.18.4

