From 6346e91f251d6f237c805d4bcd8a82d3226100b4 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Sat, 11 Jun 2022 14:12:39 +0800
Subject: [PATCH] Check partition table replica identity on subscriber

In logical replication, we will check if the target table on subscriber is
updatable by comparing the replica identity of the table on publisher with the
table on subscriber. When the target table is a partitioned table, we should
check the replica identity key of target partition, instead of the partitioned
table.


Author: Shi yu, Hou Zhijie
---
 src/backend/replication/logical/relation.c | 121 +++++++++++++++++------------
 src/backend/replication/logical/worker.c   |  20 +++--
 src/test/subscription/t/013_partition.pl   |  14 ++++
 3 files changed, 98 insertions(+), 57 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 5928d13..6901ba5 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -295,6 +295,72 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
 }
 
 /*
+ * Check if replica identity matches and mark the updatable flag.
+ *
+ * We allow for stricter replica identity (fewer columns) on subscriber as
+ * that will not stop us from finding unique tuple. IE, if publisher has
+ * identity (id,timestamp) and subscriber just (id) this will not be a
+ * problem, but in the opposite scenario it will.
+ *
+ * Don't throw any error here just mark the relation entry as not updatable,
+ * as replica identity is only for updates and deletes but inserts can be
+ * replicated even without it.
+ */
+static void
+logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
+{
+	Bitmapset  *idkey;
+	LogicalRepRelation *remoterel = &entry->remoterel;
+	int			i;
+
+	entry->updatable = true;
+
+	/*
+	 * If it is a partitioned table, we don't check it, we will check its
+	 * partition later.
+	 */
+	if (entry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return;
+
+	idkey = RelationGetIndexAttrBitmap(entry->localrel,
+									   INDEX_ATTR_BITMAP_IDENTITY_KEY);
+	/* fallback to PK if no replica identity */
+	if (idkey == NULL)
+	{
+		idkey = RelationGetIndexAttrBitmap(entry->localrel,
+										   INDEX_ATTR_BITMAP_PRIMARY_KEY);
+		/*
+		 * If no replica identity index and no PK, the published table
+		 * must have replica identity FULL.
+		 */
+		if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+			entry->updatable = false;
+	}
+
+	i = -1;
+	while ((i = bms_next_member(idkey, i)) >= 0)
+	{
+		int			attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+		if (!AttrNumberIsForUserDefinedAttr(attnum))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical replication target relation \"%s.%s\" uses "
+							"system columns in REPLICA IDENTITY index",
+							remoterel->nspname, remoterel->relname)));
+
+		attnum = AttrNumberGetAttrOffset(attnum);
+
+		if (entry->attrmap->attnums[attnum] < 0 ||
+			!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
+		{
+			entry->updatable = false;
+			break;
+		}
+	}
+}
+
+/*
  * Open the local relation associated with the remote one.
  *
  * Rebuilds the Relcache mapping if it was invalidated by local DDL.
@@ -352,7 +418,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 	if (!entry->localrelvalid)
 	{
 		Oid			relid;
-		Bitmapset  *idkey;
 		TupleDesc	desc;
 		MemoryContext oldctx;
 		int			i;
@@ -410,55 +475,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		/* be tidy */
 		bms_free(missingatts);
 
-		/*
-		 * Check that replica identity matches. We allow for stricter replica
-		 * identity (fewer columns) on subscriber as that will not stop us
-		 * from finding unique tuple. IE, if publisher has identity
-		 * (id,timestamp) and subscriber just (id) this will not be a problem,
-		 * but in the opposite scenario it will.
-		 *
-		 * Don't throw any error here just mark the relation entry as not
-		 * updatable, as replica identity is only for updates and deletes but
-		 * inserts can be replicated even without it.
-		 */
-		entry->updatable = true;
-		idkey = RelationGetIndexAttrBitmap(entry->localrel,
-										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
-		/* fallback to PK if no replica identity */
-		if (idkey == NULL)
-		{
-			idkey = RelationGetIndexAttrBitmap(entry->localrel,
-											   INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
-			/*
-			 * If no replica identity index and no PK, the published table
-			 * must have replica identity FULL.
-			 */
-			if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
-				entry->updatable = false;
-		}
-
-		i = -1;
-		while ((i = bms_next_member(idkey, i)) >= 0)
-		{
-			int			attnum = i + FirstLowInvalidHeapAttributeNumber;
-
-			if (!AttrNumberIsForUserDefinedAttr(attnum))
-				ereport(ERROR,
-						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						 errmsg("logical replication target relation \"%s.%s\" uses "
-								"system columns in REPLICA IDENTITY index",
-								remoterel->nspname, remoterel->relname)));
-
-			attnum = AttrNumberGetAttrOffset(attnum);
-
-			if (entry->attrmap->attnums[attnum] < 0 ||
-				!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
-			{
-				entry->updatable = false;
-				break;
-			}
-		}
+		/* Check that replica identity matches. */
+		logicalrep_rel_mark_updatable(entry);
 
 		entry->localrelvalid = true;
 	}
@@ -641,7 +659,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 			   attrmap->maplen * sizeof(AttrNumber));
 	}
 
-	entry->updatable = root->updatable;
+	/* Check that replica identity matches. */
+	logicalrep_rel_mark_updatable(entry);
 
 	entry->localrelvalid = true;
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 81ce2e9..a1d0537 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2124,6 +2124,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	TupleTableSlot *remoteslot_part;
 	TupleConversionMap *map;
 	MemoryContext oldctx;
+	LogicalRepRelMapEntry *part_entry;
+	AttrMap	   *attrmap = NULL;
 
 	/* ModifyTableState is needed for ExecFindPartition(). */
 	edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -2155,8 +2157,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
 	map = partrelinfo->ri_RootToPartitionMap;
 	if (map != NULL)
-		remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+	{
+		attrmap = map->attrMap;
+		remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
 												remoteslot_part);
+	}
 	else
 	{
 		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
@@ -2164,6 +2169,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	}
 	MemoryContextSwitchTo(oldctx);
 
+	/* Check if we can do the update or delete on the leaf partition. */
+	if(operation == CMD_UPDATE || operation == CMD_DELETE)
+	{
+		part_entry = logicalrep_partition_open(relmapentry, partrel,
+											   attrmap);
+		check_relation_updatable(part_entry);
+	}
+
 	switch (operation)
 	{
 		case CMD_INSERT:
@@ -2185,15 +2198,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 			 * suitable partition.
 			 */
 			{
-				AttrMap    *attrmap = map ? map->attrMap : NULL;
-				LogicalRepRelMapEntry *part_entry;
 				TupleTableSlot *localslot;
 				ResultRelInfo *partrelinfo_new;
 				bool		found;
 
-				part_entry = logicalrep_partition_open(relmapentry, partrel,
-													   attrmap);
-
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
 												&part_entry->remoterel,
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index daff675..84b436f 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -868,4 +868,18 @@ $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT a, b, c FROM tab5 ORDER BY 1");
 is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on subscriber');
 
+# Alter REPLICA IDENTITY on subscriber.
+# No REPLICA IDENTITY in the partitioned table on subscriber, but what we check
+# is the partition, so it works fine.
+$node_subscriber2->safe_psql('postgres',
+	"ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5_1 ORDER BY 1");
+is($result, qq(4||1), 'updates of tab5 replicated correctly');
+
 done_testing();
-- 
2.7.2.windows.1

