From 954965d2fd554687e9b9b792ded843c719112d00 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Mon, 13 Jun 2022 14:40:58 +0800
Subject: [PATCH v6 3/4] Check partition table replica identity on subscriber

In logical replication, we will check if the target table on subscriber is
updatable by comparing the replica identity of the table on publisher with the
table on subscriber. When the target table is a partitioned table, we should
check the replica identity key of target partition, instead of the partitioned
table.
---
 src/backend/replication/logical/relation.c | 121 ++++++++++++---------
 src/backend/replication/logical/worker.c   |  20 +++-
 src/test/subscription/t/013_partition.pl   |  14 +++
 3 files changed, 98 insertions(+), 57 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index b7b740361c..1a9ed664dd 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -249,6 +249,72 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
 	}
 }
 
+/*
+ * Check if replica identity matches and mark the updatable flag.
+ *
+ * We allow for stricter replica identity (fewer columns) on subscriber as
+ * that will not stop us from finding unique tuple. IE, if publisher has
+ * identity (id,timestamp) and subscriber just (id) this will not be a
+ * problem, but in the opposite scenario it will.
+ *
+ * Don't throw any error here just mark the relation entry as not updatable,
+ * as replica identity is only for updates and deletes but inserts can be
+ * replicated even without it.
+ */
+static void
+logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
+{
+	Bitmapset  *idkey;
+	LogicalRepRelation *remoterel = &entry->remoterel;
+	int			i;
+
+	entry->updatable = true;
+
+	/*
+	 * If it is a partitioned table, we don't check it, we will check its
+	 * partition later.
+	 */
+	if (entry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return;
+
+	idkey = RelationGetIndexAttrBitmap(entry->localrel,
+									   INDEX_ATTR_BITMAP_IDENTITY_KEY);
+	/* fallback to PK if no replica identity */
+	if (idkey == NULL)
+	{
+		idkey = RelationGetIndexAttrBitmap(entry->localrel,
+										   INDEX_ATTR_BITMAP_PRIMARY_KEY);
+		/*
+		 * If no replica identity index and no PK, the published table
+		 * must have replica identity FULL.
+		 */
+		if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+			entry->updatable = false;
+	}
+
+	i = -1;
+	while ((i = bms_next_member(idkey, i)) >= 0)
+	{
+		int			attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+		if (!AttrNumberIsForUserDefinedAttr(attnum))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical replication target relation \"%s.%s\" uses "
+							"system columns in REPLICA IDENTITY index",
+							remoterel->nspname, remoterel->relname)));
+
+		attnum = AttrNumberGetAttrOffset(attnum);
+
+		if (entry->attrmap->attnums[attnum] < 0 ||
+			!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
+		{
+			entry->updatable = false;
+			break;
+		}
+	}
+}
+
 /*
  * Open the local relation associated with the remote one.
  *
@@ -307,7 +373,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 	if (!entry->localrelvalid)
 	{
 		Oid			relid;
-		Bitmapset  *idkey;
 		TupleDesc	desc;
 		MemoryContext oldctx;
 		int			i;
@@ -365,55 +430,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		/* be tidy */
 		bms_free(missingatts);
 
-		/*
-		 * Check that replica identity matches. We allow for stricter replica
-		 * identity (fewer columns) on subscriber as that will not stop us
-		 * from finding unique tuple. IE, if publisher has identity
-		 * (id,timestamp) and subscriber just (id) this will not be a problem,
-		 * but in the opposite scenario it will.
-		 *
-		 * Don't throw any error here just mark the relation entry as not
-		 * updatable, as replica identity is only for updates and deletes but
-		 * inserts can be replicated even without it.
-		 */
-		entry->updatable = true;
-		idkey = RelationGetIndexAttrBitmap(entry->localrel,
-										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
-		/* fallback to PK if no replica identity */
-		if (idkey == NULL)
-		{
-			idkey = RelationGetIndexAttrBitmap(entry->localrel,
-											   INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
-			/*
-			 * If no replica identity index and no PK, the published table
-			 * must have replica identity FULL.
-			 */
-			if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
-				entry->updatable = false;
-		}
-
-		i = -1;
-		while ((i = bms_next_member(idkey, i)) >= 0)
-		{
-			int			attnum = i + FirstLowInvalidHeapAttributeNumber;
-
-			if (!AttrNumberIsForUserDefinedAttr(attnum))
-				ereport(ERROR,
-						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						 errmsg("logical replication target relation \"%s.%s\" uses "
-								"system columns in REPLICA IDENTITY index",
-								remoterel->nspname, remoterel->relname)));
-
-			attnum = AttrNumberGetAttrOffset(attnum);
-
-			if (entry->attrmap->attnums[attnum] < 0 ||
-				!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
-			{
-				entry->updatable = false;
-				break;
-			}
-		}
+		/* Check that replica identity matches. */
+		logicalrep_rel_mark_updatable(entry);
 
 		entry->localrelvalid = true;
 	}
@@ -651,7 +669,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 			   attrmap->maplen * sizeof(AttrNumber));
 	}
 
-	entry->updatable = root->updatable;
+	/* Check that replica identity matches. */
+	logicalrep_rel_mark_updatable(entry);
 
 	entry->localrelvalid = true;
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7c28da3714..c57f8d678b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2123,6 +2123,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	TupleTableSlot *remoteslot_part;
 	TupleConversionMap *map;
 	MemoryContext oldctx;
+	LogicalRepRelMapEntry *part_entry = NULL;
+	AttrMap	   *attrmap = NULL;
 
 	/* ModifyTableState is needed for ExecFindPartition(). */
 	edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -2154,8 +2156,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
 	map = partrelinfo->ri_RootToPartitionMap;
 	if (map != NULL)
-		remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+	{
+		attrmap = map->attrMap;
+		remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
 												remoteslot_part);
+	}
 	else
 	{
 		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
@@ -2163,6 +2168,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	}
 	MemoryContextSwitchTo(oldctx);
 
+	/* Check if we can do the update or delete on the leaf partition. */
+	if(operation == CMD_UPDATE || operation == CMD_DELETE)
+	{
+		part_entry = logicalrep_partition_open(relmapentry, partrel,
+											   attrmap);
+		check_relation_updatable(part_entry);
+	}
+
 	switch (operation)
 	{
 		case CMD_INSERT:
@@ -2184,15 +2197,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 			 * suitable partition.
 			 */
 			{
-				AttrMap    *attrmap = map ? map->attrMap : NULL;
-				LogicalRepRelMapEntry *part_entry;
 				TupleTableSlot *localslot;
 				ResultRelInfo *partrelinfo_new;
 				bool		found;
 
-				part_entry = logicalrep_partition_open(relmapentry, partrel,
-													   attrmap);
-
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
 												&part_entry->remoterel,
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 071a9d7b27..a1be3bc7c4 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -867,4 +867,18 @@ $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT a, b, c FROM tab5 ORDER BY 1");
 is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
 
+# Alter REPLICA IDENTITY on subscriber.
+# No REPLICA IDENTITY in the partitioned table on subscriber, but what we check
+# is the partition, so it works fine.
+$node_subscriber2->safe_psql('postgres',
+	"ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5_1 ORDER BY 1");
+is($result, qq(4||1), 'updates of tab5 replicated correctly');
+
 done_testing();
-- 
2.24.0.windows.2

