From 8b25dfd1976b1450bcf75c6b2c126707d221561a Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Fri, 13 Sep 2024 00:37:06 +0530
Subject: [PATCH v31 2/2] Support replication of generated column during
 initial sync

When 'copy_data' is true, during the initial sync, the data is replicated from
the publisher to the subscriber using the COPY command. The normal COPY
command does not copy generated columns, so when 'publish_generated_columns'
is true, we need to copy using the syntax:
'COPY (SELECT column_name FROM table_name) TO STDOUT'.

Summary:

when (publish_generated_columns = true)

* publisher not-generated column => subscriber not-generated column:
This is just normal logical replication (not changed by this patch).

* publisher not-generated column => subscriber generated column: This
will give ERROR.

* publisher generated column => subscriber not-generated column: The
publisher generated column value is copied.

* publisher generated column => subscriber generated column: This
will give ERROR.

when (publish_generated_columns = false)

* publisher not-generated column => subscriber not-generated column:
This is just normal logical replication (not changed by this patch).

* publisher not-generated column => subscriber generated column: This
will give ERROR.

* publisher generated column => subscriber not-generated column:
Publisher generated column is not replicated. The subscriber column
will be filled with the subscriber-side default data.

* publisher generated column => subscriber generated column: Publisher
generated column is not replicated. The subscriber generated column
will be filled with the subscriber-side computed or default data.
---
 doc/src/sgml/ref/create_publication.sgml    |   4 -
 src/backend/replication/logical/relation.c  |   2 +-
 src/backend/replication/logical/tablesync.c | 178 +++++++++++++++++---
 src/include/replication/logicalrelation.h   |   3 +-
 4 files changed, 157 insertions(+), 30 deletions(-)

diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index e133dc30d7..1973857586 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -235,10 +235,6 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
           This option is only available for replicating generated column data from the publisher
           to a regular, non-generated column in the subscriber.
          </para>
-         <para>
-         This parameter can only be set <literal>true</literal> if <literal>copy_data</literal> is
-         set to <literal>false</literal>.
-         </para>
         </listitem>
        </varlistentry>
 
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index f139e7b01e..338b083696 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -205,7 +205,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
  *
  * Returns -1 if not found.
  */
-static int
+int
 logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
 {
 	int			i;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e03e761392..723c44cf3b 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -118,6 +118,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -693,20 +694,67 @@ process_syncing_tables(XLogRecPtr current_lsn)
 
 /*
  * Create list of columns for COPY based on logical relation mapping.
+ * Exclude columns that are subscription table generated columns.
  */
 static List *
-make_copy_attnamelist(LogicalRepRelMapEntry *rel)
+make_copy_attnamelist(LogicalRepRelMapEntry *rel, bool *remotegenlist)
 {
 	List	   *attnamelist = NIL;
-	int			i;
+	bool	   *localgenlist;
+	TupleDesc	desc;
 
-	for (i = 0; i < rel->remoterel.natts; i++)
+	desc = RelationGetDescr(rel->localrel);
+	localgenlist = palloc0(rel->remoterel.natts * sizeof(bool));
+
+	/*
+	 * This loop checks for generated columns of the subscription table.
+	 */
+	for (int i = 0; i < desc->natts; i++)
 	{
-		attnamelist = lappend(attnamelist,
-							  makeString(rel->remoterel.attnames[i]));
+		int			remote_attnum;
+		Form_pg_attribute attr = TupleDescAttr(desc, i);
+
+		if (!attr->attgenerated)
+			continue;
+
+		remote_attnum = logicalrep_rel_att_by_name(&rel->remoterel,
+												   NameStr(attr->attname));
+
+		if (remote_attnum >= 0)
+		{
+			/*
+			 * Check if the subscription table generated column has the same
+			 * name as a non-generated column in the corresponding publication
+			 * table.
+			 */
+			if (!remotegenlist[remote_attnum])
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical replication target relation \"%s.%s\" has a generated column \"%s\" "
+								"but corresponding column on source relation is not a generated column",
+								rel->remoterel.nspname, rel->remoterel.relname, NameStr(attr->attname))));
+
+			/*
+			 * 'localgenlist' records that this is a generated column in the
+			 * subscription table. Later, we use this information to skip
+			 * adding this column to the column list for COPY.
+			 */
+			localgenlist[remote_attnum] = true;
+		}
 	}
 
+	/*
+	 * Construct column list for COPY, excluding columns that are subscription
+	 * table generated columns.
+	 */
+	for (int i = 0; i < rel->remoterel.natts; i++)
+	{
+		if (!localgenlist[i])
+			attnamelist = lappend(attnamelist,
+								  makeString(rel->remoterel.attnames[i]));
+	}
 
+	pfree(localgenlist);
 	return attnamelist;
 }
 
@@ -791,19 +839,22 @@ copy_read_data(void *outbuf, int minread, int maxread)
  * qualifications to be used in the COPY command.
  */
 static void
-fetch_remote_table_info(char *nspname, char *relname,
+fetch_remote_table_info(char *nspname, char *relname, bool **remotegenlist_res,
 						LogicalRepRelation *lrel, List **qual)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
-	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
+	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
 	Oid			qualRow[] = {TEXTOID};
 	bool		isnull;
+	bool	   *remotegenlist;
+	bool		hasgencolpub = false;
 	int			natt;
 	ListCell   *lc;
 	Bitmapset  *included_cols = NULL;
+	int			server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -846,12 +897,13 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 
 	/*
-	 * Get column lists for each relation.
+	 * Get column lists for each relation and check whether any of the
+	 * publications has the generated columns option enabled.
 	 *
 	 * We need to do this before fetching info about column names and types,
 	 * so that we can skip columns that should not be replicated.
 	 */
-	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	if (server_version >= 150000)
 	{
 		WalRcvExecResult *pubres;
 		TupleTableSlot *tslot;
@@ -937,6 +989,40 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 		walrcv_clear_result(pubres);
 
+		/* Check if any publication has the generated columns option enabled */
+		if (server_version >= 180000)
+		{
+			WalRcvExecResult *gencolres;
+			Oid			gencolsRow[] = {BOOLOID};
+
+			resetStringInfo(&cmd);
+			appendStringInfo(&cmd,
+							 "SELECT count(*) > 0 FROM pg_catalog.pg_publication "
+							 "WHERE pubname IN ( %s ) AND pubgencolumns = 't'",
+							 pub_names.data);
+
+			gencolres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+									lengthof(gencolsRow), gencolsRow);
+			if (gencolres->status != WALRCV_OK_TUPLES)
+				ereport(ERROR,
+						errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg("could not fetch gencolumns information from publication list: %s",
+							   pub_names.data));
+
+			tslot = MakeSingleTupleTableSlot(gencolres->tupledesc, &TTSOpsMinimalTuple);
+			if (!tuplestore_gettupleslot(gencolres->tuplestore, true, false, tslot))
+				ereport(ERROR,
+						errcode(ERRCODE_UNDEFINED_OBJECT),
+						errmsg("failed to fetch tuple for gencols from publication list: %s",
+							   pub_names.data));
+
+			hasgencolpub = DatumGetBool(slot_getattr(tslot, 1, &isnull));
+			Assert(!isnull);
+
+			ExecClearTuple(tslot);
+			walrcv_clear_result(gencolres);
+		}
+
 		pfree(pub_names.data);
 	}
 
@@ -948,20 +1034,33 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "SELECT a.attnum,"
 					 "       a.attname,"
 					 "       a.atttypid,"
-					 "       a.attnum = ANY(i.indkey)"
+					 "       a.attnum = ANY(i.indkey)");
+
+	if (server_version >= 180000)
+		appendStringInfo(&cmd, ", a.attgenerated != ''");
+
+	appendStringInfo(&cmd,
 					 "  FROM pg_catalog.pg_attribute a"
 					 "  LEFT JOIN pg_catalog.pg_index i"
 					 "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
 					 " WHERE a.attnum > 0::pg_catalog.int2"
-					 "   AND NOT a.attisdropped %s"
+					 "   AND NOT a.attisdropped", lrel->remoteid);
+
+	if (server_version >= 120000)
+	{
+		bool		gencols_allowed = server_version >= 180000 && hasgencolpub;
+
+		if (!gencols_allowed)
+			appendStringInfo(&cmd, " AND a.attgenerated = ''");
+	}
+
+	appendStringInfo(&cmd,
 					 "   AND a.attrelid = %u"
 					 " ORDER BY a.attnum",
-					 lrel->remoteid,
-					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
-					  "AND a.attgenerated = ''" : ""),
 					 lrel->remoteid);
+
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
-					  lengthof(attrRow), attrRow);
+					  server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -973,6 +1072,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
 	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
 	lrel->attkeys = NULL;
+	remotegenlist = palloc0(MaxTupleAttributeNumber * sizeof(bool));
 
 	/*
 	 * Store the columns as a list of names.  Ignore those that are not
@@ -1005,6 +1105,9 @@ fetch_remote_table_info(char *nspname, char *relname,
 		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
+		if (server_version >= 180000)
+			remotegenlist[natt] = DatumGetBool(slot_getattr(slot, 5, &isnull));
+
 		/* Should never happen. */
 		if (++natt >= MaxTupleAttributeNumber)
 			elog(ERROR, "too many columns in remote table \"%s.%s\"",
@@ -1015,7 +1118,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	ExecDropSingleTupleTableSlot(slot);
 
 	lrel->natts = natt;
-
+	*remotegenlist_res = remotegenlist;
 	walrcv_clear_result(res);
 
 	/*
@@ -1037,7 +1140,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
 	 * that includes this relation
 	 */
-	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	if (server_version >= 150000)
 	{
 		StringInfoData pub_names;
 
@@ -1123,10 +1226,13 @@ copy_table(Relation rel)
 	List	   *attnamelist;
 	ParseState *pstate;
 	List	   *options = NIL;
+	bool	   *remotegenlist;
+	bool		gencol_copy_needed = false;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel, &qual);
+							RelationGetRelationName(rel), &remotegenlist,
+							&lrel, &qual);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -1135,11 +1241,29 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
+	attnamelist = make_copy_attnamelist(relmapentry, remotegenlist);
+
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 
-	/* Regular table with no row filter */
-	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+	/*
+	 * Check if the remote table has any generated columns that should be
+	 * copied.
+	 */
+	for (int i = 0; i < relmapentry->remoterel.natts; i++)
+	{
+		if (remotegenlist[i])
+		{
+			gencol_copy_needed = true;
+			break;
+		}
+	}
+
+	/*
+	 * Regular table with no row filter and copy of generated columns is not
+	 * necessary.
+	 */
+	if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_copy_needed)
 	{
 		appendStringInfo(&cmd, "COPY %s",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1173,13 +1297,20 @@ copy_table(Relation rel)
 		 * (SELECT ...), but we can't just do SELECT * because we need to not
 		 * copy generated columns. For tables with any row filters, build a
 		 * SELECT query with OR'ed row filters for COPY.
+		 *
+		 * We also need to use this same COPY (SELECT ...) syntax when
+		 * 'publish_generated_columns' is specified as true and the remote
+		 * table has generated columns, because copy of generated columns is
+		 * not supported by the normal COPY.
 		 */
+		int			i = 0;
+
 		appendStringInfoString(&cmd, "COPY (SELECT ");
-		for (int i = 0; i < lrel.natts; i++)
+		foreach_node(String, att_name, attnamelist)
 		{
-			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
-			if (i < lrel.natts - 1)
+			if (i++)
 				appendStringInfoString(&cmd, ", ");
+			appendStringInfoString(&cmd, quote_identifier(strVal(att_name)));
 		}
 
 		appendStringInfoString(&cmd, " FROM ");
@@ -1237,7 +1368,6 @@ copy_table(Relation rel)
 	(void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
 										 NULL, false, false);
 
-	attnamelist = make_copy_attnamelist(relmapentry);
 	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
 
 	/* Do the copy */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index e687b40a56..8cdb7affbf 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -41,7 +41,8 @@ typedef struct LogicalRepRelMapEntry
 
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
 extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
-
+extern int	logicalrep_rel_att_by_name(LogicalRepRelation *remoterel,
+									   const char *attname);
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
 												  LOCKMODE lockmode);
 extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
-- 
2.41.0.windows.3

