From c73f7262ffef170a0cf02a162bbc934a2e94817a Mon Sep 17 00:00:00 2001
From: "shiy.fnst" <shiy.fnst@fujitsu.com>
Date: Wed, 8 Jun 2022 11:11:44 +0800
Subject: [PATCH v1 2/2] Check partition table replica identity on subscriber

In logical replication, we will check if the target table on subscriber is
updatable. When the target table is a partitioned table, we should check the
target partition, instead of the partitioned table.

Author: Shi yu
---
 src/backend/replication/logical/relation.c | 114 +++++++++++----------
 src/backend/replication/logical/worker.c   |  27 +++--
 2 files changed, 83 insertions(+), 58 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 9cc94067d5..190d6d25aa 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -299,6 +299,64 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
 	}
 }
 
+/*
+ * Check that replica identity matches.
+ *
+ * We allow for stricter replica identity (fewer columns) on subscriber as
+ * that will not stop us from finding unique tuple. IE, if publisher has
+ * identity (id,timestamp) and subscriber just (id) this will not be a
+ * problem, but in the opposite scenario it will.
+ *
+ * Don't throw any error here just mark the relation entry as not updatable,
+ * as replica identity is only for updates and deletes but inserts can be
+ * replicated even without it.
+ */
+static void
+logicalrep_check_updatable(LogicalRepRelMapEntry *entry)
+{
+	Bitmapset  *idkey;
+	LogicalRepRelation *remoterel = &entry->remoterel;
+	int			i;
+
+	entry->updatable = true;
+	idkey = RelationGetIndexAttrBitmap(entry->localrel,
+									   INDEX_ATTR_BITMAP_IDENTITY_KEY);
+	/* fallback to PK if no replica identity */
+	if (idkey == NULL)
+	{
+		idkey = RelationGetIndexAttrBitmap(entry->localrel,
+										   INDEX_ATTR_BITMAP_PRIMARY_KEY);
+		/*
+		 * If no replica identity index and no PK, the published table
+		 * must have replica identity FULL.
+		 */
+		if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+			entry->updatable = false;
+	}
+
+	i = -1;
+	while ((i = bms_next_member(idkey, i)) >= 0)
+	{
+		int			attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+		if (!AttrNumberIsForUserDefinedAttr(attnum))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical replication target relation \"%s.%s\" uses "
+							"system columns in REPLICA IDENTITY index",
+							remoterel->nspname, remoterel->relname)));
+
+		attnum = AttrNumberGetAttrOffset(attnum);
+
+		if (entry->attrmap->attnums[attnum] < 0 ||
+			!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
+		{
+			entry->updatable = false;
+			break;
+		}
+	}
+}
+
 /*
  * Open the local relation associated with the remote one.
  *
@@ -357,7 +415,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 	if (!entry->localrelvalid)
 	{
 		Oid			relid;
-		Bitmapset  *idkey;
 		TupleDesc	desc;
 		MemoryContext oldctx;
 		int			i;
@@ -415,55 +472,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		/* be tidy */
 		bms_free(missingatts);
 
-		/*
-		 * Check that replica identity matches. We allow for stricter replica
-		 * identity (fewer columns) on subscriber as that will not stop us
-		 * from finding unique tuple. IE, if publisher has identity
-		 * (id,timestamp) and subscriber just (id) this will not be a problem,
-		 * but in the opposite scenario it will.
-		 *
-		 * Don't throw any error here just mark the relation entry as not
-		 * updatable, as replica identity is only for updates and deletes but
-		 * inserts can be replicated even without it.
-		 */
-		entry->updatable = true;
-		idkey = RelationGetIndexAttrBitmap(entry->localrel,
-										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
-		/* fallback to PK if no replica identity */
-		if (idkey == NULL)
-		{
-			idkey = RelationGetIndexAttrBitmap(entry->localrel,
-											   INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
-			/*
-			 * If no replica identity index and no PK, the published table
-			 * must have replica identity FULL.
-			 */
-			if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
-				entry->updatable = false;
-		}
-
-		i = -1;
-		while ((i = bms_next_member(idkey, i)) >= 0)
-		{
-			int			attnum = i + FirstLowInvalidHeapAttributeNumber;
-
-			if (!AttrNumberIsForUserDefinedAttr(attnum))
-				ereport(ERROR,
-						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						 errmsg("logical replication target relation \"%s.%s\" uses "
-								"system columns in REPLICA IDENTITY index",
-								remoterel->nspname, remoterel->relname)));
-
-			attnum = AttrNumberGetAttrOffset(attnum);
-
-			if (entry->attrmap->attnums[attnum] < 0 ||
-				!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
-			{
-				entry->updatable = false;
-				break;
-			}
-		}
+		/* Check that replica identity matches. */
+		logicalrep_check_updatable(entry);
 
 		entry->localrelvalid = true;
 	}
@@ -584,7 +594,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	Oid			partOid = RelationGetRelid(partrel);
 	AttrMap    *attrmap = root->attrmap;
 	bool		found;
-	int			i;
 	MemoryContext oldctx;
 
 	if (LogicalRepPartMap == NULL)
@@ -648,7 +657,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 			   attrmap->maplen * sizeof(AttrNumber));
 	}
 
-	entry->updatable = root->updatable;
+	/* Check that replica identity matches. */
+	logicalrep_check_updatable(entry);
 
 	entry->localrelvalid = true;
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fc210a9e7b..4eee9c7bb6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1735,6 +1735,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
 static void
 check_relation_updatable(LogicalRepRelMapEntry *rel)
 {
+	/*
+	 * If it is a partitioned table, we don't check it, we will check its
+	 * partition later.
+	 */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return;
+
 	/* Updatable, no error. */
 	if (rel->updatable)
 		return;
@@ -2118,6 +2125,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	TupleTableSlot *remoteslot_part;
 	TupleConversionMap *map;
 	MemoryContext oldctx;
+	LogicalRepRelMapEntry *part_entry;
+	AttrMap	   *attrmap = NULL;
 
 	/* ModifyTableState is needed for ExecFindPartition(). */
 	edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -2149,8 +2158,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
 	map = partrelinfo->ri_RootToPartitionMap;
 	if (map != NULL)
-		remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+	{
+		attrmap = map->attrMap;
+		remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
 												remoteslot_part);
+	}
 	else
 	{
 		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
@@ -2158,6 +2170,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	}
 	MemoryContextSwitchTo(oldctx);
 
+	/* Check if we can do the update or delete. */
+	if(operation == CMD_UPDATE || operation == CMD_DELETE)
+	{
+		part_entry = logicalrep_partition_open(relmapentry, partrel,
+											   attrmap);
+		check_relation_updatable(part_entry);
+	}
+
 	switch (operation)
 	{
 		case CMD_INSERT:
@@ -2179,15 +2199,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 			 * suitable partition.
 			 */
 			{
-				AttrMap    *attrmap = map ? map->attrMap : NULL;
-				LogicalRepRelMapEntry *part_entry;
 				TupleTableSlot *localslot;
 				ResultRelInfo *partrelinfo_new;
 				bool		found;
 
-				part_entry = logicalrep_partition_open(relmapentry, partrel,
-													   attrmap);
-
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
 												&part_entry->remoterel,
-- 
2.18.4

