From e1fb8c867353262a43419ee1892bbd64f4d613d7 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Fri, 13 May 2022 13:19:49 +0800
Subject: [PATCH] Disallow combining publication when column list is different
 for the same table

the main purpose of introducing a column list are statically have different
shapes on publisher and subscriber or hide sensitive columns data. In both
cases, it doesn't seems make sense to combine column lists. So disallow the
cases where column list is different for the same table when combining
publications.
---
 src/backend/commands/subscriptioncmds.c     |  24 ++++--
 src/backend/replication/logical/tablesync.c |  61 ++++++++-----
 src/backend/replication/pgoutput/pgoutput.c |  75 ++++++++--------
 src/test/subscription/t/031_column_list.pl  | 127 +++++++++-------------------
 4 files changed, 137 insertions(+), 150 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 690cdaa..c19eb81 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1753,7 +1753,8 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 
 /*
  * Get the list of tables which belong to specified publications on the
- * publisher connection.
+ * publisher connection. Also get the column list for each table and check if
+ * column lists are the same in different publications.
  */
 static List *
 fetch_table_list(WalReceiverConn *wrconn, List *publications)
@@ -1761,17 +1762,18 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, INT2VECTOROID};
 	List	   *tablelist = NIL;
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
+	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, \n"
+						   "                      t.columnlist\n"
 						   "  FROM pg_catalog.pg_publication_tables t\n"
 						   " WHERE t.pubname IN (");
 	get_publications_str(publications, &cmd, true);
 	appendStringInfoChar(&cmd, ')');
 
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, tableRow);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
@@ -1795,7 +1797,19 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		Assert(!isnull);
 
 		rv = makeRangeVar(nspname, relname, -1);
-		tablelist = lappend(tablelist, rv);
+
+		/*
+		 * We only throw a warning here so that the subcription can still be
+		 * created and let user aware that something is going to fail later and
+		 * they can fix the publications afterwards.
+		 */
+		if (list_member(tablelist, rv))
+			ereport(WARNING,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
+						   nspname, relname));
+		else
+			tablelist = lappend(tablelist, rv);
 
 		ExecClearTuple(slot);
 	}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 994c7a0..42de832 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -771,7 +771,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	{
 		WalRcvExecResult *pubres;
 		TupleTableSlot *slot;
-		Oid			attrsRow[] = {INT2OID};
+		Oid			attrsRow[] = {INT2VECTOROID};
 		StringInfoData pub_names;
 		bool		first = true;
 
@@ -786,19 +786,17 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 		/*
 		 * Fetch info about column lists for the relation (from all the
-		 * publications). We unnest the int2vector values, because that makes
-		 * it easier to combine lists by simply adding the attnums to a new
-		 * bitmap (without having to parse the int2vector data). This
-		 * preserves NULL values, so that if one of the publications has no
-		 * column list, we'll know that.
+		 * publications).
 		 */
 		resetStringInfo(&cmd);
 		appendStringInfo(&cmd,
-						 "SELECT DISTINCT unnest"
+						 "SELECT DISTINCT"
+						 "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
+						 "   THEN NULL ELSE gpt.attrs END)"
 						 "  FROM pg_publication p,"
-						 "  LATERAL pg_get_publication_tables(p.pubname) gpt"
-						 "  LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE"
-						 " WHERE gpt.relid = %u"
+						 "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
+						 "  pg_class c"
+						 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
 						 "   AND p.pubname IN ( %s )",
 						 lrel->remoteid,
 						 pub_names.data);
@@ -812,27 +810,48 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
 							nspname, relname, pubres->err)));
 
+		first = true;
+
 		/*
-		 * Merge the column lists (from different publications) by creating a
-		 * single bitmap with all the attnums. If we find a NULL value, that
-		 * means one of the publications has no column list for the table
-		 * we're syncing.
+		 * Traverse the column lists from different publications and build a
+		 * single bitmap with the attnums.
+		 *
+		 * During the loop, check that if all the column lists are the same and
+		 * report an error if not.
+		 *
+		 * If we find a NULL value, that means one of the publications has no
+		 * column list for the table we're syncing.
 		 */
 		slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
 		while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
 		{
 			Datum		cfval = slot_getattr(slot, 1, &isnull);
+			Bitmapset  *cols = NULL;
 
-			/* NULL means empty column list, so we're done. */
-			if (isnull)
+			if (!isnull)
 			{
-				bms_free(included_cols);
-				included_cols = NULL;
-				break;
+				ArrayType  *arr;
+				int			nelems;
+				int16	   *elems;
+
+				arr = DatumGetArrayTypeP(cfval);
+				nelems = ARR_DIMS(arr)[0];
+				elems = (int16 *) ARR_DATA_PTR(arr);
+
+				for (int i = 0; i < nelems; i++)
+					cols = bms_add_member(cols, elems[i]);
 			}
 
-			included_cols = bms_add_member(included_cols,
-										   DatumGetInt16(cfval));
+			if (first)
+			{
+				included_cols = cols;
+				first = false;
+			}
+			else if (!bms_equal(included_cols, cols))
+				ereport(ERROR,
+						errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
+							   nspname, relname));
 
 			ExecClearTuple(slot);
 		}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 42c06af..4f58df2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -979,12 +979,17 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 						  RelationSyncEntry *entry)
 {
 	ListCell   *lc;
+	bool		first = true;
+	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
 
 	/*
 	 * Find if there are any column lists for this relation. If there are,
-	 * build a bitmap merging all the column lists.
+	 * build a bitmap using the column lists.
 	 *
-	 * All the given publication-table mappings must be checked.
+	 * Note that we don't support the case where column list is different for
+	 * the same table when combining publications. But we still need to check
+	 * all the given publication-table mappings and report an error if any
+	 * publications have different column list.
 	 *
 	 * Multiple publications might have multiple column lists for this
 	 * relation.
@@ -997,12 +1002,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 		Publication *pub = lfirst(lc);
 		HeapTuple	cftuple = NULL;
 		Datum		cfdatum = 0;
-
-		/*
-		 * Assume there's no column list. Only if we find pg_publication_rel
-		 * entry with a column list we'll switch it to false.
-		 */
-		bool		pub_no_list = true;
+		Bitmapset	*cols = NULL;
 
 		/*
 		 * If the publication is FOR ALL TABLES then it is treated the same as
@@ -1011,6 +1011,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 		 */
 		if (!pub->alltables)
 		{
+			bool		pub_no_list = true;
+
 			/*
 			 * Check for the presence of a column list in this publication.
 			 *
@@ -1024,51 +1026,48 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 
 			if (HeapTupleIsValid(cftuple))
 			{
-				/*
-				 * Lookup the column list attribute.
-				 *
-				 * Note: We update the pub_no_list value directly, because if
-				 * the value is NULL, we have no list (and vice versa).
-				 */
+				/* Lookup the column list attribute. */
 				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
 										  Anum_pg_publication_rel_prattrs,
 										  &pub_no_list);
 
-				/*
-				 * Build the column list bitmap in the per-entry context.
-				 *
-				 * We need to merge column lists from all publications, so we
-				 * update the same bitmapset. If the column list is null, we
-				 * interpret it as replicating all columns.
-				 */
+				/* Build the column list bitmap in the per-entry context. */
 				if (!pub_no_list)	/* when not null */
 				{
 					pgoutput_ensure_entry_cxt(data, entry);
 
-					entry->columns = pub_collist_to_bitmapset(entry->columns,
-															  cfdatum,
-															  entry->entry_cxt);
+					cols = pub_collist_to_bitmapset(cols, cfdatum,
+													entry->entry_cxt);
+
+					/*
+					 * If column list includes all the columns of the table,
+					 * set it to NULL.
+					 */
+					if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation))
+					{
+						bms_free(cols);
+						cols = NULL;
+					}
 				}
+
+				ReleaseSysCache(cftuple);
 			}
 		}
 
-		/*
-		 * Found a publication with no column list, so we're done. But first
-		 * discard column list we might have from preceding publications.
-		 */
-		if (pub_no_list)
+		if (first)
 		{
-			if (cftuple)
-				ReleaseSysCache(cftuple);
-
-			bms_free(entry->columns);
-			entry->columns = NULL;
-
-			break;
+			entry->columns = cols;
+			first = false;
 		}
-
-		ReleaseSysCache(cftuple);
+		else if (!bms_equal(entry->columns, cols))
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
+						   get_namespace_name(RelationGetNamespace(relation)),
+						   RelationGetRelationName(relation)));
 	}							/* loop all subscribed publications */
+
+	RelationClose(relation);
 }
 
 /*
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 19812e1..b454a05 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -21,6 +21,8 @@ $node_subscriber->start;
 
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 
+my $offset = 0;
+
 sub wait_for_subscription_sync
 {
 	my ($node) = @_;
@@ -361,13 +363,13 @@ is( $result, qq(1|abc
 2|xyz), 'update on column tab2.c is not replicated');
 
 
-# TEST: add a table to two publications with different column lists, and
+# TEST: add a table to two publications with same column lists, and
 # create a single subscription replicating both publications
 $node_publisher->safe_psql(
 	'postgres', qq(
 	CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int);
 	CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b);
-	CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d);
+	CREATE PUBLICATION pub3 FOR TABLE tab5 (a, b);
 
 	-- insert a couple initial records
 	INSERT INTO tab5 VALUES (1, 11, 111, 1111);
@@ -388,8 +390,7 @@ wait_for_subscription_sync($node_subscriber);
 
 $node_publisher->wait_for_catchup('sub1');
 
-# insert data and make sure all the columns (union of the columns lists)
-# get fully replicated
+# insert data and make sure the columns in column list get fully replicated
 $node_publisher->safe_psql(
 	'postgres', qq(
 	INSERT INTO tab5 VALUES (3, 33, 333, 3333);
@@ -399,42 +400,12 @@ $node_publisher->safe_psql(
 $node_publisher->wait_for_catchup('sub1');
 
 is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"),
-	qq(1|11|1111
-2|22|2222
-3|33|3333
-4|44|4444),
+	qq(1|11|
+2|22|
+3|33|
+4|44|),
 	'overlapping publications with overlapping column lists');
 
-# and finally, remove the column list for one of the publications, which
-# means replicating all columns (removing the column list), but first add
-# the missing column to the table on subscriber
-$node_publisher->safe_psql(
-	'postgres', qq(
-	ALTER PUBLICATION pub3 SET TABLE tab5;
-));
-
-$node_subscriber->safe_psql(
-	'postgres', qq(
-	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
-	ALTER TABLE tab5 ADD COLUMN c INT;
-));
-
-wait_for_subscription_sync($node_subscriber);
-
-$node_publisher->safe_psql(
-	'postgres', qq(
-	INSERT INTO tab5 VALUES (5, 55, 555, 5555);
-));
-
-$node_publisher->wait_for_catchup('sub1');
-
-is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"),
-	qq(1|11|1111|
-2|22|2222|
-3|33|3333|
-4|44|4444|
-5|55|5555|555),
-	'overlapping publications with overlapping column lists');
 
 # TEST: create a table with a column list, then change the replica
 # identity by replacing a primary key (but use a different column in
@@ -900,57 +871,21 @@ is( $node_subscriber->safe_psql(
 3|),
 	'partitions with different replica identities not replicated correctly');
 
-# TEST: With a table included in multiple publications, we should use a
-# union of the column lists. So with column lists (a,b) and (a,c) we
-# should replicate (a,b,c).
 
-$node_publisher->safe_psql(
-	'postgres', qq(
-	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
-	CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b);
-	CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c);
-
-	-- initial data
-	INSERT INTO test_mix_1 VALUES (1, 2, 3);
-));
-
-$node_subscriber->safe_psql(
-	'postgres', qq(
-	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
-	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2;
-));
-
-wait_for_subscription_sync($node_subscriber);
-
-$node_publisher->safe_psql(
-	'postgres', qq(
-	INSERT INTO test_mix_1 VALUES (4, 5, 6);
-));
-
-$node_publisher->wait_for_catchup('sub1');
-
-is( $node_subscriber->safe_psql(
-		'postgres', "SELECT * FROM test_mix_1 ORDER BY a"),
-	qq(1|2|3
-4|5|6),
-	'a mix of publications should use a union of column list');
-
-
-# TEST: With a table included in multiple publications, we should use a
-# union of the column lists. If any of the publications is FOR ALL
-# TABLES, we should replicate all columns.
+# TEST: With a table included in the publications is FOR ALL TABLES, it
+# means replicate all columns.
 
 # drop unnecessary tables, so as not to interfere with the FOR ALL TABLES
 $node_publisher->safe_psql(
 	'postgres', qq(
-	DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_mix_1,
+	DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7,
 			   test_part, test_part_a, test_part_b, test_part_c, test_part_d;
 ));
 
 $node_publisher->safe_psql(
 	'postgres', qq(
 	CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int);
-	CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b);
+	CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b, c);
 	CREATE PUBLICATION pub_mix_4 FOR ALL TABLES;
 
 	-- initial data
@@ -976,12 +911,11 @@ $node_publisher->wait_for_catchup('sub1');
 is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_2"),
 	qq(1|2|3
 4|5|6),
-	'a mix of publications should use a union of column list');
+	'all columns should be replicated');
 
 
-# TEST: With a table included in multiple publications, we should use a
-# union of the column lists. If any of the publications is FOR ALL
-# TABLES IN SCHEMA, we should replicate all columns.
+# TEST: With a table included in the publication which is FOR ALL TABLES
+# IN SCHEMA, it means replicate all columns.
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
@@ -993,7 +927,7 @@ $node_publisher->safe_psql(
 	'postgres', qq(
 	DROP TABLE test_mix_2;
 	CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int);
-	CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b);
+	CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b, c);
 	CREATE PUBLICATION pub_mix_6 FOR ALL TABLES IN SCHEMA public;
 
 	-- initial data
@@ -1017,7 +951,7 @@ $node_publisher->wait_for_catchup('sub1');
 is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_3"),
 	qq(1|2|3
 4|5|6),
-	'a mix of publications should use a union of column list');
+	'all columns should be replicated');
 
 
 # TEST: Check handling of publish_via_partition_root - if a partition is
@@ -1074,7 +1008,7 @@ is( $node_subscriber->safe_psql(
 # TEST: Multiple publications which publish schema of parent table and
 # partition. The partition is published through two publications, once
 # through a schema (so no column list) containing the parent, and then
-# also directly (with a columns list). The expected outcome is there is
+# also directly (with all columns). The expected outcome is there is
 # no column list.
 
 $node_publisher->safe_psql(
@@ -1086,7 +1020,7 @@ $node_publisher->safe_psql(
 	CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10);
 
 	CREATE PUBLICATION pub1 FOR ALL TABLES IN SCHEMA s1;
-	CREATE PUBLICATION pub2 FOR TABLE t_1(b);
+	CREATE PUBLICATION pub2 FOR TABLE t_1(a, b, c);
 
 	-- initial data
 	INSERT INTO s1.t VALUES (1, 2, 3);
@@ -1233,6 +1167,27 @@ is( $node_subscriber->safe_psql(
 	'publication containing both parent and child relation');
 
 
+# TEST: With a table included in multiple publications with different column
+# lists, we should catch the error in the log
+
+$node_publisher->safe_psql('postgres', qq(
+	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
+	CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b);
+	CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2 WITH (copy_data = false);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+my $logfile = slurp_file($node_subscriber->logfile, $offset);
+ok( $logfile =~
+	  qr/cannot use different column lists for table "public.test_mix_1" in different publications/,
+	'different column lists detected');
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
-- 
2.7.2.windows.1

