Hi,

Thanks for the review!


>
> 1.
> In FilterOutNotSuitablePathsForReplIdentFull(), is
> "nonPartialIndexPathList" a
> good name for the list? Indexes on only expressions are also be filtered.
>
> +static List *
> +FilterOutNotSuitablePathsForReplIdentFull(List *pathlist)
> +{
> +       ListCell   *lc;
> +       List *nonPartialIndexPathList = NIL;
>
>
Yes, true. Initially the list only kept the non-partial indexes, hence the
old name. I changed it to *suitableIndexList*; does that look right?


> 2.
> +typedef struct LogicalRepPartMapEntry
> +{
> +       Oid                     partoid;                /*
> LogicalRepPartMap's key */
> +       LogicalRepRelMapEntry relmapentry;
> +       Oid                     usableIndexOid; /* which index to use?
> (Invalid when no index
> +                                                                * used) */
> +} LogicalRepPartMapEntry;
>
> For partition tables, is it possible to use relmapentry->usableIndexOid to
> mark
> which index to use? Which means we don't need to add "usableIndexOid" to
> LogicalRepPartMapEntry.
>
>
My intention was to make this explicit so that it is clear that partitions
can explicitly own indexes.

But I tried your suggested refactor, which looks good. So, I changed it.

Also, I realized that I did not have a test where the partition has its own
index (not inherited from the parent), so I added one.


> 3.
> It looks we should change the comment for FindReplTupleInLocalRel() in this
> patch.
>
> /*
>  * Try to find a tuple received from the publication side (in
> 'remoteslot') in
>  * the corresponding local relation using either replica identity index,
>  * primary key or if needed, sequential scan.
>  *
>  * Local tuple, if found, is returned in '*localslot'.
>  */
> static bool
> FindReplTupleInLocalRel(EState *estate, Relation localrel,
>
>
I made a small change, just adding "index". Do you expect a larger change?


> 4.
> @@ -2030,16 +2017,19 @@ apply_handle_delete_internal(ApplyExecutionData
> *edata,
>  {
>         EState     *estate = edata->estate;
>         Relation        localrel = relinfo->ri_RelationDesc;
> -       LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
> +       LogicalRepRelMapEntry *targetRel = edata->targetRel;
> +       LogicalRepRelation *remoterel = &targetRel->remoterel;
>         EPQState        epqstate;
>         TupleTableSlot *localslot;
>
> Do we need this change? I didn't see any place to use the variable
> targetRel
> afterwards.
>

It seems we don't need it; changed it back.


> 5.
> +               if (!AttributeNumberIsValid(mainattno))
> +               {
> +                       /*
> +                        * There are two cases to consider. First, if the
> index is a primary or
> +                        * unique key, we cannot have any indexes with
> expressions. So, at this
> +                        * point we are sure that the index we deal is not
> these.
> +                        */
> +                       Assert(RelationGetReplicaIndex(rel) !=
> RelationGetRelid(idxrel) &&
> +                                  RelationGetPrimaryKeyIndex(rel) !=
> RelationGetRelid(idxrel));
> +
> +                       /*
> +                        * For a non-primary/unique index with an
> expression, we are sure that
> +                        * the expression cannot be used for replication
> index search. The
> +                        * reason is that we create relevant index paths
> by providing column
> +                        * equalities. And, the planner does not pick
> expression indexes via
> +                        * column equality restrictions in the query.
> +                        */
> +                       continue;
> +               }
>
> Is it possible that it is a usable index with an expression? I think
> indexes
> with an expression has been filtered in
> FilterOutNotSuitablePathsForReplIdentFull(). If it can't be a usable index
> with
> an expression, maybe we shouldn't use "continue" here.
>

Ok, I think there are some confusing comments in the code, which I updated.
Also, added one more explicit Assert to make the code a little more
readable.

We can support indexes involving expressions, but not indexes that consist
only of expressions. FilterOutNotSuitablePathsForReplIdentFull() filters
out the latter, see IndexOnlyOnExpression().

So, for example, if we have an index as below, we are skipping the
expression while building the index scan keys:

CREATE INDEX people_names ON people (firstname, lastname, (id || '_' ||
sub_id));
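
To illustrate the skipping, here is a minimal standalone sketch in plain C.
It does not use any PostgreSQL headers: `ToyScanKey` and
`toy_build_scan_keys()` are hypothetical names, and the table column numbers
are made up. The only thing it borrows from the real code is the convention
that a zero in the index key array marks an expression column.

```c
#include <assert.h>

/* Toy stand-in for a scan key; this is NOT PostgreSQL's ScanKeyData. */
typedef struct ToyScanKey
{
	int		index_attno;	/* 1-based position of the column in the index */
	int		table_attno;	/* 1-based column number in the table */
} ToyScanKey;

/*
 * indkey[i] is the table column number behind index column i, or 0 when
 * index column i is an expression. Writes one scan key per simple column
 * reference and returns how many keys were written.
 */
static int
toy_build_scan_keys(const int *indkey, int nkeys, ToyScanKey *skey)
{
	int		index_attoff;
	int		scankey_attoff = 0;

	for (index_attoff = 0; index_attoff < nkeys; index_attoff++)
	{
		int		table_attno = indkey[index_attoff];

		/* Expression column: there is no plain column value, skip it. */
		if (table_attno == 0)
			continue;

		skey[scankey_attoff].index_attno = index_attoff + 1;
		skey[scankey_attoff].table_attno = table_attno;
		scankey_attoff++;
	}

	/* Indexes made only of expressions are assumed to be filtered earlier. */
	assert(scankey_attoff > 0);

	return scankey_attoff;
}
```

For an index shaped like people_names, with two simple columns followed by
one expression, this sketch produces two scan keys and leaves the expression
out of the search.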

We can consider removing `continue`, but that'd mean we should also adjust
the following code block to handle indexprs. To me, that seems like an edge
case to implement at this point, given that such an index is probably not
common. Do you think I should try to use the indexprs as well while
building the scan key?

I'm mostly trying to keep the complexity small. If you suggest this
limitation should be lifted, I can give it a shot. I think the limitation
that remains can be summarized in a single sentence: *The index on the
subscriber can only use simple column references.*


> 6.
> In the following case, I got a result which is different from HEAD, could
> you
> please look into it?
>
> -- publisher
> CREATE TABLE test_replica_id_full (x int);
> ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;
> CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full;
>
> -- subscriber
> CREATE TABLE test_replica_id_full (x int, y int);
> CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y);
> CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION 'dbname=postgres
> port=5432' PUBLICATION tap_pub_rep_full;
>
> -- publisher
> INSERT INTO test_replica_id_full VALUES (1);
> UPDATE test_replica_id_full SET x = x + 1 WHERE x = 1;
>
> The data in subscriber:
> on HEAD:
> postgres=# select * from test_replica_id_full ;
>  x | y
> ---+---
>  2 |
> (1 row)
>
> After applying the patch:
> postgres=# select * from test_replica_id_full ;
>  x | y
> ---+---
>  1 |
> (1 row)
>
>
Oops, good catch. It seems we forgot to set:

skey[scankey_attoff].sk_flags |= SK_SEARCHNULL;


On HEAD, the index used for this purpose could only be the primary key or a
unique key on NOT NULL columns. Now we do allow NULL values, and need to
search for them. Added that (and your test) to the updated patch.

As a semi-related note, tuples_equal() treats (NULL = NULL) as `true`. I
have not changed that, and it seems right in this context. Do you see any
issues with that?
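
For reference, the comparison semantics I mean can be sketched in standalone
C as below. This is a toy model with integer-only columns, not the actual
tuples_equal() code; `ToyColumn` and `toy_tuples_equal()` are hypothetical
names used only for illustration.

```c
#include <stdbool.h>

/* Toy column value: an int plus a NULL flag. */
typedef struct ToyColumn
{
	bool	isnull;
	int		value;		/* ignored when isnull is true */
} ToyColumn;

/*
 * Returns true when both tuples agree column by column. Two NULLs in the
 * same position count as equal, unlike SQL's three-valued "=" operator.
 */
static bool
toy_tuples_equal(const ToyColumn *a, const ToyColumn *b, int natts)
{
	for (int i = 0; i < natts; i++)
	{
		/* One side NULL and the other not: the tuples differ. */
		if (a[i].isnull != b[i].isnull)
			return false;

		/* Both NULL: treated as equal, move on to the next column. */
		if (a[i].isnull)
			continue;

		if (a[i].value != b[i].value)
			return false;
	}

	return true;
}
```

With these semantics, a replicated tuple (1, NULL) matches a local tuple
(1, NULL), which is what the apply side needs in order to locate the row.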

Also, I realized that the functions in execReplication.c expect only
btree indexes. So, I skipped the other index types as well. If that makes
sense, I can work on a follow-up patch after this one is merged, to remove
some of the limitations mentioned here.

Thanks,
Onder
From ea2960a584ea3f8f9d7777d4d54734c7d612a1aa Mon Sep 17 00:00:00 2001
From: Onder Kalaci <onderkalaci@gmail.com>
Date: Tue, 17 May 2022 10:47:39 +0200
Subject: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full
 on the publisher

It is often not feasible to use `REPLICA IDENTITY FULL` on the publisher, because it leads to a full table scan per tuple change on the subscriber. This makes `REPLICA IDENTITY FULL` impractical for all but a small number of use cases.

With this patch, I'm proposing the following change: If there is an index on the subscriber, use the index as long as the planner sub-modules pick an index scan over a sequential scan.

The majority of the logic on the subscriber side already exists in the code. The subscriber is already capable of doing (unique) index scans. With this patch, we allow the index scan to iterate over the tuples fetched and only act when the tuples are equal. Those familiar with this part of the code may notice that the sequential scan code on the subscriber already implements the `tuples_equal()` function. In short, the changes on the subscriber mostly combine parts of the (unique) index scan and sequential scan code.

The decision on whether to use an index (and which one) is mostly derived from the planner infrastructure. The idea is that on the subscriber we have all the columns. So, construct all the `Path`s with restrictions on all columns, such as `col_1 = $1 AND col_2 = $2 ... AND col_n = $N`. Then, let the planner sub-module -- `create_index_paths()` -- give us the relevant index `Path`s. On top of that, add the sequential scan `Path` as well. Finally, pick the cheapest `Path` among them.
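
As a rough illustration of that combination, the standalone C sketch below walks over the candidates a non-unique index would return and only accepts one when the whole tuple matches. It is a toy model under stated assumptions: the linear loop stands in for the index scan on `bid`, and `ToyRow`, `toy_rows_equal()` and `toy_find_tuple()` are hypothetical names, not functions from the patch.

```c
#include <stdbool.h>

/* Toy row with three integer columns. */
typedef struct ToyRow
{
	int		aid;
	int		bid;
	int		abalance;
} ToyRow;

/* Full-tuple comparison, playing the role of tuples_equal(). */
static bool
toy_rows_equal(const ToyRow *a, const ToyRow *b)
{
	return a->aid == b->aid &&
		a->bid == b->bid &&
		a->abalance == b->abalance;
}

/*
 * Returns the position of the first row that matches the search row on
 * every column, or -1 if there is none. The check on bid stands in for
 * the index scan: it yields candidates, which are then rechecked.
 */
static int
toy_find_tuple(const ToyRow *rows, int nrows, const ToyRow *search)
{
	for (int i = 0; i < nrows; i++)
	{
		/* Not returned by the (simulated) index scan on bid. */
		if (rows[i].bid != search->bid)
			continue;

		/* Index match, but the other columns differ: keep scanning. */
		if (!toy_rows_equal(&rows[i], search))
			continue;

		return i;
	}

	return -1;
}
```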
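
The selection step can be pictured with the standalone C sketch below. It is only a toy model of the idea: real `Path` nodes and costs come from the planner, while `ToyPath`, `toy_path_is_suitable()` and `toy_cheapest_suitable_path()` are hypothetical names, and the boolean flags are assumed to be known up front.

```c
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a planner path; this is NOT PostgreSQL's Path. */
typedef struct ToyPath
{
	bool	is_index_path;		/* false for a sequential scan */
	bool	is_btree;
	bool	is_partial;
	bool	only_expressions;
	double	total_cost;
} ToyPath;

/* Non-index paths are always kept; index paths must pass all three checks. */
static bool
toy_path_is_suitable(const ToyPath *path)
{
	if (!path->is_index_path)
		return true;

	return path->is_btree && !path->is_partial && !path->only_expressions;
}

/* Returns the cheapest suitable path, or NULL when nothing is suitable. */
static const ToyPath *
toy_cheapest_suitable_path(const ToyPath *paths, int npaths)
{
	const ToyPath *best = NULL;

	for (int i = 0; i < npaths; i++)
	{
		if (!toy_path_is_suitable(&paths[i]))
			continue;

		if (best == NULL || paths[i].total_cost < best->total_cost)
			best = &paths[i];
	}

	return best;
}
```

In this sketch a cheap partial index loses to a more expensive plain btree index, and a sequential scan wins whenever no suitable index path is cheaper.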

From the performance point of view, there are a few things to note. First, the patch aims not to
change the behavior when a PRIMARY KEY or UNIQUE INDEX is used. Second, when REPLICA IDENTITY
is FULL on the publisher and an index is used on the subscriber, the difference mostly comes down
to `index scan` vs `sequential scan`. That's why it is hard to claim a specific amount of improvement.
It mostly depends on the data size, the index and the data distribution.

Still, below I try to showcase the potential improvement using an index on the subscriber,
`pgbench_accounts(bid)`. With the index, the replication catches up in around 5 seconds.
When the index is dropped, the replication takes around 300 seconds.

// init source db
pgbench -i -s 100 -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 5432 postgres
psql -c "CREATE INDEX i1 ON pgbench_accounts(aid);" -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;" -p 5432 postgres
psql -c "CREATE PUBLICATION pub_test_1 FOR TABLE pgbench_accounts;" -p 5432 postgres

// init target db, drop existing primary key
pgbench -i -p 9700 postgres
psql -c "truncate pgbench_accounts;" -p 9700 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 9700 postgres
psql -c "CREATE SUBSCRIPTION sub_test_1 CONNECTION 'host=localhost port=5432 user=onderkalaci dbname=postgres' PUBLICATION pub_test_1;" -p 9700 postgres

// create one index, even on a low cardinality column
psql -c "CREATE INDEX i2 ON pgbench_accounts(bid);" -p 9700 postgres

// now, run some pgbench tests and observe replication
pgbench -t 500 -b tpcb-like -p 5432 postgres
---
 src/backend/executor/execReplication.c        | 121 ++-
 src/backend/replication/logical/relation.c    | 398 +++++++-
 src/backend/replication/logical/worker.c      |  97 +-
 src/include/replication/logicalrelation.h     |  14 +-
 .../subscription/t/032_subscribe_use_index.pl | 939 ++++++++++++++++++
 5 files changed, 1499 insertions(+), 70 deletions(-)
 create mode 100644 src/test/subscription/t/032_subscribe_use_index.pl

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 6014f2e248..18677c1b94 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -19,12 +19,18 @@
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#ifdef USE_ASSERT_CHECKING
+#include "catalog/index.h"
+#endif
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_relation.h"
 #include "parser/parsetree.h"
+#ifdef USE_ASSERT_CHECKING
+#include "replication/logicalrelation.h"
+#endif
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -37,49 +43,80 @@
 #include "utils/typcache.h"
 
 
+static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+						 TypeCacheEntry **eq);
+
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
  * is setup to match 'rel' (*NOT* idxrel!).
  *
- * Returns whether any column contains NULLs.
+ * Returns how many columns should be used for the index scan.
  *
- * This is not generic routine, it expects the idxrel to be replication
- * identity of a rel and meet all limitations associated with that.
+ * This is not generic routine, it expects the idxrel to be an index
+ * that planner would choose if the searchslot includes all the columns
+ * (e.g., REPLICA IDENTITY FULL on the source).
  */
-static bool
+static int
 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 						 TupleTableSlot *searchslot)
 {
-	int			attoff;
+	int			index_attoff;
+	int			scankey_attoff;
 	bool		isnull;
 	Datum		indclassDatum;
 	oidvector  *opclass;
 	int2vector *indkey = &idxrel->rd_index->indkey;
-	bool		hasnulls = false;
-
-	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
-		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
 
 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
 									Anum_pg_index_indclass, &isnull);
 	Assert(!isnull);
 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
+	scankey_attoff = 0;
 
 	/* Build scankey for every attribute in the index. */
-	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+	for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
+		 index_attoff++)
 	{
 		Oid			operator;
 		Oid			opfamily;
 		RegProcedure regop;
-		int			pkattno = attoff + 1;
-		int			mainattno = indkey->values[attoff];
-		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
+		int			table_attno = indkey->values[index_attoff];
+		Oid			optype = get_opclass_input_type(opclass->values[index_attoff]);
+
+		if (!AttributeNumberIsValid(table_attno))
+		{
+			IndexInfo *indexInfo PG_USED_FOR_ASSERTS_ONLY;
+
+			/*
+			 * There are two cases to consider. First, if the index is a primary or
+			 * unique key, we cannot have any indexes with expressions. So, at this
+			 * point we are sure that the index we are dealing with is neither.
+			 */
+			Assert(RelationGetReplicaIndex(rel) != RelationGetRelid(idxrel) &&
+				   RelationGetPrimaryKeyIndex(rel) != RelationGetRelid(idxrel));
+
+			/*
+			 * At this point, we are also sure that the index is not consisting
+			 * of only expressions.
+			 */
+#ifdef USE_ASSERT_CHECKING
+			indexInfo = BuildIndexInfo(idxrel);
+			Assert(!IndexOnlyOnExpression(indexInfo));
+#endif
+
+			/*
+			 * For a non-primary/unique index with an additional expression, we
+			 * skip the expression here, because the code below assumes the
+			 * index scan is only done on simple column references.
+			 */
+			continue;
+		}
 
 		/*
 		 * Load the operator info.  We need this to get the equality operator
 		 * function for the scan key.
 		 */
-		opfamily = get_opclass_family(opclass->values[attoff]);
+		opfamily = get_opclass_family(opclass->values[index_attoff]);
 
 		operator = get_opfamily_member(opfamily, optype,
 									   optype,
@@ -91,23 +128,29 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		regop = get_opcode(operator);
 
 		/* Initialize the scankey. */
-		ScanKeyInit(&skey[attoff],
-					pkattno,
+		ScanKeyInit(&skey[scankey_attoff],
+					index_attoff + 1,
 					BTEqualStrategyNumber,
 					regop,
-					searchslot->tts_values[mainattno - 1]);
+					searchslot->tts_values[table_attno - 1]);
 
-		skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+		skey[scankey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
 
 		/* Check for null value. */
-		if (searchslot->tts_isnull[mainattno - 1])
+		if (searchslot->tts_isnull[table_attno - 1])
 		{
-			hasnulls = true;
-			skey[attoff].sk_flags |= SK_ISNULL;
+			skey[scankey_attoff].sk_flags |= SK_ISNULL;
+			skey[scankey_attoff].sk_flags |= SK_SEARCHNULL;
 		}
+
+		scankey_attoff++;
+
 	}
 
-	return hasnulls;
+	/* we should always use at least one attribute for the index scan */
+	Assert(scankey_attoff > 0);
+
+	return scankey_attoff;
 }
 
 /*
@@ -128,28 +171,44 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
+	TypeCacheEntry **eq;
+	bool		indisunique;
+	int			scankey_attoff;
 
 	/* Open the index. */
 	idxrel = index_open(idxoid, RowExclusiveLock);
+	indisunique = idxrel->rd_index->indisunique;
+
+	/* we might not need this if the index is unique */
+	eq = NULL;
 
 	/* Start an index scan. */
 	InitDirtySnapshot(snap);
-	scan = index_beginscan(rel, idxrel, &snap,
-						   IndexRelationGetNumberOfKeyAttributes(idxrel),
-						   0);
 
 	/* Build scan key. */
-	build_replindex_scan_key(skey, rel, idxrel, searchslot);
+	scankey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
 
+	scan = index_beginscan(rel, idxrel, &snap,
+						   scankey_attoff, 0);
 retry:
 	found = false;
 
-	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+	index_rescan(scan, skey, scankey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
-	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
 	{
-		found = true;
+		/* avoid expensive equality check if index is unique */
+		if (!indisunique)
+		{
+			/* we only need to allocate once */
+			if (eq == NULL)
+				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+
+			if (!tuples_equal(outslot, searchslot, eq))
+				continue;
+		}
+
 		ExecMaterializeSlot(outslot);
 
 		xwait = TransactionIdIsValid(snap.xmin) ?
@@ -164,6 +223,10 @@ retry:
 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 			goto retry;
 		}
+
+		/* Found our tuple and it's not locked */
+		found = true;
+		break;
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e989047681..58300043ec 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,14 +17,25 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/table.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_am_d.h"
 #include "catalog/pg_subscription_rel.h"
+#include "catalog/pg_operator.h"
+#include "commands/defrem.h"
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
 #include "replication/logicalrelation.h"
 #include "replication/worker_internal.h"
+#include "optimizer/cost.h"
+#include "optimizer/paramassign.h"
+#include "optimizer/pathnode.h"
+#include "optimizer/paths.h"
+#include "optimizer/plancat.h"
+#include "optimizer/restrictinfo.h"
 #include "utils/inval.h"
+#include "utils/typcache.h"
 
 
 static MemoryContext LogicalRepRelMapContext = NULL;
@@ -44,11 +55,9 @@ static HTAB *LogicalRepRelMap = NULL;
  */
 static MemoryContext LogicalRepPartMapContext = NULL;
 static HTAB *LogicalRepPartMap = NULL;
-typedef struct LogicalRepPartMapEntry
-{
-	Oid			partoid;		/* LogicalRepPartMap's key */
-	LogicalRepRelMapEntry relmapentry;
-} LogicalRepPartMapEntry;
+
+static Oid	LogicalRepUsableIndex(Relation localrel,
+								  LogicalRepRelation *remoterel);
 
 /*
  * Relcache invalidation callback for our relation map cache.
@@ -438,6 +447,12 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		 */
 		logicalrep_rel_mark_updatable(entry);
 
+		/*
+		 * Finding a usable index is an infrequent operation. It is performed
+		 * only the first time an operation is performed on the relation, or
+		 * after invalidation of the relation cache entry (e.g., by ANALYZE).
+		 */
+		entry->usableIndexOid = LogicalRepUsableIndex(entry->localrel, remoterel);
 		entry->localrelvalid = true;
 	}
 
@@ -581,7 +596,7 @@ logicalrep_partmap_init(void)
  * Note there's no logicalrep_partition_close, because the caller closes the
  * component relation.
  */
-LogicalRepRelMapEntry *
+LogicalRepPartMapEntry *
 logicalrep_partition_open(LogicalRepRelMapEntry *root,
 						  Relation partrel, AttrMap *map)
 {
@@ -615,7 +630,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	if (found && entry->localrelvalid)
 	{
 		entry->localrel = partrel;
-		return entry;
+		return part_entry;
 	}
 
 	/* Switch to longer-lived context. */
@@ -696,10 +711,377 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	/* Set if the table's replica identity is enough to apply update/delete. */
 	logicalrep_rel_mark_updatable(entry);
 
+	/*
+	 * Finding a usable index is an infrequent operation. It is performed
+	 * only the first time an operation is performed on the relation, or
+	 * after invalidation of the relation cache entry (e.g., by ANALYZE).
+	 */
+	part_entry->relmapentry.usableIndexOid =
+		LogicalRepUsableIndex(partrel, remoterel);
+
 	entry->localrelvalid = true;
 
 	/* state and statelsn are left set to 0. */
 	MemoryContextSwitchTo(oldctx);
 
-	return entry;
+	return part_entry;
+}
+
+/*
+ * GetIndexOidFromPath returns a valid index oid if
+ * the input path is an index path.
+ * Otherwise, return invalid oid.
+ */
+static
+Oid
+GetIndexOidFromPath(Path *path)
+{
+	Oid indexOid;
+
+	switch (path->pathtype)
+	{
+		case T_IndexScan:
+		case T_IndexOnlyScan:
+			{
+				IndexPath  *index_sc = (IndexPath *) path;
+				indexOid = index_sc->indexinfo->indexoid;
+
+				break;
+			}
+
+		default:
+			indexOid = InvalidOid;
+	}
+
+	return indexOid;
+}
+
+
+/*
+ * Returns true if the given index consists only of expressions such as:
+ * 	CREATE INDEX idx ON table(foo(col));
+ *
+ * Returns false if there is at least one simple column reference:
+ * 	 CREATE INDEX idx ON table(foo(col), col_2);
+ */
+bool
+IndexOnlyOnExpression(IndexInfo *indexInfo)
+{
+	int i=0;
+	for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		AttrNumber	attnum = indexInfo->ii_IndexAttrNumbers[i];
+		if (AttributeNumberIsValid(attnum))
+			return false;
+
+	}
+
+	return true;
+}
+
+
+/*
+ * Iterates over the input path list, and returns another path list
+ * where paths with non-btree indexes, partial indexes or
+ * indexes on only expressions are eliminated from the list.
+ */
+static List *
+FilterOutNotSuitablePathsForReplIdentFull(List *pathlist)
+{
+	ListCell   *lc;
+	List *suitableIndexList = NIL;
+
+	foreach(lc, pathlist)
+	{
+		Path	   *path = (Path *) lfirst(lc);
+		Oid indexOid = GetIndexOidFromPath(path);
+		Relation indexRelation;
+		IndexInfo *indexInfo;
+		bool is_btree_index;
+		bool is_partial_index;
+		bool is_index_only_on_expression;
+
+		if (!OidIsValid(indexOid))
+		{
+			/* not an index path (e.g., sequential scan), keep it */
+			suitableIndexList = lappend(suitableIndexList, path);
+			continue;
+		}
+
+		indexRelation = index_open(indexOid, AccessShareLock);
+		indexInfo = BuildIndexInfo(indexRelation);
+		is_btree_index = (indexInfo->ii_Am == BTREE_AM_OID);
+		is_partial_index = (indexInfo->ii_Predicate != NIL);
+		is_index_only_on_expression = IndexOnlyOnExpression(indexInfo);
+		index_close(indexRelation, NoLock);
+
+		if (!is_btree_index || is_partial_index || is_index_only_on_expression)
+			continue;
+
+		suitableIndexList = lappend(suitableIndexList, path);
+	}
+
+	return suitableIndexList;
+}
+
+
+/*
+ * GetCheapestReplicaIdentityFullPath generates all the possible paths
+ * for the given subscriber relation, assuming that the source relation
+ * is replicated via REPLICA IDENTITY FULL.
+ *
+ * The function assumes that all the columns will be provided during
+ * the execution phase, given that REPLICA IDENTITY FULL guarantees
+ * that.
+ */
+static Path *
+GetCheapestReplicaIdentityFullPath(Relation localrel)
+{
+	PlannerInfo *root;
+	Query	   *query;
+	PlannerGlobal *glob;
+	RangeTblEntry *rte;
+	RelOptInfo *rel;
+	int			attno;
+	RangeTblRef *rt;
+	List *joinList;
+	Path *seqScanPath;
+
+	/* Set up mostly-dummy planner state */
+	query = makeNode(Query);
+	query->commandType = CMD_SELECT;
+
+	glob = makeNode(PlannerGlobal);
+
+	root = makeNode(PlannerInfo);
+	root->parse = query;
+	root->glob = glob;
+	root->query_level = 1;
+	root->planner_cxt = CurrentMemoryContext;
+	root->wt_param_id = -1;
+
+	/* Build a minimal RTE for the rel */
+	rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = localrel->rd_id;
+	rte->relkind = RELKIND_RELATION;
+	rte->rellockmode = AccessShareLock;
+	rte->lateral = false;
+	rte->inh = false;
+	rte->inFromCl = true;
+	query->rtable = list_make1(rte);
+
+	rt = makeNode(RangeTblRef);
+	rt->rtindex = 1;
+	joinList = list_make1(rt);
+
+	/* Set up RTE/RelOptInfo arrays */
+	setup_simple_rel_arrays(root);
+
+	/* Build RelOptInfo */
+	rel = build_simple_rel(root, 1, NULL);
+
+	/*
+	 * Generate restrictions for all columns in the form of col_1 = $1
+	 * AND col_2 = $2 ...
+	 */
+	for (attno = 0; attno < RelationGetNumberOfAttributes(localrel); attno++)
+	{
+		Form_pg_attribute attr = TupleDescAttr(localrel->rd_att, attno);
+
+		if (attr->attisdropped)
+		{
+			continue;
+		}
+		else
+		{
+			Expr	   *eq_op;
+			TypeCacheEntry *typentry;
+			RestrictInfo *restrict_info;
+			Var		   *leftarg;
+			Param	   *rightarg;
+			int			varno = 1;
+
+			typentry = lookup_type_cache(attr->atttypid,
+										 TYPECACHE_EQ_OPR_FINFO);
+
+			if (!OidIsValid(typentry->eq_opr))
+				continue;		/* no equality operator, skip this column */
+
+			leftarg =
+				makeVar(varno, attr->attnum, attr->atttypid, attr->atttypmod,
+						attr->attcollation, 0);
+
+			rightarg = makeNode(Param);
+			rightarg->paramkind = PARAM_EXTERN;
+			rightarg->paramid = list_length(rel->baserestrictinfo) + 1;
+			rightarg->paramtype = attr->atttypid;
+			rightarg->paramtypmod = attr->atttypmod;
+			rightarg->paramcollid = attr->attcollation;
+			rightarg->location = -1;
+
+			eq_op = make_opclause(typentry->eq_opr, BOOLOID, false,
+								  (Expr *) leftarg, (Expr *) rightarg,
+								  InvalidOid, attr->attcollation);
+
+			restrict_info = make_simple_restrictinfo(root, eq_op);
+
+			rel->baserestrictinfo = lappend(rel->baserestrictinfo, restrict_info);
+		}
+	}
+
+	/*
+	 * Make sure the planner generates the relevant paths, including
+	 * all the possible index scans as well as sequential scan.
+	 */
+	rel = make_one_rel(root, joinList);
+
+	/*
+	 * Currently it is not possible for planner to pick a
+	 * partial index or indexes only on expressions. We
+	 * still want to be explicit and eliminate such
+	 * paths proactively.
+	 *
+	 * The planner would not pick partial indexes or
+	 * indexes with only expressions because of the way
+	 * the baserestrictinfos are currently formed
+	 * (e.g., col_1 = $1 ... AND col_N = $N).
+	 *
+	 * For the partial indexes, check_index_predicates()
+	 * (via operator_predicate_proof()) checks whether the
+	 * predicate of the index is implied by the
+	 * baserestrictinfos. The check always returns false
+	 * because index predicates formed with CONSTs and
+	 * baserestrictinfos are formed with PARAMs. Hence,
+	 * partial indexes are never picked.
+	 *
+	 * Indexes that consist of only expressions (e.g.,
+	 * no simple column references on the index) are also
+	 * eliminated with a similar reasoning.
+	 * match_restriction_clauses_to_index() (via
+	 * match_index_to_operand()) eliminates the use of the
+	 * index if the restriction does not have the equal
+	 * expression with the index.
+	 *
+	 * We also eliminate non-btree indexes, which could be relaxed
+	 * if needed. If we allow non-btree indexes, we should adjust
+	 * RelationFindReplTupleByIndex() to support such indexes.
+	 */
+	rel->pathlist =
+		FilterOutNotSuitablePathsForReplIdentFull(rel->pathlist);
+
+	if (rel->pathlist == NIL)
+	{
+		/*
+		 * The sequential scan path could have been dominated by
+		 * an index path during make_one_rel(). We should always
+		 * have a sequential scan before set_cheapest().
+		 */
+		seqScanPath = create_seqscan_path(root, rel, NULL, 0);
+		add_path(rel, seqScanPath);
+	}
+
+	set_cheapest(rel);
+
+	Assert(rel->cheapest_total_path != NULL);
+
+	return rel->cheapest_total_path;
+}
+
+
+/*
+ * FindUsableIndexForReplicaIdentityFull returns an index oid if
+ * the planner submodules pick an index scan over a sequential scan.
+ *
+ * Note that this is not a generic function, it expects REPLICA IDENTITY
+ * FULL for the remote relation.
+ */
+static Oid
+FindUsableIndexForReplicaIdentityFull(Relation localrel)
+{
+	MemoryContext usableIndexContext;
+	MemoryContext oldctx;
+	Path 		*cheapest_total_path;
+	Oid			indexOid;
+
+	usableIndexContext = AllocSetContextCreate(CurrentMemoryContext,
+											   "usableIndexContext",
+											   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(usableIndexContext);
+
+	cheapest_total_path = GetCheapestReplicaIdentityFullPath(localrel);
+
+	indexOid = GetIndexOidFromPath(cheapest_total_path);
+
+	MemoryContextSwitchTo(oldctx);
+
+	MemoryContextDelete(usableIndexContext);
+
+	return indexOid;
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+static Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+	Oid			idxoid;
+
+	idxoid = RelationGetReplicaIndex(rel);
+
+	if (!OidIsValid(idxoid))
+		idxoid = RelationGetPrimaryKeyIndex(rel);
+
+	return idxoid;
+}
+
+/*
+ * LogicalRepUsableIndex returns an index oid if we can use an index
+ * for the apply side.
+ */
+static Oid
+LogicalRepUsableIndex(Relation localrel, LogicalRepRelation *remoterel)
+{
+	Oid			idxoid;
+
+	/*
+	 * We never need index oid for partitioned tables, always rely on leaf
+	 * partition's index.
+	 */
+	if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return InvalidOid;
+
+	/* simple case, we already have an identity or pkey */
+	idxoid = GetRelationIdentityOrPK(localrel);
+	if (OidIsValid(idxoid))
+		return idxoid;
+
+	/* indexscans are disabled, use seq. scan */
+	if (!enable_indexscan)
+		return InvalidOid;
+
+	if (remoterel->replident == REPLICA_IDENTITY_FULL &&
+		RelationGetIndexList(localrel) != NIL)
+	{
+		/*
+		 * If we had a primary key or relation identity with a unique index,
+		 * we would have already found a valid oid. At this point, the remote
+		 * relation has replica identity full and we have at least one local
+		 * index defined.
+		 *
+		 * We are looking for one more opportunity for using an index. If
+		 * there are any indexes defined on the local relation, try to pick
+		 * the cheapest index.
+		 *
+		 * The index selection safely assumes that all the columns are going
+		 * to be available for the index scan given that remote relation has
+		 * replica identity full.
+		 */
+		return FindUsableIndexForReplicaIdentityFull(localrel);
+	}
+
+	return InvalidOid;
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5f8c541763..6470c38401 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -331,6 +331,8 @@ static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
+static Oid	usable_indexoid_internal(ApplyExecutionData *edata,
+									 ResultRelInfo *relinfo);
 static void apply_handle_update_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot,
@@ -339,6 +341,7 @@ static void apply_handle_delete_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
+									Oid localidxoid,
 									LogicalRepRelation *remoterel,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
@@ -1586,24 +1589,6 @@ apply_handle_type(StringInfo s)
 	logicalrep_read_typ(s, &typ);
 }
 
-/*
- * Get replica identity index or if it is not defined a primary key.
- *
- * If neither is defined, returns InvalidOid
- */
-static Oid
-GetRelationIdentityOrPK(Relation rel)
-{
-	Oid			idxoid;
-
-	idxoid = RelationGetReplicaIndex(rel);
-
-	if (!OidIsValid(idxoid))
-		idxoid = RelationGetPrimaryKeyIndex(rel);
-
-	return idxoid;
-}
-
 /*
  * Check that we (the subscription owner) have sufficient privileges on the
  * target relation to perform the given operation.
@@ -1753,7 +1738,7 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
 	 * We are in error mode so it's fine this is somewhat slow. It's better to
 	 * give user correct error.
 	 */
-	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
+	if (OidIsValid(rel->usableIndexOid))
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1896,11 +1881,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	TupleTableSlot *localslot;
 	bool		found;
 	MemoryContext oldctx;
+	Oid			usableIndexOid = usable_indexoid_internal(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
 	found = FindReplTupleInLocalRel(estate, localrel,
+									usableIndexOid,
 									&relmapentry->remoterel,
 									remoteslot, &localslot);
 	ExecClearTuple(remoteslot);
@@ -2034,12 +2021,14 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EPQState	epqstate;
 	TupleTableSlot *localslot;
 	bool		found;
+	Oid			usableIndexOid = usable_indexoid_internal(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
-	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
-									remoteslot, &localslot);
+
+	found = FindReplTupleInLocalRel(estate, localrel, usableIndexOid,
+									remoterel, remoteslot, &localslot);
 
 	/* If found delete it. */
 	if (found)
@@ -2069,20 +2058,63 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EvalPlanQualEnd(&epqstate);
 }
 
+
+/*
+ * Decide whether we can pick an index for the relinfo (i.e., the relation)
+ * we're actually deleting/updating from. If it is a child partition of
+ * edata->targetRelInfo, find the index on the partition.
+ */
+static Oid
+usable_indexoid_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo)
+{
+	Oid			usableIndexOid;
+	ResultRelInfo *targetResultRelInfo = edata->targetRelInfo;
+	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+	Oid			targetrelid = targetResultRelInfo->ri_RelationDesc->rd_rel->oid;
+	Oid			localrelid = relinfo->ri_RelationDesc->rd_id;
+
+	if (targetrelid == localrelid)
+	{
+		/* target is a regular table */
+		usableIndexOid = relmapentry->usableIndexOid;
+	}
+	else
+	{
+		/*
+		 * Target is a partitioned table, get the index oid of the partition.
+		 */
+		TupleConversionMap *map = relinfo->ri_RootToPartitionMap;
+		AttrMap    *attrmap = map ? map->attrMap : NULL;
+
+		LogicalRepPartMapEntry *part_entry =
+		logicalrep_partition_open(relmapentry, relinfo->ri_RelationDesc,
+								  attrmap);
+
+		Assert(targetResultRelInfo->ri_RelationDesc->rd_rel->relkind ==
+			   RELKIND_PARTITIONED_TABLE);
+
+		usableIndexOid = part_entry->relmapentry.usableIndexOid;
+	}
+
+	return usableIndexOid;
+}
+
+
 /*
  * Try to find a tuple received from the publication side (in 'remoteslot') in
  * the corresponding local relation using either replica identity index,
- * primary key or if needed, sequential scan.
+ * primary key, index or if needed, sequential scan.
  *
  * Local tuple, if found, is returned in '*localslot'.
  */
 static bool
 FindReplTupleInLocalRel(EState *estate, Relation localrel,
+						Oid localidxoid,
 						LogicalRepRelation *remoterel,
 						TupleTableSlot *remoteslot,
 						TupleTableSlot **localslot)
 {
-	Oid			idxoid;
 	bool		found;
 
 	/*
@@ -2093,12 +2125,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
 
 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
 
-	idxoid = GetRelationIdentityOrPK(localrel);
-	Assert(OidIsValid(idxoid) ||
+	Assert(OidIsValid(localidxoid) ||
 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
 
-	if (OidIsValid(idxoid))
-		found = RelationFindReplTupleByIndex(localrel, idxoid,
+	if (OidIsValid(localidxoid))
+		found = RelationFindReplTupleByIndex(localrel, localidxoid,
 											 LockTupleExclusive,
 											 remoteslot, *localslot);
 	else
@@ -2128,7 +2159,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	TupleTableSlot *remoteslot_part;
 	TupleConversionMap *map;
 	MemoryContext oldctx;
-	LogicalRepRelMapEntry *part_entry = NULL;
+	LogicalRepPartMapEntry *part_entry = NULL;
 	AttrMap    *attrmap = NULL;
 
 	/* ModifyTableState is needed for ExecFindPartition(). */
@@ -2178,7 +2209,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	{
 		part_entry = logicalrep_partition_open(relmapentry, partrel,
 											   attrmap);
-		check_relation_updatable(part_entry);
+		check_relation_updatable(&part_entry->relmapentry);
 	}
 
 	switch (operation)
@@ -2202,13 +2233,17 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 			 * suitable partition.
 			 */
 			{
+				LogicalRepRelMapEntry *entry;
 				TupleTableSlot *localslot;
 				ResultRelInfo *partrelinfo_new;
 				bool		found;
 
+				entry = &part_entry->relmapentry;
+
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
-												&part_entry->remoterel,
+												part_entry->relmapentry.usableIndexOid,
+												&entry->remoterel,
 												remoteslot_part, &localslot);
 				if (!found)
 				{
@@ -2230,7 +2265,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				 * remoteslot_part.
 				 */
 				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-				slot_modify_data(remoteslot_part, localslot, part_entry,
+				slot_modify_data(remoteslot_part, localslot, entry,
 								 newtup);
 				MemoryContextSwitchTo(oldctx);
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 78cd7e77f5..e479977b85 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -13,6 +13,7 @@
 #define LOGICALRELATION_H
 
 #include "access/attmap.h"
+#include "catalog/index.h"
 #include "replication/logicalproto.h"
 
 typedef struct LogicalRepRelMapEntry
@@ -31,20 +32,29 @@ typedef struct LogicalRepRelMapEntry
 	Relation	localrel;		/* relcache entry (NULL when closed) */
 	AttrMap    *attrmap;		/* map of local attributes to remote ones */
 	bool		updatable;		/* Can apply updates/deletes? */
+	Oid			usableIndexOid; /* which index to use? (Invalid when no index
+								 * used) */
 
 	/* Sync state. */
 	char		state;
 	XLogRecPtr	statelsn;
 } LogicalRepRelMapEntry;
 
+typedef struct LogicalRepPartMapEntry
+{
+	Oid			partoid;		/* LogicalRepPartMap's key */
+	LogicalRepRelMapEntry relmapentry;
+} LogicalRepPartMapEntry;
+
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
 extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
 
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
 												  LOCKMODE lockmode);
-extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
-														Relation partrel, AttrMap *map);
+extern LogicalRepPartMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
+														 Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
+extern bool IndexOnlyOnExpression(IndexInfo *indexInfo);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/subscription/t/032_subscribe_use_index.pl b/src/test/subscription/t/032_subscribe_use_index.pl
new file mode 100644
index 0000000000..1757754510
--- /dev/null
+++ b/src/test/subscription/t/032_subscribe_use_index.pl
@@ -0,0 +1,939 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test logical replication behavior when the subscriber uses an available index
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"wal_retrieve_retry_interval = 1ms");
+
+# we don't want planner to pick bitmap scans instead of index scans
+$node_subscriber->append_conf('postgresql.conf',
+	"enable_bitmapscan = off");
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX
+#
+# Basic test where the subscriber uses index
+# and only touches 1 row
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates one row via index";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x = 20;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 2) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 deletes one row via index";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX MODIFIES MULTIPLE ROWS
+#
+# Basic test where the subscriber uses index
+# and touches multiple rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert 1001 rows of initial data with x in the range 0-19
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i%20 FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_2 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 50 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 50) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_2 updates 50 rows via index";
+
+my $count_from_table =
+  $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x = 15;");
+is($count_from_table, qq(0),
+	'check subscriber tap_sub_rep_full_2 for no rows remaining with x=15');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_2");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX MODIFIES MULTIPLE ROWS
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+#
+# Basic test where the subscriber uses a multi-column index
+# and touches multiple rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert 1001 rows of initial data with x in the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_3 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 200 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_3 updates 200 rows via index";
+
+my $sum_from_table =
+  $node_subscriber->safe_psql('postgres',
+	"select sum(x+y::int) from test_replica_id_full;");
+is($sum_from_table, qq(9200),
+	'check subscriber tap_sub_rep_full_3 for query result');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_3");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+#
+# Basic test where the subscriber uses index
+# on a table that has dropped columns
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert 1001 rows of initial data with x in the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_4 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 200 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_4 updates 200 rows via index";
+
+my $sum_from_table_2 =
+  $node_subscriber->safe_psql('postgres',
+	"select sum(x+y::int) from test_replica_id_full;");
+is($sum_from_table_2, qq(9200),
+	'check subscriber tap_sub_rep_full_4 for query result');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_4");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_part_idx ON users_table_part(user_id, value_1)"
+);
+
+# insert some initial data within the range 0-1000
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%100), (i%20), i FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates rows and moves between partitions
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE user_id = 4;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=10 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table";
+
+# deletes rows from both partitions
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 1 and value_1 = 1;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 12 and value_1 = 12;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=30 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 deletes from partitioned table";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_5");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full_part_index REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_part_idx ON test_replica_id_full_part_index(x) WHERE (x = 5);");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full_part_index SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full_part_index");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows, one of them is indexed
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 5;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates one row via seq. scan with partial index";
+
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates one row via seq. scan with partial index";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people ((firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0,200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_2' AND lastname = 'last_name_2';");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via seq. scan with index on expressions";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_3';");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_4' AND lastname = 'last_name_4';");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 deletes two rows via seq. scan with index on expressions";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people (firstname, lastname, (firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0, 200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_3' AND lastname = 'last_name_3';");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on expressions and columns";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'Nan';");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=4 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 deletes two rows via index scan with index on expressions and columns";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test (column_a int, column_b int);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test (column_a int, column_b int);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_a ON test (column_a);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_b ON test (column_b);");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test SELECT i,0 FROM generate_series(0, 2000)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres', "ANALYZE test;");
+
+# update 1 row and delete 1 row
+$node_publisher->safe_psql('postgres',
+	"UPDATE test SET column_b = column_b + 1 WHERE column_a = 15;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test WHERE column_a = 20;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-1";
+
+# do not use index_b
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'index_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-2";
+
+# insert data such that the cardinality of column_b becomes much higher
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test SELECT 0,i FROM generate_series(0, 20000)i;");
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres', "ANALYZE test;");
+
+# update one row and delete another
+$node_publisher->safe_psql('postgres',
+	"UPDATE test SET column_a = column_a + 1 WHERE column_b = 150;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test WHERE column_b = 200;");
+
+
+# do not use index_a anymore
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-3";
+
+# now, index_b is used instead
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-4";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - PARTITIONED TABLE
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (user_id);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (user_id);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_1 ON users_table_part(value_1)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_2 ON users_table_part(value_2)"
+);
+
+# insert some initial data where cardinality of value_1 is high, and cardinality of value_2 is very low
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i, i%2 FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+# update a row; user_id is unchanged, so the row stays in its partition
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_1 = 30;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=1 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%1%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE value_1 = 40");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=2 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%1%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table";
+
+
+# now, load some more data where cardinality of value_2 is high, and cardinality of value_1 is very low
+$node_publisher->safe_psql('postgres', "TRUNCATE users_table_part");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i%2, i FROM generate_series(0,10000)i;"
+);
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_2 = 3000;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=1 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%2%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE value_2 = 4000");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=2 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%2%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table";
+
+
+# finally, make sure that even if an index is defined only on a partition (i.e., not inherited from the parent),
+# we can still use it
+
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX users_table_ind_on_value_1;"
+);
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX users_table_ind_on_value_2;"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_2 ON users_table_part_0(value_2)"
+);
+
+# now, load some more data where cardinality of value_2 is high, and cardinality of value_1 is very low
+$node_publisher->safe_psql('postgres', "TRUNCATE users_table_part");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i%2, i FROM generate_series(0,10000)i;"
+);
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_2 > 3000 AND user_id = 0;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=350 from pg_stat_all_indexes where indexrelname ilike 'users_table_ind_on_value_2';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table with index on partition";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_5");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - PARTITIONED TABLE
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - INHERITED TABLE
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE parent (a int);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE child_1 (b int) inherits (parent);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE child_2 (b int) inherits (parent);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE parent REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE child_1 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE child_2 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE parent (a int);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE child_1 (b int) inherits (parent);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE child_2 (b int) inherits (parent);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_parent ON parent(a)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_child_1_a ON child_1(a)"
+);
+
+# create another index on the child on a column with higher cardinality
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_child_1_b ON child_1(b)"
+);
+
+# insert some initial data where cardinality of column a is high, and cardinality of column b is very low
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO parent SELECT i FROM generate_series(0,1000)i;");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_1 SELECT (i%500), 0 FROM generate_series(0,1000)i;");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_2 SELECT (i%500), 0 FROM generate_series(0,1000)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE parent");
+
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+$node_subscriber->wait_for_subscription_sync;
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres',	"ANALYZE parent");
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_1");
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_2");
+
+# update rows in the parent and, via inheritance, in both children
+$node_publisher->safe_psql('postgres',
+	"UPDATE parent SET a = 0 WHERE a = 10;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'index_on_parent';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates parent table";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM child_1 WHERE a = 250");
+
+# we check if the index is used or not: 2 rows from the first command, another 2 from the second command
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=4 from pg_stat_all_indexes where indexrelname = 'index_on_child_1_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates child_1 table";
+
+
+# insert some more data where cardinality of column b is high on child_1
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_1 SELECT 0, i FROM generate_series(0,10000)i;",
+);
+$node_publisher->wait_for_catchup($appname);
+
+# ANALYZE on parent alone would not change the index used on child_1, so ANALYZE child_1 as well
+$node_subscriber->safe_psql('postgres',	"ANALYZE parent");
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_1");
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM child_1 WHERE b = 41");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'index_on_child_1_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates child_1 table";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE parent, child_1, child_2");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_5");
+$node_subscriber->safe_psql('postgres', "DROP TABLE parent, child_1, child_2");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - INHERITED TABLE
+# ====================================================================
+
+# ====================================================================
+# Testcase start: Some NULL values
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y);"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full VALUES (1), (2), (3);");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 1;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 3;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates table with null values via index scan";
+
+# also, make sure results are expected
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(x)=8 from test_replica_id_full;}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates table with null values";
+
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(*)=3 from test_replica_id_full WHERE y IS NULL;}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates table with null values - 2";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_5");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Some NULL values
+# ====================================================================
+
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.34.1
