From 4ca52204c9ba749ade90dfee133ecd16a905e060 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Mon, 28 Oct 2024 14:53:27 +0800
Subject: [PATCH v44 1/2] Support logical replication of generated columns in
 column list.

Allow logical replication to publish generated columns if they are explicitly
mentioned in the column list.
---
 doc/src/sgml/protocol.sgml                  |  4 +-
 doc/src/sgml/ref/create_publication.sgml    |  3 +-
 src/backend/catalog/pg_publication.c        | 10 +---
 src/backend/replication/logical/proto.c     | 61 +++++++++++----------
 src/backend/replication/pgoutput/pgoutput.c | 24 +++++---
 src/include/replication/logicalproto.h      |  2 +
 src/test/regress/expected/publication.out   |  6 +-
 src/test/regress/sql/publication.sql        |  6 +-
 src/test/subscription/t/031_column_list.pl  | 36 ++++++++++--
 9 files changed, 93 insertions(+), 59 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 057c46f3f5..71b6b2a535 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6544,7 +6544,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
 
      <para>
       Next, the following message part appears for each column included in
-      the publication (except generated columns):
+      the publication:
      </para>
 
      <variablelist>
@@ -7477,7 +7477,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
      </variablelist>
 
      <para>
-      Next, one of the following submessages appears for each column (except generated columns):
+      Next, one of the following submessages appears for each column:
 
       <variablelist>
        <varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index fd9c5deac9..597aab41a2 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -88,7 +88,8 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
      </para>
 
      <para>
-      When a column list is specified, only the named columns are replicated.
+      When a column list is specified, all columns (except generated columns)
+      of the table are replicated.
       If no column list is specified, all columns of the table are replicated
       through this publication, including any columns added later. It has no
       effect on <literal>TRUNCATE</literal> commands. See
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 7e5e357fd9..17a6093d06 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -500,8 +500,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
  * pub_collist_validate
  *		Process and validate the 'columns' list and ensure the columns are all
  *		valid to use for a publication.  Checks for and raises an ERROR for
- * 		any; unknown columns, system columns, duplicate columns or generated
- *		columns.
+ * 		any unknown columns, system columns, or duplicate columns.
  *
  * Looks up each column's attnum and returns a 0-based Bitmapset of the
  * corresponding attnums.
@@ -511,7 +510,6 @@ pub_collist_validate(Relation targetrel, List *columns)
 {
 	Bitmapset  *set = NULL;
 	ListCell   *lc;
-	TupleDesc	tupdesc = RelationGetDescr(targetrel);
 
 	foreach(lc, columns)
 	{
@@ -530,12 +528,6 @@ pub_collist_validate(Relation targetrel, List *columns)
 					errmsg("cannot use system column \"%s\" in publication column list",
 						   colname));
 
-		if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
-			ereport(ERROR,
-					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-					errmsg("cannot use generated column \"%s\" in publication column list",
-						   colname));
-
 		if (bms_is_member(attnum, set))
 			ereport(ERROR,
 					errcode(ERRCODE_DUPLICATE_OBJECT),
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 980f6e2741..e4a95b9d71 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -40,19 +40,6 @@ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
 static const char *logicalrep_read_namespace(StringInfo in);
 
-/*
- * Check if a column is covered by a column list.
- *
- * Need to be careful about NULL, which is treated as a column list covering
- * all columns.
- */
-static bool
-column_in_column_list(int attnum, Bitmapset *columns)
-{
-	return (columns == NULL || bms_is_member(attnum, columns));
-}
-
-
 /*
  * Write BEGIN to the output stream.
  */
@@ -781,10 +768,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
-			continue;
-
-		if (!column_in_column_list(att->attnum, columns))
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;
 
 		nliveatts++;
@@ -802,10 +786,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		Form_pg_type typclass;
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
-			continue;
-
-		if (!column_in_column_list(att->attnum, columns))
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;
 
 		if (isnull[i])
@@ -938,10 +919,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
-			continue;
-
-		if (!column_in_column_list(att->attnum, columns))
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;
 
 		nliveatts++;
@@ -959,10 +937,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 		uint8		flags = 0;
 
-		if (att->attisdropped || att->attgenerated)
-			continue;
-
-		if (!column_in_column_list(att->attnum, columns))
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;
 
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
@@ -1269,3 +1244,31 @@ logicalrep_message_type(LogicalRepMsgType action)
 
 	return err_unknown;
 }
+
+/*
+ * Check if the column 'att' of a table should be published.
+ *
+ * 'columns' represents the column list specified for that table in the
+ * publication.
+ */
+bool
+logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns)
+{
+	if (att->attisdropped)
+		return false;
+
+	/*
+	 * Skip publishing generated columns if they are not included in the
+	 * column list.
+	 */
+	if (!columns && att->attgenerated)
+		return false;
+
+	/*
+	 * Check if a column is covered by a column list.
+	 */
+	if (columns && !bms_is_member(att->attnum, columns))
+		return false;
+
+	return true;
+}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00e7024563..f3b4084419 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -766,16 +766,12 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;
 
 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;
 
-		/* Skip this attribute if it's not present in the column list */
-		if (columns != NULL && !bms_is_member(att->attnum, columns))
-			continue;
-
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
@@ -1074,6 +1070,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 					int			i;
 					int			nliveatts = 0;
 					TupleDesc	desc = RelationGetDescr(relation);
+					bool		gencolpresent = false;
 
 					pgoutput_ensure_entry_cxt(data, entry);
 
@@ -1085,17 +1082,26 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 					{
 						Form_pg_attribute att = TupleDescAttr(desc, i);
 
-						if (att->attisdropped || att->attgenerated)
+						if (att->attisdropped)
+							continue;
+
+						if (att->attgenerated)
+						{
+							if (bms_is_member(att->attnum, cols))
+								gencolpresent = true;
+
 							continue;
+						}
+
 
 						nliveatts++;
 					}
 
 					/*
-					 * If column list includes all the columns of the table,
-					 * set it to NULL.
+					 * If column list includes all the columns of the table
+					 * and there are no generated columns, set it to NULL.
 					 */
-					if (bms_num_members(cols) == nliveatts)
+					if (bms_num_members(cols) == nliveatts && !gencolpresent)
 					{
 						bms_free(cols);
 						cols = NULL;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..b219f22655 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -270,5 +270,7 @@ extern void logicalrep_read_stream_abort(StringInfo in,
 										 LogicalRepStreamAbortData *abort_data,
 										 bool read_abort_info);
 extern const char *logicalrep_message_type(LogicalRepMsgType action);
+extern bool logicalrep_should_publish_column(Form_pg_attribute att,
+											 Bitmapset *columns);
 
 #endif							/* LOGICAL_PROTO_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 660245ed0c..2da84964fb 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -687,9 +687,6 @@ UPDATE testpub_tbl5 SET a = 1;
 ERROR:  cannot update table "testpub_tbl5"
 DETAIL:  Column list used by the publication does not cover the replica identity.
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-ERROR:  cannot use generated column "d" in publication column list
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 ERROR:  cannot use system column "ctid" in publication column list
@@ -717,6 +714,9 @@ UPDATE testpub_tbl5 SET a = 1;
 ERROR:  cannot update table "testpub_tbl5"
 DETAIL:  Column list used by the publication does not cover the replica identity.
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- ok: generated column "d" can be in the list too
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
 -- error: change the replica identity to "b", and column list to (a, c)
 -- then update fails, because (a, c) does not cover replica identity
 ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index f68a5b5986..7f497c8af7 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -413,8 +413,6 @@ ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
 UPDATE testpub_tbl5 SET a = 1;
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl1 (id, ctid);
@@ -435,6 +433,10 @@ ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
 UPDATE testpub_tbl5 SET a = 1;
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
 
+-- ok: generated column "d" can be in the list too
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+
 -- error: change the replica identity to "b", and column list to (a, c)
 -- then update fails, because (a, c) does not cover replica identity
 ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 9a97fa5020..d4408c1cd7 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -1202,10 +1202,11 @@ $result = $node_publisher->safe_psql(
 is( $result, qq(t
 t), 'check the number of columns in the old tuple');
 
-# TEST: Generated and dropped columns are not considered for the column list.
-# So, the publication having a column list except for those columns and a
-# publication without any column (aka all columns as part of the columns
-# list) are considered to have the same column list.
+# TEST: Dropped columns are not considered for the column list, and generated
+# columns are not replicated if they are not explicitly included in the column
+# list. So, the publication having a column list except for those columns and a
+# publication without any column (aka all columns as part of the columns list)
+# are considered to have the same column list.
 $node_publisher->safe_psql(
 	'postgres', qq(
 	CREATE TABLE test_mix_4 (a int PRIMARY KEY, b int, c int, d int GENERATED ALWAYS AS (a + 1) STORED);
@@ -1275,6 +1276,33 @@ ok( $stderr =~
 	  qr/cannot use different column lists for table "public.test_mix_1" in different publications/,
 	'different column lists detected');
 
+# TEST: Generated columns are considered for the column list.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE TABLE test_gen (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a + 1) STORED);
+	CREATE PUBLICATION pub_gen FOR TABLE test_gen (a, b);
+));
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE TABLE test_gen (a int PRIMARY KEY, b int);
+	CREATE SUBSCRIPTION sub_gen CONNECTION '$publisher_connstr' PUBLICATION pub_gen;
+));
+
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO test_gen VALUES (1);
+));
+
+$node_publisher->wait_for_catchup('sub_gen');
+
+is( $node_subscriber->safe_psql(
+		'postgres', "SELECT * FROM test_gen ORDER BY a"),
+	qq(1|2),
+	'replication with generated columns in column list');
+
 # TEST: If the column list is changed after creating the subscription, we
 # should catch the error reported by walsender.
 
-- 
2.34.1

