From aa3fa3a3de41b37f292308654c697a8c23ebb6a7 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Fri, 13 May 2022 13:19:49 +0800
Subject: [PATCH] Prohibit combining publications with different column lists.

The main purpose of introducing a column list is to have statically
different shapes on publisher and subscriber or hide sensitive column
data. In both cases, it doesn't seems to make sense to combine column
lists.

So, we disallow the cases where the column list is different for the same
table when combining publications. It can be later extended to combine the
column lists for selective cases where required.

Reported-by: Alvaro Herrera
Author: Hou Zhijie
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/202204251548.mudq7jbqnh7r@alvherre.pgsql
---
 doc/src/sgml/ref/alter_publication.sgml     |  12 +++-
 doc/src/sgml/ref/create_subscription.sgml   |   5 ++
 src/backend/commands/subscriptioncmds.c     |  23 ++++--
 src/backend/replication/logical/tablesync.c |  60 ++++++++++------
 src/backend/replication/pgoutput/pgoutput.c |  77 ++++++++++----------
 src/test/subscription/t/031_column_list.pl  | 105 +++++-----------------------
 6 files changed, 127 insertions(+), 155 deletions(-)

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index e2cce49..f03933a 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -116,7 +116,17 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
      <para>
       Optionally, a column list can be specified.  See <xref
-      linkend="sql-createpublication"/> for details.
+      linkend="sql-createpublication"/> for details. Note that a subscription
+      having several publications in which the same table has been published
+      with different column lists is not supported. So, changing the column
+      lists of the tables being subscribed could cause inconsistency of column
+      lists among publications in which case <command>ALTER PUBLICATION</command>
+      command will be successful but later the WalSender in publisher or the
+      subscriber may throw an error. In this scenario, the user needs to
+      recreate the subscription after adjusting the column list or drop the
+      problematic publication using
+      <literal>ALTER SUBSCRIPTION ... DROP PUBLICATION</literal> and then add
+      it back after adjusting the column list.
      </para>
 
      <para>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 203bb41..f6f82a0 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -356,6 +356,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
   </para>
 
   <para>
+   Subscription having several publications in which the same table has been
+   published with different column lists is not supported.
+  </para>
+
+  <para>
    We allow non-existent publications to be specified so that users can add
    those later. This means
    <link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 690cdaa..991b2c1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1753,7 +1753,8 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 
 /*
  * Get the list of tables which belong to specified publications on the
- * publisher connection.
+ * publisher connection. Also get the column list for each table and check if
+ * column lists are the same in different publications.
  */
 static List *
 fetch_table_list(WalReceiverConn *wrconn, List *publications)
@@ -1761,17 +1762,18 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID};
 	List	   *tablelist = NIL;
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
+	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, \n"
+						   "                      t.attnames\n"
 						   "  FROM pg_catalog.pg_publication_tables t\n"
 						   " WHERE t.pubname IN (");
 	get_publications_str(publications, &cmd, true);
 	appendStringInfoChar(&cmd, ')');
 
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, tableRow);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
@@ -1795,7 +1797,18 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		Assert(!isnull);
 
 		rv = makeRangeVar(nspname, relname, -1);
-		tablelist = lappend(tablelist, rv);
+
+		/*
+		 * We don't support the case where column list is different for the
+		 * same table in different publications.
+		 */
+		if (list_member(tablelist, rv))
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
+						   nspname, relname));
+		else
+			tablelist = lappend(tablelist, rv);
 
 		ExecClearTuple(slot);
 	}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 994c7a0..9ce4b9f 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -771,7 +771,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	{
 		WalRcvExecResult *pubres;
 		TupleTableSlot *slot;
-		Oid			attrsRow[] = {INT2OID};
+		Oid			attrsRow[] = {INT2VECTOROID};
 		StringInfoData pub_names;
 		bool		first = true;
 
@@ -786,19 +786,17 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 		/*
 		 * Fetch info about column lists for the relation (from all the
-		 * publications). We unnest the int2vector values, because that makes
-		 * it easier to combine lists by simply adding the attnums to a new
-		 * bitmap (without having to parse the int2vector data). This
-		 * preserves NULL values, so that if one of the publications has no
-		 * column list, we'll know that.
+		 * publications).
 		 */
 		resetStringInfo(&cmd);
 		appendStringInfo(&cmd,
-						 "SELECT DISTINCT unnest"
+						 "SELECT DISTINCT"
+						 "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
+						 "   THEN NULL ELSE gpt.attrs END)"
 						 "  FROM pg_publication p,"
-						 "  LATERAL pg_get_publication_tables(p.pubname) gpt"
-						 "  LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE"
-						 " WHERE gpt.relid = %u"
+						 "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
+						 "  pg_class c"
+						 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
 						 "   AND p.pubname IN ( %s )",
 						 lrel->remoteid,
 						 pub_names.data);
@@ -813,26 +811,42 @@ fetch_remote_table_info(char *nspname, char *relname,
 							nspname, relname, pubres->err)));
 
 		/*
-		 * Merge the column lists (from different publications) by creating a
-		 * single bitmap with all the attnums. If we find a NULL value, that
-		 * means one of the publications has no column list for the table
-		 * we're syncing.
+		 * We don't support the case where column list is different for the
+		 * same table when combining publications. So there should be only one
+		 * row returned. Although we already checked this when creating
+		 * subscription, we still need to check here in case the column list
+		 * was changed afterwards.
+		 */
+		if (tuplestore_tuple_count(pubres->tuplestore) > 1)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
+						   nspname, relname));
+
+		/*
+		 * Get the column list and build a single bitmap with the attnums.
+		 *
+		 * If we find a NULL value, it means all the columns should be
+		 * replicated.
 		 */
 		slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
-		while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+		if (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
 		{
 			Datum		cfval = slot_getattr(slot, 1, &isnull);
 
-			/* NULL means empty column list, so we're done. */
-			if (isnull)
+			if (!isnull)
 			{
-				bms_free(included_cols);
-				included_cols = NULL;
-				break;
-			}
+				ArrayType  *arr;
+				int			nelems;
+				int16	   *elems;
+
+				arr = DatumGetArrayTypeP(cfval);
+				nelems = ARR_DIMS(arr)[0];
+				elems = (int16 *) ARR_DATA_PTR(arr);
 
-			included_cols = bms_add_member(included_cols,
-										   DatumGetInt16(cfval));
+				for (natt = 0; natt < nelems; natt++)
+					included_cols = bms_add_member(included_cols, elems[natt]);
+			}
 
 			ExecClearTuple(slot);
 		}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 42c06af..3d888a5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -979,30 +979,30 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 						  RelationSyncEntry *entry)
 {
 	ListCell   *lc;
+	bool		first = true;
+	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
 
 	/*
 	 * Find if there are any column lists for this relation. If there are,
-	 * build a bitmap merging all the column lists.
+	 * build a bitmap using the column lists.
 	 *
-	 * All the given publication-table mappings must be checked.
+	 * Note that we don't support the case where column list is different for
+	 * the same table when combining publications. But we still need to check
+	 * all the given publication-table mappings and report an error if any
+	 * publications have different column list.
 	 *
 	 * Multiple publications might have multiple column lists for this
 	 * relation.
 	 *
 	 * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column
-	 * list" so it takes precedence.
+	 * list".
 	 */
 	foreach(lc, publications)
 	{
 		Publication *pub = lfirst(lc);
 		HeapTuple	cftuple = NULL;
 		Datum		cfdatum = 0;
-
-		/*
-		 * Assume there's no column list. Only if we find pg_publication_rel
-		 * entry with a column list we'll switch it to false.
-		 */
-		bool		pub_no_list = true;
+		Bitmapset	*cols = NULL;
 
 		/*
 		 * If the publication is FOR ALL TABLES then it is treated the same as
@@ -1011,6 +1011,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 		 */
 		if (!pub->alltables)
 		{
+			bool		pub_no_list = true;
+
 			/*
 			 * Check for the presence of a column list in this publication.
 			 *
@@ -1024,51 +1026,48 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 
 			if (HeapTupleIsValid(cftuple))
 			{
-				/*
-				 * Lookup the column list attribute.
-				 *
-				 * Note: We update the pub_no_list value directly, because if
-				 * the value is NULL, we have no list (and vice versa).
-				 */
+				/* Lookup the column list attribute. */
 				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
 										  Anum_pg_publication_rel_prattrs,
 										  &pub_no_list);
 
-				/*
-				 * Build the column list bitmap in the per-entry context.
-				 *
-				 * We need to merge column lists from all publications, so we
-				 * update the same bitmapset. If the column list is null, we
-				 * interpret it as replicating all columns.
-				 */
+				/* Build the column list bitmap in the per-entry context. */
 				if (!pub_no_list)	/* when not null */
 				{
 					pgoutput_ensure_entry_cxt(data, entry);
 
-					entry->columns = pub_collist_to_bitmapset(entry->columns,
-															  cfdatum,
-															  entry->entry_cxt);
+					cols = pub_collist_to_bitmapset(cols, cfdatum,
+													entry->entry_cxt);
+
+					/*
+					 * If column list includes all the columns of the table,
+					 * set it to NULL.
+					 */
+					if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation))
+					{
+						bms_free(cols);
+						cols = NULL;
+					}
 				}
+
+				ReleaseSysCache(cftuple);
 			}
 		}
 
-		/*
-		 * Found a publication with no column list, so we're done. But first
-		 * discard column list we might have from preceding publications.
-		 */
-		if (pub_no_list)
+		if (first)
 		{
-			if (cftuple)
-				ReleaseSysCache(cftuple);
-
-			bms_free(entry->columns);
-			entry->columns = NULL;
-
-			break;
+			entry->columns = cols;
+			first = false;
 		}
-
-		ReleaseSysCache(cftuple);
+		else if (!bms_equal(entry->columns, cols))
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
+						   get_namespace_name(RelationGetNamespace(relation)),
+						   RelationGetRelationName(relation)));
 	}							/* loop all subscribed publications */
+
+	RelationClose(relation);
 }
 
 /*
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 19812e1..22be6fd 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -361,13 +361,13 @@ is( $result, qq(1|abc
 2|xyz), 'update on column tab2.c is not replicated');
 
 
-# TEST: add a table to two publications with different column lists, and
+# TEST: add a table to two publications with same column lists, and
 # create a single subscription replicating both publications
 $node_publisher->safe_psql(
 	'postgres', qq(
 	CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int);
 	CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b);
-	CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d);
+	CREATE PUBLICATION pub3 FOR TABLE tab5 (a, b);
 
 	-- insert a couple initial records
 	INSERT INTO tab5 VALUES (1, 11, 111, 1111);
@@ -388,8 +388,7 @@ wait_for_subscription_sync($node_subscriber);
 
 $node_publisher->wait_for_catchup('sub1');
 
-# insert data and make sure all the columns (union of the columns lists)
-# get fully replicated
+# insert data and make sure the columns in column list get fully replicated
 $node_publisher->safe_psql(
 	'postgres', qq(
 	INSERT INTO tab5 VALUES (3, 33, 333, 3333);
@@ -399,42 +398,12 @@ $node_publisher->safe_psql(
 $node_publisher->wait_for_catchup('sub1');
 
 is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"),
-	qq(1|11|1111
-2|22|2222
-3|33|3333
-4|44|4444),
+	qq(1|11|
+2|22|
+3|33|
+4|44|),
 	'overlapping publications with overlapping column lists');
 
-# and finally, remove the column list for one of the publications, which
-# means replicating all columns (removing the column list), but first add
-# the missing column to the table on subscriber
-$node_publisher->safe_psql(
-	'postgres', qq(
-	ALTER PUBLICATION pub3 SET TABLE tab5;
-));
-
-$node_subscriber->safe_psql(
-	'postgres', qq(
-	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
-	ALTER TABLE tab5 ADD COLUMN c INT;
-));
-
-wait_for_subscription_sync($node_subscriber);
-
-$node_publisher->safe_psql(
-	'postgres', qq(
-	INSERT INTO tab5 VALUES (5, 55, 555, 5555);
-));
-
-$node_publisher->wait_for_catchup('sub1');
-
-is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"),
-	qq(1|11|1111|
-2|22|2222|
-3|33|3333|
-4|44|4444|
-5|55|5555|555),
-	'overlapping publications with overlapping column lists');
 
 # TEST: create a table with a column list, then change the replica
 # identity by replacing a primary key (but use a different column in
@@ -900,57 +869,21 @@ is( $node_subscriber->safe_psql(
 3|),
 	'partitions with different replica identities not replicated correctly');
 
-# TEST: With a table included in multiple publications, we should use a
-# union of the column lists. So with column lists (a,b) and (a,c) we
-# should replicate (a,b,c).
 
-$node_publisher->safe_psql(
-	'postgres', qq(
-	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
-	CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b);
-	CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c);
-
-	-- initial data
-	INSERT INTO test_mix_1 VALUES (1, 2, 3);
-));
-
-$node_subscriber->safe_psql(
-	'postgres', qq(
-	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
-	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2;
-));
-
-wait_for_subscription_sync($node_subscriber);
-
-$node_publisher->safe_psql(
-	'postgres', qq(
-	INSERT INTO test_mix_1 VALUES (4, 5, 6);
-));
-
-$node_publisher->wait_for_catchup('sub1');
-
-is( $node_subscriber->safe_psql(
-		'postgres', "SELECT * FROM test_mix_1 ORDER BY a"),
-	qq(1|2|3
-4|5|6),
-	'a mix of publications should use a union of column list');
-
-
-# TEST: With a table included in multiple publications, we should use a
-# union of the column lists. If any of the publications is FOR ALL
-# TABLES, we should replicate all columns.
+# TEST: With a table included in the publications which is FOR ALL TABLES, it
+# means replicate all columns.
 
 # drop unnecessary tables, so as not to interfere with the FOR ALL TABLES
 $node_publisher->safe_psql(
 	'postgres', qq(
-	DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_mix_1,
+	DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7,
 			   test_part, test_part_a, test_part_b, test_part_c, test_part_d;
 ));
 
 $node_publisher->safe_psql(
 	'postgres', qq(
 	CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int);
-	CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b);
+	CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b, c);
 	CREATE PUBLICATION pub_mix_4 FOR ALL TABLES;
 
 	-- initial data
@@ -976,12 +909,11 @@ $node_publisher->wait_for_catchup('sub1');
 is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_2"),
 	qq(1|2|3
 4|5|6),
-	'a mix of publications should use a union of column list');
+	'all columns should be replicated');
 
 
-# TEST: With a table included in multiple publications, we should use a
-# union of the column lists. If any of the publications is FOR ALL
-# TABLES IN SCHEMA, we should replicate all columns.
+# TEST: With a table included in the publication which is FOR ALL TABLES
+# IN SCHEMA, it means replicate all columns.
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
@@ -993,7 +925,7 @@ $node_publisher->safe_psql(
 	'postgres', qq(
 	DROP TABLE test_mix_2;
 	CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int);
-	CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b);
+	CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b, c);
 	CREATE PUBLICATION pub_mix_6 FOR ALL TABLES IN SCHEMA public;
 
 	-- initial data
@@ -1017,7 +949,7 @@ $node_publisher->wait_for_catchup('sub1');
 is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_3"),
 	qq(1|2|3
 4|5|6),
-	'a mix of publications should use a union of column list');
+	'all columns should be replicated');
 
 
 # TEST: Check handling of publish_via_partition_root - if a partition is
@@ -1074,7 +1006,7 @@ is( $node_subscriber->safe_psql(
 # TEST: Multiple publications which publish schema of parent table and
 # partition. The partition is published through two publications, once
 # through a schema (so no column list) containing the parent, and then
-# also directly (with a columns list). The expected outcome is there is
+# also directly (with all columns). The expected outcome is there is
 # no column list.
 
 $node_publisher->safe_psql(
@@ -1086,7 +1018,7 @@ $node_publisher->safe_psql(
 	CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10);
 
 	CREATE PUBLICATION pub1 FOR ALL TABLES IN SCHEMA s1;
-	CREATE PUBLICATION pub2 FOR TABLE t_1(b);
+	CREATE PUBLICATION pub2 FOR TABLE t_1(a, b, c);
 
 	-- initial data
 	INSERT INTO s1.t VALUES (1, 2, 3);
@@ -1232,7 +1164,6 @@ is( $node_subscriber->safe_psql(
 4||),
 	'publication containing both parent and child relation');
 
-
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
-- 
2.7.2.windows.1

