From d5d2dbd9f28db3a66a38cba76710d2b5679b09c2 Mon Sep 17 00:00:00 2001
From: Amit Langote <amitlangote09@gmail.com>
Date: Thu, 23 Jan 2020 11:49:01 +0900
Subject: [PATCH v14 2/3] Add subscription support to replicate into
 partitioned tables

Mainly, this adds support code in logical/worker.c for applying
replicated operations that target a partitioned table to the
appropriate partitions of that table.
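
For example, with a subscriber-side setup like the following (a sketch
with illustrative names, modeled on the regression test added in
013_partition.pl; the publisher is assumed to publish a table that is
likewise named tab1):

    CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a);
    CREATE TABLE tab1_1 PARTITION OF tab1 FOR VALUES IN (1, 2, 3, 4);
    CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6);
    CREATE SUBSCRIPTION sub1 CONNECTION '...' PUBLICATION pub1;

replicated inserts, updates, deletes, and truncates targeting tab1 are
now routed to the appropriate partitions, instead of being rejected
with the "cannot use relation ... as logical replication target" error.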
---
 src/backend/executor/execReplication.c      |  14 +-
 src/backend/replication/logical/relation.c  | 167 ++++++++++++++++
 src/backend/replication/logical/tablesync.c |   1 -
 src/backend/replication/logical/worker.c    | 298 +++++++++++++++++++++++++++-
 src/include/replication/logicalrelation.h   |   2 +
 src/test/subscription/t/013_partition.pl    |  31 ++-
 6 files changed, 486 insertions(+), 27 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 7194bec..dc8a01a 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -594,17 +594,9 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
 						 const char *relname)
 {
 	/*
-	 * We currently only support writing to regular tables.  However, give a
-	 * more specific error for partitioned and foreign tables.
+	 * Give a more specific error for foreign tables.
 	 */
-	if (relkind == RELKIND_PARTITIONED_TABLE)
-		ereport(ERROR,
-				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
-						nspname, relname),
-				 errdetail("\"%s.%s\" is a partitioned table.",
-						   nspname, relname)));
-	else if (relkind == RELKIND_FOREIGN_TABLE)
+	if (relkind == RELKIND_FOREIGN_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
@@ -612,7 +604,7 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
 				 errdetail("\"%s.%s\" is a foreign table.",
 						   nspname, relname)));
 
-	if (relkind != RELKIND_RELATION)
+	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 3d7291b..6b88bde 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -34,6 +34,7 @@ static MemoryContext LogicalRepRelMapContext = NULL;
 
 static HTAB *LogicalRepRelMap = NULL;
 static HTAB *LogicalRepTypMap = NULL;
+static HTAB *LogicalRepPartMap = NULL;
 
 
 /*
@@ -472,3 +473,169 @@ logicalrep_typmap_gettypname(Oid remoteid)
 	Assert(OidIsValid(entry->remoteid));
 	return psprintf("%s.%s", entry->nspname, entry->typname);
 }
+
+/*
+ * Partition cache: look up a partition's LogicalRepRelMapEntry
+ *
+ * Unlike the relation map cache, this is keyed by partition OID, not remote
+ * relation OID, because this cache is only needed when partitions are not
+ * directly mapped to any remote relation, such as when replication occurs
+ * with one of their ancestors as the target.
+ */
+
+/*
+ * Relcache invalidation callback
+ */
+static void
+logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
+{
+	LogicalRepRelMapEntry *entry;
+
+	/* Just to be sure. */
+	if (LogicalRepPartMap == NULL)
+		return;
+
+	if (reloid != InvalidOid)
+	{
+		HASH_SEQ_STATUS status;
+
+		hash_seq_init(&status, LogicalRepPartMap);
+
+		/* TODO, use inverse lookup hashtable? */
+		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		{
+			if (entry->localreloid == reloid)
+			{
+				entry->localreloid = InvalidOid;
+				hash_seq_term(&status);
+				break;
+			}
+		}
+	}
+	else
+	{
+		/* invalidate all cache entries */
+		HASH_SEQ_STATUS status;
+
+		hash_seq_init(&status, LogicalRepPartMap);
+
+		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+			entry->localreloid = InvalidOid;
+	}
+}
+
+/*
+ * Initialize the partition map cache.
+ */
+static void
+logicalrep_partmap_init(void)
+{
+	HASHCTL		ctl;
+
+	if (!LogicalRepRelMapContext)
+		LogicalRepRelMapContext =
+			AllocSetContextCreate(CacheMemoryContext,
+								  "LogicalRepRelMapContext",
+								  ALLOCSET_DEFAULT_SIZES);
+
+	/* Initialize the partition map hash table. */
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(Oid);	/* partition OID */
+	ctl.entrysize = sizeof(LogicalRepRelMapEntry);
+	ctl.hcxt = LogicalRepRelMapContext;
+
+	LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
+								   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+	/* Watch for invalidation events. */
+	CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
+								  (Datum) 0);
+}
+
+/*
+ * logicalrep_partition_open
+ *
+ * Returned entry reuses most of the values of the root table's entry, save
+ * the attribute map, which can be different for the partition.
+ *
+ * Note there's no logicalrep_partition_close, because the caller closes
+ * the component relation.
+ */
+LogicalRepRelMapEntry *
+logicalrep_partition_open(LogicalRepRelMapEntry *root,
+						  Relation partrel, AttrMap *map)
+{
+	LogicalRepRelMapEntry *entry;
+	LogicalRepRelation *remoterel = &root->remoterel;
+	Oid			partOid = RelationGetRelid(partrel);
+	AttrMap	   *attrmap = root->attrmap;
+	bool		found;
+	int			i;
+	MemoryContext oldctx;
+
+	if (LogicalRepPartMap == NULL)
+		logicalrep_partmap_init();
+
+	/* Search for existing entry. */
+	entry = hash_search(LogicalRepPartMap, (void *) &partOid,
+						HASH_ENTER, &found);
+
+	if (found)
+		return entry;
+
+	memset(entry, 0, sizeof(LogicalRepRelMapEntry));
+
+	/* Make cached copy of the data */
+	oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
+
+	/* Remote relation is used as-is from the root's entry. */
+	entry->remoterel.remoteid = remoterel->remoteid;
+	entry->remoterel.nspname = pstrdup(remoterel->nspname);
+	entry->remoterel.relname = pstrdup(remoterel->relname);
+	entry->remoterel.natts = remoterel->natts;
+	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+	for (i = 0; i < remoterel->natts; i++)
+	{
+		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+	}
+	entry->remoterel.replident = remoterel->replident;
+	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+
+	entry->localrel = partrel;
+	entry->localreloid = partOid;
+
+	/*
+	 * If the partition's attributes don't match the root relation's, we'll
+	 * need to make a new attrmap which maps partition attribute numbers to
+	 * remoterel's, instead of the original which maps the root relation's
+	 * attribute numbers to remoterel's.
+	 *
+	 * Note that 'map' which comes from the tuple routing data structure
+	 * contains 1-based attribute numbers (of the parent relation).  However,
+	 * the map in 'entry', a logical replication data structure, contains
+	 * 0-based attribute numbers (of the remote relation).
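+	 *
+	 * A hypothetical example: suppose the remote relation and the local
+	 * parent both have columns (a, b, c), while the partition was created
+	 * as (b, c, a) and then attached, as in the regression test.  Then
+	 * 'map' is [2, 3, 1] (for each partition column, the 1-based parent
+	 * attribute it corresponds to), the root entry's attrmap is [0, 1, 2],
+	 * and the attrmap computed below is [1, 2, 0].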
+	 */
+	if (map)
+	{
+		AttrNumber	attno;
+
+		entry->attrmap = make_attrmap(map->maplen);
+		for (attno = 0; attno < entry->attrmap->maplen; attno++)
+		{
+			AttrNumber	root_attno = map->attnums[attno];
+
+			entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+		}
+	}
+	else
+		entry->attrmap = attrmap;
+
+	entry->updatable = root->updatable;
+
+	/* state and statelsn are left set to 0. */
+	MemoryContextSwitchTo(oldctx);
+
+	return entry;
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index a60c666..c27d970 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -762,7 +762,6 @@ copy_table(Relation rel)
 	/* Map the publisher relation to local one. */
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
-	Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION);
 
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 51c0278..9871d1f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -29,11 +29,14 @@
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
 #include "catalog/namespace.h"
+#include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
+#include "executor/execPartition.h"
 #include "executor/nodeModifyTable.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
@@ -126,6 +129,12 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
 									LogicalRepRelation *remoterel,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
+static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
+									   EState *estate,
+									   TupleTableSlot *remoteslot,
+									   LogicalRepTupleData *newtup,
+									   LogicalRepRelMapEntry *relmapentry,
+									   CmdType operation);
 
 /*
  * Should this worker apply changes for given relation.
@@ -636,9 +645,13 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
-	Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
-	apply_handle_insert_internal(estate->es_result_relation_info, estate,
-								 remoteslot);
+	/* For a partitioned table, insert the tuple into a partition. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+								   remoteslot, NULL, rel, CMD_INSERT);
+	else
+		apply_handle_insert_internal(estate->es_result_relation_info, estate,
+									 remoteslot);
 
 	PopActiveSnapshot();
 
@@ -767,9 +780,13 @@ apply_handle_update(StringInfo s)
 						has_oldtup ? oldtup.values : newtup.values);
 	MemoryContextSwitchTo(oldctx);
 
-	Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
-	apply_handle_update_internal(estate->es_result_relation_info, estate,
-								 remoteslot, &newtup, rel);
+	/* For a partitioned table, apply update to correct partition. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+								   remoteslot, &newtup, rel, CMD_UPDATE);
+	else
+		apply_handle_update_internal(estate->es_result_relation_info, estate,
+									 remoteslot, &newtup, rel);
 
 	PopActiveSnapshot();
 
@@ -886,9 +903,13 @@ apply_handle_delete(StringInfo s)
 	slot_store_cstrings(remoteslot, rel, oldtup.values);
 	MemoryContextSwitchTo(oldctx);
 
-	Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
-	apply_handle_delete_internal(estate->es_result_relation_info, estate,
-								 remoteslot, &rel->remoterel);
+	/* For a partitioned table, apply delete to correct partition. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+								   remoteslot, NULL, rel, CMD_DELETE);
+	else
+		apply_handle_delete_internal(estate->es_result_relation_info, estate,
+									 remoteslot, &rel->remoterel);
 
 	PopActiveSnapshot();
 
@@ -975,6 +996,212 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
 }
 
 /*
+ * This handles insert, update and delete on a partitioned table.
+ */
+static void
+apply_handle_tuple_routing(ResultRelInfo *relinfo,
+						   EState *estate,
+						   TupleTableSlot *remoteslot,
+						   LogicalRepTupleData *newtup,
+						   LogicalRepRelMapEntry *relmapentry,
+						   CmdType operation)
+{
+	Relation	parentrel = relinfo->ri_RelationDesc;
+	ModifyTableState *mtstate = NULL;
+	PartitionTupleRouting *proute = NULL;
+	ResultRelInfo *partrelinfo;
+	Relation	partrel;
+	TupleTableSlot *remoteslot_part;
+	PartitionRoutingInfo *partinfo;
+	TupleConversionMap *map;
+	MemoryContext oldctx;
+
+	/* ModifyTableState is needed for ExecFindPartition(). */
+	mtstate = makeNode(ModifyTableState);
+	mtstate->ps.plan = NULL;
+	mtstate->ps.state = estate;
+	mtstate->operation = operation;
+	mtstate->resultRelInfo = relinfo;
+	proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
+
+	/*
+	 * Find a partition for the tuple contained in remoteslot.
+	 */
+	Assert(remoteslot != NULL);
+	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
+									remoteslot, estate);
+	Assert(partrelinfo != NULL);
+	partrel = partrelinfo->ri_RelationDesc;
+
+	/* Convert the tuple to match the partition's rowtype. */
+	partinfo = partrelinfo->ri_PartitionInfo;
+	remoteslot_part = partinfo->pi_PartitionTupleSlot;
+	if (remoteslot_part == NULL)
+		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
+	map = partinfo->pi_RootToPartitionMap;
+	if (map != NULL)
+		remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+												remoteslot_part);
+	else
+	{
+		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
+		slot_getallattrs(remoteslot_part);
+	}
+	MemoryContextSwitchTo(oldctx);
+
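+	/*
+	 * Point the executor at the partition: ExecSimpleRelationInsert and
+	 * friends, called from the apply_handle_*_internal functions, read
+	 * their target relation from es_result_relation_info.
+	 */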
+	estate->es_result_relation_info = partrelinfo;
+	switch (operation)
+	{
+		case CMD_INSERT:
+			apply_handle_insert_internal(partrelinfo, estate,
+										 remoteslot_part);
+			break;
+
+		case CMD_DELETE:
+			apply_handle_delete_internal(partrelinfo, estate,
+										 remoteslot_part,
+										 &relmapentry->remoterel);
+			break;
+
+		case CMD_UPDATE:
+			/*
+			 * For UPDATE, depending on whether or not the updated tuple
+			 * satisfies the partition's constraint, perform a simple UPDATE
+			 * of the partition or move the updated tuple into a different
+			 * suitable partition.
+			 */
+			{
+				AttrMap	   *attrmap = map ? map->attrMap : NULL;
+				LogicalRepRelMapEntry *part_entry;
+				TupleTableSlot *localslot;
+				ResultRelInfo *partrelinfo_new;
+				TupleTableSlot *remoteslot_new;
+				bool		found;
+
+				part_entry = logicalrep_partition_open(relmapentry, partrel,
+													   attrmap);
+
+				/* Get the matching local tuple from the partition. */
+				found = FindReplTupleInLocalRel(estate, partrel,
+												&part_entry->remoterel,
+												remoteslot_part, &localslot);
+
+				remoteslot_new = table_slot_create(partrel,
+												   &estate->es_tupleTable);
+				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+				if (found)
+				{
+					/* Get the updated tuple.  */
+					slot_modify_cstrings(remoteslot_new, localslot,
+										 part_entry,
+										 newtup->values, newtup->changed);
+					MemoryContextSwitchTo(oldctx);
+				}
+				else
+				{
+					/*
+					 * The tuple to be updated could not be found.
+					 *
+					 * TODO what to do here, change the log level to LOG
+					 * perhaps?
+					 */
+					elog(DEBUG1,
+						 "logical replication did not find row for update "
+						 "in replication target relation \"%s\"",
+						 RelationGetRelationName(partrel));
+				}
+
+				/* Does the updated tuple satisfy the partition constraint? */
+				if (partrelinfo->ri_PartitionCheck == NULL ||
+					ExecPartitionCheck(partrelinfo, remoteslot_new, estate,
+									   false))
+				{
+					/*
+					 * Yes, so simply UPDATE the partition.  We don't call
+					 * apply_handle_update_internal() here, which could do this
+					 * work, to avoid repeating some work already done above,
+					 * such as finding the local tuple in the partition.
+					 */
+					EPQState epqstate;
+
+					EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+					ExecOpenIndices(partrelinfo, false);
+
+					EvalPlanQualSetSlot(&epqstate, remoteslot_new);
+					ExecSimpleRelationUpdate(estate, &epqstate, localslot,
+											 remoteslot_new);
+					ExecCloseIndices(partrelinfo);
+					EvalPlanQualEnd(&epqstate);
+				}
+				else
+				{
+					/* Move the tuple into the new partition. */
+
+					/* Convert the updated tuple back to the parent's rowtype. */
+					if (map)
+					{
+						TupleConversionMap *PartitionToRootMap =
+							convert_tuples_by_name(RelationGetDescr(partrel),
+												   RelationGetDescr(parentrel));
+						remoteslot =
+							execute_attr_map_slot(PartitionToRootMap->attrMap,
+												  remoteslot_new, remoteslot);
+					}
+					else
+					{
+						remoteslot = ExecCopySlot(remoteslot, remoteslot_new);
+						slot_getallattrs(remoteslot);
+					}
+
+
+					/* Find the new partition. */
+					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+					partrelinfo_new = ExecFindPartition(mtstate, relinfo,
+														proute, remoteslot,
+														estate);
+					MemoryContextSwitchTo(oldctx);
+					Assert(partrelinfo_new != partrelinfo);
+
+					/* DELETE old tuple from the old partition. */
+					estate->es_result_relation_info = partrelinfo;
+					apply_handle_delete_internal(partrelinfo, estate,
+												 remoteslot_part,
+												 &relmapentry->remoterel);
+
+					/* INSERT new tuple into the new partition. */
+
+					/*
+					 * Convert the replacement tuple to match the destination
+					 * partition rowtype.
+					 */
+					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+					partinfo = partrelinfo_new->ri_PartitionInfo;
+					map = partinfo->pi_RootToPartitionMap;
+					if (map != NULL)
+					{
+						remoteslot_new = partinfo->pi_PartitionTupleSlot;
+						remoteslot_new = execute_attr_map_slot(map->attrMap,
+															   remoteslot,
+															   remoteslot_new);
+					}
+					MemoryContextSwitchTo(oldctx);
+					estate->es_result_relation_info = partrelinfo_new;
+					apply_handle_insert_internal(partrelinfo_new, estate,
+												 remoteslot_new);
+				}
+			}
+			break;
+
+		default:
+			elog(ERROR, "unrecognized CmdType: %d", (int) operation);
+			break;
+	}
+
+	ExecCleanupTupleRouting(mtstate, proute);
+}
+
+/*
  * Handle TRUNCATE message.
  *
  * TODO: FDW support
@@ -987,6 +1214,7 @@ apply_handle_truncate(StringInfo s)
 	List	   *remote_relids = NIL;
 	List	   *remote_rels = NIL;
 	List	   *rels = NIL;
+	List	   *part_rels = NIL;
 	List	   *relids = NIL;
 	List	   *relids_logged = NIL;
 	ListCell   *lc;
@@ -1016,6 +1244,52 @@ apply_handle_truncate(StringInfo s)
 		relids = lappend_oid(relids, rel->localreloid);
 		if (RelationIsLogicallyLogged(rel->localrel))
 			relids_logged = lappend_oid(relids_logged, rel->localreloid);
+
+		/*
+		 * Truncate partitions if we got a message to truncate a partitioned
+		 * table.
+		 */
+		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		{
+			ListCell   *child;
+			List	   *children = find_all_inheritors(rel->localreloid,
+													   RowExclusiveLock,
+													   NULL);
+
+			foreach(child, children)
+			{
+				Oid			childrelid = lfirst_oid(child);
+				Relation	childrel;
+
+				if (list_member_oid(relids, childrelid))
+					continue;
+
+				/* find_all_inheritors already got lock */
+				childrel = table_open(childrelid, NoLock);
+
+				/*
+				 * It is possible that the parent table has children that are
+				 * temp tables of other backends.  We cannot safely access
+				 * such tables (because of buffering issues), and the best
+				 * thing to do is to silently ignore them.  Note that this
+				 * check is the same as one of the checks done in
+				 * truncate_check_activity() called below, but it is kept
+				 * here for simplicity.
+				 */
+				if (RELATION_IS_OTHER_TEMP(childrel))
+				{
+					table_close(childrel, RowExclusiveLock);
+					continue;
+				}
+
+				rels = lappend(rels, childrel);
+				part_rels = lappend(part_rels, childrel);
+				relids = lappend_oid(relids, childrelid);
+				/* Log this relation only if needed for logical decoding */
+				if (RelationIsLogicallyLogged(childrel))
+					relids_logged = lappend_oid(relids_logged, childrelid);
+			}
+		}
 	}
 
 	/*
@@ -1031,6 +1305,12 @@ apply_handle_truncate(StringInfo s)
 
 		logicalrep_rel_close(rel, NoLock);
 	}
+	foreach(lc, part_rels)
+	{
+		Relation rel = lfirst(lc);
+
+		table_close(rel, NoLock);
+	}
 
 	CommandCounterIncrement();
 }
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 9971a80..4650b4f 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -34,6 +34,8 @@ extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
 
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
 												  LOCKMODE lockmode);
+extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
+						  Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
 
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index ea5812c..14ff9f4 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 15;
+use Test::More tests => 18;
 
 # setup
 
@@ -42,10 +42,15 @@ $node_subscriber1->safe_psql('postgres',
 	"CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)");
 $node_subscriber1->safe_psql('postgres',
 	"CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)");
+
 $node_subscriber1->safe_psql('postgres',
 	"ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)");
 $node_subscriber1->safe_psql('postgres',
-	"CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)");
+	"CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6) PARTITION BY LIST (a)");
+$node_subscriber1->safe_psql('postgres',
+	"CREATE TABLE tab1_2_1 PARTITION OF tab1_2 FOR VALUES IN (5)");
+$node_subscriber1->safe_psql('postgres',
+	"CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (6)");
 $node_subscriber1->safe_psql('postgres',
 	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1");
 
@@ -82,6 +87,10 @@ my $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
 is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated');
 
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT tableoid::regclass FROM tab1 WHERE a = 5");
+is($result, qq(tab1_2_1), 'inserts into tab1_2 replicated into correct partition');
+
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
 is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated');
@@ -90,24 +99,30 @@ $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
 is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated');
 
-# update (no partition change)
+# update (replicated as update)
 $node_publisher->safe_psql('postgres',
 	"UPDATE tab1 SET a = 2 WHERE a = 1");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET a = 6 WHERE a = 5");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
 
 $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
-is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated');
+is($result, qq(sub1_tab1|3|2|6), 'update of tab1_1 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT tableoid::regclass FROM tab1 WHERE a = 6");
+is($result, qq(tab1_2_2), 'update of tab1_2 correctly replicated as intra-partition update');
 
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
 is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated');
 
-# update (partition changes)
+# update (replicated as delete+insert)
 $node_publisher->safe_psql('postgres',
-	"UPDATE tab1 SET a = 6 WHERE a = 2");
+	"UPDATE tab1 SET a = 5 WHERE a = 2");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
@@ -116,6 +131,10 @@ $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
 is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated');
 
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT tableoid::regclass FROM tab1 WHERE a = 5");
+is($result, qq(tab1_2_1), 'update of tab1_2 correctly replicated as cross-partition update');
+
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
 is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated');
-- 
1.8.3.1

