Hi Amit, all

Amit Kapila <amit.kapil...@gmail.com>, 6 Mar 2023 Pzt, 12:40 tarihinde şunu
yazdı:

> On Fri, Mar 3, 2023 at 6:40 PM Önder Kalacı <onderkal...@gmail.com> wrote:
> >
> > Hi Vignesh,
> >
> > Thanks for the review
> >
> >>
> >> 1) We are currently calling RelationGetIndexList twice, once in
> >> FindUsableIndexForReplicaIdentityFull function and in the caller too,
> >> we could avoid one of the calls by passing the indexlist to the
> >> function or removing the check here, index list check can be handled
> >> in FindUsableIndexForReplicaIdentityFull.
> >> +       if (remoterel->replident == REPLICA_IDENTITY_FULL &&
> >> +               RelationGetIndexList(localrel) != NIL)
> >> +       {
> >> +               /*
> >> +                * If we had a primary key or relation identity with a
> >> unique index,
> >> +                * we would have already found and returned that oid.
> >> At this point,
> >> +                * the remote relation has replica identity full and
> >> we have at least
> >> +                * one local index defined.
> >> +                *
> >> +                * We are looking for one more opportunity for using
> >> an index. If
> >> +                * there are any indexes defined on the local
> >> relation, try to pick
> >> +                * a suitable index.
> >> +                *
> >> +                * The index selection safely assumes that all the
> >> columns are going
> >> +                * to be available for the index scan given that
> >> remote relation has
> >> +                * replica identity full.
> >> +                */
> >> +               return FindUsableIndexForReplicaIdentityFull(localrel);
> >> +       }
> >> +
> >
> > makes sense, done
> >
>
> Today, I was looking at this comment and the fix for it. It seems to
> me that it would be better to not add the check (indexlist != NIL)
> here and rather get the indexlist in
> FindUsableIndexForReplicaIdentityFull(). It will anyway return
> InvalidOid, if there is no index and that way code will look a bit
> cleaner.
>
>
Yeah, seems easier to follow to me as well. Reflected it in the comment as
well.


Thanks,
Onder
From 42d1ccde2a3e904c31f47aa33bbb006bf33626dd Mon Sep 17 00:00:00 2001
From: Onder Kalaci <onderkalaci@gmail.com>
Date: Fri, 3 Mar 2023 18:20:57 +0300
Subject: [PATCH v33 2/2] Optionally disable index scan when replica identity
 is full

When replica identitiy is full, the logical replication apply workers
for subscription is capable of using index scans. However, index scans
might have some overhead compared to sequential scan. The main use case
for disabling the index scan could be that the table has many dead
tuples as well as many duplicates, where index scan might be slower
compared to sequential scan.

We add a new option to table storage parameter that the user can
control the behavior.
---
 doc/src/sgml/ref/create_table.sgml            | 16 +++++++++-
 src/backend/access/common/reloptions.c        | 11 +++++++
 src/include/utils/rel.h                       | 12 +++++++
 .../subscription/t/032_subscribe_use_index.pl | 31 +++++++------------
 4 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index a03dee4afe..89eb154730 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -1812,7 +1812,21 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     </listitem>
    </varlistentry>
 
-   </variablelist>
+  <varlistentry id="reloption-ri-index-scan" xreflabel="enable_replica_identity_full_index_scan">
+    <term><literal>enable_replica_identity_full_index_scan</literal> (<type>boolean</type>)
+    <indexterm>
+     <primary><varname>enable_replica_identity_full_index_scan</varname> storage parameter</primary>
+    </indexterm>
+    </term>
+    <listitem>
+     <para>
+      Controls using avaliable indexes on the apply worker when replica identity is full. See
+      <xref linkend="logical-replication-publication"/> for details. The default is true.
+     </para>
+    </listitem>
+   </varlistentry>
+
+  </variablelist>
 
   </refsect2>
  </refsect1>
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 14c23101ad..3e54e4a52b 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -168,6 +168,15 @@ static relopt_bool boolRelOpts[] =
 		},
 		true
 	},
+	{
+		{
+			"enable_replica_identity_full_index_scan",
+			"Enables index scan on logical replication apply worker",
+			RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+			ShareUpdateExclusiveLock
+		},
+		true
+	},
 	/* list terminator */
 	{{NULL}}
 };
@@ -1877,6 +1886,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
 		offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, analyze_scale_factor)},
 		{"user_catalog_table", RELOPT_TYPE_BOOL,
 		offsetof(StdRdOptions, user_catalog_table)},
+		{"enable_replica_identity_full_index_scan", RELOPT_TYPE_BOOL,
+		offsetof(StdRdOptions, enable_replica_identity_full_index_scan)},
 		{"parallel_workers", RELOPT_TYPE_INT,
 		offsetof(StdRdOptions, parallel_workers)},
 		{"vacuum_index_cleanup", RELOPT_TYPE_ENUM,
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 67f994cb3e..fdd02130da 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -336,6 +336,8 @@ typedef struct StdRdOptions
 	int			toast_tuple_target; /* target for tuple toasting */
 	AutoVacOpts autovacuum;		/* autovacuum-related options */
 	bool		user_catalog_table; /* use as an additional catalog relation */
+	bool		enable_replica_identity_full_index_scan; /* enables index scan
+															for RI full */
 	int			parallel_workers;	/* max number of parallel workers */
 	StdRdOptIndexCleanup vacuum_index_cleanup;	/* controls index vacuuming */
 	bool		vacuum_truncate;	/* enables vacuum to truncate a relation */
@@ -385,6 +387,16 @@ typedef struct StdRdOptions
 	  (relation)->rd_rel->relkind == RELKIND_MATVIEW) ? \
 	 ((StdRdOptions *) (relation)->rd_options)->user_catalog_table : false)
 
+/*
+ * RelationReplicaIdentityFullIndexScanEnabled
+ *		Returns whether index scan should be enabled when replica
+ *		identity is full.
+ */
+#define RelationReplicaIdentityFullIndexScanEnabled(relation)	\
+	((relation)->rd_options && \
+	 (relation)->rd_rel->relkind == RELKIND_RELATION ? \
+	 ((StdRdOptions *) (relation)->rd_options)->enable_replica_identity_full_index_scan : true)
+
 /*
  * RelationGetParallelWorkers
  *		Returns the relation's parallel_workers reloption setting.
diff --git a/src/test/subscription/t/032_subscribe_use_index.pl b/src/test/subscription/t/032_subscribe_use_index.pl
index e8c75ea9a1..a05690717f 100644
--- a/src/test/subscription/t/032_subscribe_use_index.pl
+++ b/src/test/subscription/t/032_subscribe_use_index.pl
@@ -761,11 +761,12 @@ $node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
 # ====================================================================
 
 # ====================================================================
-# Testcase start: SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN
+# Testcase start: SUBSCRIPTION BEHAVIOR WITH enable_replica_identity_full_index_scan=false
 #
-# Even if enable_indexscan = false, we do use the primary keys, this
-# is the legacy behavior. However, we do not use non-primary/non replica
-# identity columns.
+# When the table has storage parameter enable_replica_identity_full_index_scan = false, even
+# if there is a suitable index to use.
+#
+# Note that this is not relevant when the table has PK/RI. In that case, index is always used.
 #
 
 # create tables pub and sub
@@ -774,13 +775,9 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
 	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
 $node_subscriber->safe_psql('postgres',
-	"CREATE TABLE test_replica_id_full (x int NOT NULL)");
+	"CREATE TABLE test_replica_id_full (x int NOT NULL) WITH (enable_replica_identity_full_index_scan = false)");
 $node_subscriber->safe_psql('postgres',
 	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
-$node_subscriber->safe_psql('postgres',
-	"ALTER SYSTEM SET enable_indexscan TO off;");
-$node_subscriber->safe_psql('postgres',
-	"SELECT pg_reload_conf();");
 
 # insert some initial data
 $node_publisher->safe_psql('postgres',
@@ -800,10 +797,10 @@ $node_publisher->safe_psql('postgres',
 	"UPDATE test_replica_id_full SET x = x + 10000 WHERE x = 15;");
 $node_publisher->wait_for_catchup($appname);
 
-# show that index is not used when enable_indexscan=false
+# show that index is not used when enable_replica_identity_full_index_scan=false
 $result = $node_subscriber->safe_psql('postgres',
 	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx'");
-is($result, qq(0), 'ensure subscriber has not used index with enable_indexscan=false');
+is($result, qq(0), 'ensure subscriber has not used index with enable_replica_identity_full_index_scan=false');
 
 # we are done with this index, drop to simplify the tests
 $node_subscriber->safe_psql('postgres',
@@ -831,11 +828,11 @@ $node_publisher->wait_for_catchup($appname);
 # this is a legacy behavior
 $node_subscriber->poll_query_until(
 	'postgres', q{select (idx_scan=1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_unique'}
-) or die "Timed out while waiting ensuring subscriber used unique index as replica identity even with enable_indexscan=false";
+) or die "Timed out while waiting ensuring subscriber used unique index as replica identity even with enable_replica_identity_full_index_scan=false";
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM test_replica_id_full WHERE x IN (14,15)");
-is($result, qq(0), 'ensure the results are accurate even with enable_indexscan=false');
+is($result, qq(0), 'ensure the results are accurate even with enable_replica_identity_full_index_scan=false');
 
 # cleanup pub
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
@@ -843,13 +840,7 @@ $node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
 # cleanup sub
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
 $node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
-
-$node_subscriber->safe_psql('postgres',
-	"ALTER SYSTEM RESET enable_indexscan;");
-$node_subscriber->safe_psql('postgres',
-	"SELECT pg_reload_conf();");
-
-# Testcase end: SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN
+# Testcase end: SUBSCRIPTION BEHAVIOR WITH enable_replica_identity_full_index_scan=false
 # ====================================================================
 
 # ====================================================================
-- 
2.34.1

From 0f9b7eeeac6177d0f6c29faf9c0cc673830102bf Mon Sep 17 00:00:00 2001
From: Onder Kalaci <onderkalaci@gmail.com>
Date: Wed, 22 Feb 2023 14:12:56 +0300
Subject: [PATCH v33 1/2] Use indexes on the subscriber when REPLICA IDENTITY
 is full on the publisher

Using `REPLICA IDENTITY FULL` on the publication leads to a full table
scan per tuple change on the subscription. This makes `REPLICA
IDENTITY FULL` impracticable -- probably other than some small number
of use cases.

With this patch, I'm proposing the following change: If there is any
index on the subscriber, let the apply worker use an index. The index
should be a btree index, not a partial index, and it should have at
least one column reference (i.e. cannot consist of only expressions).

There is no smart mechanism to pick the index. If there is more than
one index that satisfies these requirements, we just pick the first
one.

The majority of the logic on the subscriber side already exists in
the code. The subscriber is already capable of doing (unique) index
scans.  With this patch, we are allowing the index to iterate over the
tuples fetched and only act when tuples are equal. Anyone familiar
with this part of the code might recognize that the sequential scan
code on the subscriber already implements the `tuples_equal()`
function. In short, the changes on the subscriber are mostly
combining parts of (unique) index scan and sequential scan codes.

From the performance point of view, there are a few things to note.
First, the patch aims not to change the behavior when PRIMARY KEY
or UNIQUE INDEX is used. Second, when REPLICA IDENTITY FULL is on
the publisher and an index is used on the subscriber, the
difference mostly comes down to `index scan` vs `sequential scan`.
That's why it is hard to claim certain number of improvements.
It mostly depends on the data size, index and the data distribution.

Still, below I try to showcase the potential improvements using an
index on the subscriber `pgbench_accounts(bid)`. With the index,
all the changes are replicated within ~5 seconds. When the index
is dropped, the same operation takes around ~300 seconds.

// init source db
pgbench -i -s 100 -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 5432 postgres
psql -c "CREATE INDEX i1 ON pgbench_accounts(aid);" -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;" -p 5432 postgres
psql -c "CREATE PUBLICATION pub_test_1 FOR TABLE pgbench_accounts;" -p 5432 postgres

// init target db, drop existing primary key
pgbench -i -p 9700 postgres
psql -c "TRUNCATE pgbench_accounts;" -p 9700 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 9700 postgres
psql -c "CREATE SUBSCRIPTION sub_test_1 CONNECTION 'host=localhost port=5432 user=onderkalaci dbname=postgres' PUBLICATION pub_test_1;" -p 9700 postgres

// create one index, even on a low cardinality column
psql -c "CREATE INDEX i2 ON pgbench_accounts(bid);" -p 9700 postgres

// now, run some pgbench tests and observe replication
pgbench -t 500 -b tpcb-like -p 5432 postgres
---
 doc/src/sgml/logical-replication.sgml         |  14 +-
 src/backend/executor/execReplication.c        | 118 ++-
 src/backend/replication/logical/relation.c    | 194 ++++
 src/backend/replication/logical/worker.c      |  68 +-
 src/include/replication/logicalrelation.h     |   5 +
 src/test/subscription/meson.build             |   1 +
 .../subscription/t/032_subscribe_use_index.pl | 933 ++++++++++++++++++
 7 files changed, 1272 insertions(+), 61 deletions(-)
 create mode 100644 src/test/subscription/t/032_subscribe_use_index.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..3124b8ad67 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -132,11 +132,19 @@
    certain additional requirements) can also be set to be the replica
    identity.  If the table does not have any suitable key, then it can be set
    to replica identity <quote>full</quote>, which means the entire row becomes
-   the key.  This, however, is very inefficient and should only be used as a
+   the key.  When replica identity <quote>full</quote> is specified,
+   indexes can be used on the subscriber side for searching the rows.  Candidate
+   indexes must be btree, non-partial, and have at least one column reference
+   (i.e. cannot consist of only expressions).  These restrictions
+   on the non-unique index properties adheres some of the restrictions that
+   are enforced for primary keys.  Internally, we follow a similar approach for
+   supporting index scans within logical replication scope.  If there are no
+   such suitable indexes, the search on the subscriber side can be very inefficient,
+   therefore replica identity <quote>full</quote> should only be used as a
    fallback if no other solution is possible.  If a replica identity other
    than <quote>full</quote> is set on the publisher side, a replica identity
-   comprising the same or fewer columns must also be set on the subscriber
-   side.  See <xref linkend="sql-altertable-replica-identity"/> for details on
+   comprising the same or fewer columns must also be set on the subscriber side.
+   See <xref linkend="sql-altertable-replica-identity"/> for details on
    how to set the replica identity.  If a table without a replica identity is
    added to a publication that replicates <command>UPDATE</command>
    or <command>DELETE</command> operations then
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index c484f5c301..4191e13b35 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -25,6 +25,7 @@
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_relation.h"
 #include "parser/parsetree.h"
+#include "replication/logicalrelation.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -37,49 +38,65 @@
 #include "utils/typcache.h"
 
 
+static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+						 TypeCacheEntry **eq);
+
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
  * is setup to match 'rel' (*NOT* idxrel!).
  *
- * Returns whether any column contains NULLs.
+ * Returns how many columns should be used for the index scan.
+ *
+ * This is not generic routine, it expects the idxrel to be a btree, non-partial
+ * and have at least one column reference (i.e. cannot consist of only
+ * expressions).
  *
- * This is not generic routine, it expects the idxrel to be replication
- * identity of a rel and meet all limitations associated with that.
+ * By definition, replication identity of a rel meets all limitations associated
+ * with that. Note that any other index could also meet these limitations.
  */
-static bool
+static int
 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 						 TupleTableSlot *searchslot)
 {
-	int			attoff;
+	int			index_attoff;
+	int			skey_attoff = 0;
 	bool		isnull;
 	Datum		indclassDatum;
 	oidvector  *opclass;
 	int2vector *indkey = &idxrel->rd_index->indkey;
-	bool		hasnulls = false;
-
-	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
-		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
 
 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
 									Anum_pg_index_indclass, &isnull);
 	Assert(!isnull);
 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
 
-	/* Build scankey for every attribute in the index. */
-	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+	/* Build scankey for every non-expression attribute in the index. */
+	for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
+		 index_attoff++)
 	{
 		Oid			operator;
 		Oid			opfamily;
 		RegProcedure regop;
-		int			pkattno = attoff + 1;
-		int			mainattno = indkey->values[attoff];
-		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
+		int			table_attno = indkey->values[index_attoff];
+		Oid			optype = get_opclass_input_type(opclass->values[index_attoff]);
+
+		if (!AttributeNumberIsValid(table_attno))
+		{
+			/*
+			 * XXX: For a non-primary/unique index with an additional
+			 * expression, we do not have to continue at this point. However,
+			 * the below code assumes the index scan is only done for simple
+			 * column references. If we can relax the assumption in the below
+			 * code-block, we can also remove the continue.
+			 */
+			continue;
+		}
 
 		/*
 		 * Load the operator info.  We need this to get the equality operator
 		 * function for the scan key.
 		 */
-		opfamily = get_opclass_family(opclass->values[attoff]);
+		opfamily = get_opclass_family(opclass->values[index_attoff]);
 
 		operator = get_opfamily_member(opfamily, optype,
 									   optype,
@@ -91,23 +108,25 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		regop = get_opcode(operator);
 
 		/* Initialize the scankey. */
-		ScanKeyInit(&skey[attoff],
-					pkattno,
+		ScanKeyInit(&skey[skey_attoff],
+					index_attoff + 1,
 					BTEqualStrategyNumber,
 					regop,
-					searchslot->tts_values[mainattno - 1]);
+					searchslot->tts_values[table_attno - 1]);
 
-		skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+		skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
 
 		/* Check for null value. */
-		if (searchslot->tts_isnull[mainattno - 1])
-		{
-			hasnulls = true;
-			skey[attoff].sk_flags |= SK_ISNULL;
-		}
+		if (searchslot->tts_isnull[table_attno - 1])
+			skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
+
+		skey_attoff++;
 	}
 
-	return hasnulls;
+	/* There should always be at least one attribute for the index scan. */
+	Assert(skey_attoff > 0);
+
+	return skey_attoff;
 }
 
 /*
@@ -123,33 +142,62 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 							 TupleTableSlot *outslot)
 {
 	ScanKeyData skey[INDEX_MAX_KEYS];
+	int			skey_attoff;
 	IndexScanDesc scan;
 	SnapshotData snap;
 	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
+	TypeCacheEntry **eq = NULL;
+	bool		idxIsRelationIdentityOrPK;
 
 	/* Open the index. */
 	idxrel = index_open(idxoid, RowExclusiveLock);
 
-	/* Start an index scan. */
+	idxIsRelationIdentityOrPK = IdxIsRelationIdentityOrPK(rel, idxoid);
+
 	InitDirtySnapshot(snap);
-	scan = index_beginscan(rel, idxrel, &snap,
-						   IndexRelationGetNumberOfKeyAttributes(idxrel),
-						   0);
 
 	/* Build scan key. */
-	build_replindex_scan_key(skey, rel, idxrel, searchslot);
+	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+	/* Start an index scan. */
+	scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
 
 retry:
 	found = false;
 
-	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
-	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
 	{
-		found = true;
+		/*
+		 * Avoid expensive equality check if the index is primary key or
+		 * replica identity index.
+		 */
+		if (!idxIsRelationIdentityOrPK)
+		{
+			if (eq == NULL)
+			{
+#ifdef USE_ASSERT_CHECKING
+				/* apply assertions only once for the input idxoid */
+				IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
+				Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
+#endif
+
+				/*
+				 * We only need to allocate once. This is allocated within per
+				 * tuple context -- ApplyMessageContext -- hence no need to
+				 * explicitly pfree().
+				 */
+				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+			}
+
+			if (!tuples_equal(outslot, searchslot, eq))
+				continue;
+		}
+
 		ExecMaterializeSlot(outslot);
 
 		xwait = TransactionIdIsValid(snap.xmin) ?
@@ -164,6 +212,10 @@ retry:
 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 			goto retry;
 		}
+
+		/* Found our tuple and it's not locked */
+		found = true;
+		break;
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 55bfa07871..ba8655d608 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,11 +17,14 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/table.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_am_d.h"
 #include "catalog/pg_subscription_rel.h"
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
+#include "optimizer/cost.h"
 #include "replication/logicalrelation.h"
 #include "replication/worker_internal.h"
 #include "utils/inval.h"
@@ -50,6 +53,9 @@ typedef struct LogicalRepPartMapEntry
 	LogicalRepRelMapEntry relmapentry;
 } LogicalRepPartMapEntry;
 
+static Oid	FindLogicalRepUsableIndex(Relation localrel,
+									  LogicalRepRelation *remoterel);
+
 /*
  * Relcache invalidation callback for our relation map cache.
  */
@@ -439,6 +445,14 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		 */
 		logicalrep_rel_mark_updatable(entry);
 
+		/*
+		 * Finding a usable index is an infrequent task. It occurs when an
+		 * operation is first performed on the relation, or after invalidation
+		 * of the relation cache entry (such as ANALYZE or CREATE/DROP index
+		 * on the relation).
+		 */
+		entry->usableIndexOid = FindLogicalRepUsableIndex(entry->localrel, remoterel);
+
 		entry->localrelvalid = true;
 	}
 
@@ -702,5 +716,185 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	/* state and statelsn are left set to 0. */
 	MemoryContextSwitchTo(oldctx);
 
+	/*
+	 * Finding a usable index is an infrequent task. It occurs when an
+	 * operation is first performed on the relation, or after invalidation of
+	 * the relation cache entry (such as ANALYZE or CREATE/DROP index on the
+	 * relation).
+	 *
+	 * We also prefer to run this code on the oldctx such that we do not
+	 * leak anything in the LogicalRepPartMapContext (hence
+	 * CacheMemoryContext).
+	 */
+	entry->usableIndexOid = FindLogicalRepUsableIndex(partrel, remoterel);
+
 	return entry;
 }
+
+/*
+ * Returns true if the given index consists only of expressions such as:
+ * 	CREATE INDEX idx ON table(foo(col));
+ *
+ * Returns false even if there is one column reference:
+ * 	 CREATE INDEX idx ON table(foo(col), col_2);
+ */
+static bool
+IsIndexOnlyOnExpression(IndexInfo *indexInfo)
+{
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		AttrNumber	attnum = indexInfo->ii_IndexAttrNumbers[i];
+
+		if (AttributeNumberIsValid(attnum))
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * Returns the oid of an index that can be used via the apply worker. The index
+ * should be btree, non-partial and have at least one column reference (i.e.
+ * cannot consist of only expressions). The limitations arise from
+ * RelationFindReplTupleByIndex(), which is designed to handle PK/RI.
+ *
+ * Note that the limitations of index scans for replica identity full only
+ * adheres a subset of the limitations of PK/RI. For example, we support
+ * columns that are not marked as [NOT NULL] or we are not interested in
+ * [NOT DEFERRABLE] aspect of constraints here.
+ *
+ * If no suitable index is found, returns InvalidOid.
+ *
+ * XXX: There are no fundamental problems for supporting non-btree indexes. We
+ * should mostly relax the limitations in RelationFindReplTupleByIndex(). For
+ * partial indexes, the required changes likely to be larger. If none of the
+ * tuples satisfy the expression for the index scan, we should fall-back to
+ * sequential execution, which might not be a good idea in some cases.
+ *
+ * Note that this is not a generic function, it expects REPLICA IDENTITY FULL
+ * for the remote relation.
+ */
+static Oid
+FindUsableIndexForReplicaIdentityFull(Relation localrel)
+{
+	List	   *indexlist = RelationGetIndexList(localrel);
+	Oid			usableIndex = InvalidOid;
+	ListCell   *lc;
+
+	foreach(lc, indexlist)
+	{
+		Oid			idxoid = lfirst_oid(lc);
+		Relation	indexRelation = index_open(idxoid, AccessShareLock);
+		IndexInfo  *indexInfo = BuildIndexInfo(indexRelation);
+
+		bool		isUsableIndex =
+			IsIndexUsableForReplicaIdentityFull(indexInfo);
+
+		index_close(indexRelation, AccessShareLock);
+
+		if (isUsableIndex)
+		{
+			/* we found one eligible index, don't need to continue */
+			usableIndex = idxoid;
+			break;
+		}
+	}
+
+	return usableIndex;
+}
+
+/*
+ * Returns true if the index is usable for replica identity full. For details,
+ * see FindUsableIndexForReplicaIdentityFull.
+ */
+bool
+IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo)
+{
+	bool		is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
+	bool		is_partial = (indexInfo->ii_Predicate != NIL);
+	bool		is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
+
+	if (is_btree && !is_partial && !is_only_on_expression)
+	{
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+	Oid			idxoid;
+
+	idxoid = RelationGetReplicaIndex(rel);
+
+	if (!OidIsValid(idxoid))
+		idxoid = RelationGetPrimaryKeyIndex(rel);
+
+	return idxoid;
+}
+
+/*
+ * Given a relation and OID of an index, returns true if the index is relation's
+ * replica identity index or relation's primary key's index.
+ *
+ * Returns false otherwise.
+ */
+bool
+IdxIsRelationIdentityOrPK(Relation rel, Oid idxoid)
+{
+	Assert(OidIsValid(idxoid));
+
+	return GetRelationIdentityOrPK(rel) == idxoid;
+}
+
+/*
+ * Returns the index oid if we can use an index for subscriber. Otherwise,
+ * returns InvalidOid.
+ */
+static Oid
+FindLogicalRepUsableIndex(Relation localrel, LogicalRepRelation *remoterel)
+{
+	Oid			idxoid;
+
+	/*
+	 * We never need index oid for partitioned tables, always rely on leaf
+	 * partition's index.
+	 */
+	if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return InvalidOid;
+
+	/*
+	 * Simple case, we already have a primary key or a replica identity index.
+	 */
+	idxoid = GetRelationIdentityOrPK(localrel);
+	if (OidIsValid(idxoid))
+		return idxoid;
+
+	if (RelationReplicaIdentityFullIndexScanEnabled(localrel) &&
+		remoterel->replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * If we had a primary key or relation identity with a unique index,
+		 * we would have already found and returned that oid. At this point,
+		 * the remote relation has replica identity full.
+		 *
+		 * We are looking for one more opportunity for using an index. If
+		 * there are any indexes defined on the local relation, try to pick a
+		 * suitable index.
+		 *
+		 * The index selection safely assumes that all the columns are going
+		 * to be available for the index scan given that remote relation has
+		 * replica identity full.
+		 */
+		return FindUsableIndexForReplicaIdentityFull(localrel);
+	}
+
+	return InvalidOid;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..e9569e59d3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -397,6 +397,8 @@ static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
+static Oid	get_usable_indexoid(ApplyExecutionData *edata,
+								ResultRelInfo *relinfo);
 static void apply_handle_update_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot,
@@ -406,6 +408,7 @@ static void apply_handle_delete_internal(ApplyExecutionData *edata,
 										 TupleTableSlot *remoteslot);
 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
 									LogicalRepRelation *remoterel,
+									Oid localidxoid,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
@@ -2348,24 +2351,6 @@ apply_handle_type(StringInfo s)
 	logicalrep_read_typ(s, &typ);
 }
 
-/*
- * Get replica identity index or if it is not defined a primary key.
- *
- * If neither is defined, returns InvalidOid
- */
-static Oid
-GetRelationIdentityOrPK(Relation rel)
-{
-	Oid			idxoid;
-
-	idxoid = RelationGetReplicaIndex(rel);
-
-	if (!OidIsValid(idxoid))
-		idxoid = RelationGetPrimaryKeyIndex(rel);
-
-	return idxoid;
-}
-
 /*
  * Check that we (the subscription owner) have sufficient privileges on the
  * target relation to perform the given operation.
@@ -2655,12 +2640,14 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	TupleTableSlot *localslot;
 	bool		found;
 	MemoryContext oldctx;
+	Oid			usableIndexOid = get_usable_indexoid(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
 	found = FindReplTupleInLocalRel(estate, localrel,
 									&relmapentry->remoterel,
+									usableIndexOid,
 									remoteslot, &localslot);
 	ExecClearTuple(remoteslot);
 
@@ -2793,11 +2780,12 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EPQState	epqstate;
 	TupleTableSlot *localslot;
 	bool		found;
+	Oid			usableIndexOid = get_usable_indexoid(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
-	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+	found = FindReplTupleInLocalRel(estate, localrel, remoterel, usableIndexOid,
 									remoteslot, &localslot);
 
 	/* If found delete it. */
@@ -2828,20 +2816,50 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EvalPlanQualEnd(&epqstate);
 }
 
+/*
+ * Decide whether we can pick an index for the relinfo (i.e the relation) we're
+ * actually deleting/updating from. If it is a child partition of
+ * edata->targetRelInfo, find the index on the partition.
+ *
+ * Note that if the corresponding relmapentry has invalid usableIndexOid, the
+ * function returns InvalidOid.
+ */
+static Oid
+get_usable_indexoid(ApplyExecutionData *edata, ResultRelInfo *relinfo)
+{
+	ResultRelInfo *targetResultRelInfo = edata->targetRelInfo;
+	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+	char		targetrelkind = targetResultRelInfo->ri_RelationDesc->rd_rel->relkind;
+
+	if (targetrelkind == RELKIND_PARTITIONED_TABLE)
+	{
+		/* Target is a partitioned table, so find relmapentry of the partition */
+		TupleConversionMap *map = ExecGetRootToChildMap(relinfo, edata->estate);
+		AttrMap    *attrmap = map ? map->attrMap : NULL;
+
+		relmapentry =
+			logicalrep_partition_open(relmapentry, relinfo->ri_RelationDesc,
+									  attrmap);
+	}
+
+	return relmapentry->usableIndexOid;
+}
+
 /*
  * Try to find a tuple received from the publication side (in 'remoteslot') in
  * the corresponding local relation using either replica identity index,
- * primary key or if needed, sequential scan.
+ * primary key, index or if needed, sequential scan.
  *
  * Local tuple, if found, is returned in '*localslot'.
  */
 static bool
 FindReplTupleInLocalRel(EState *estate, Relation localrel,
 						LogicalRepRelation *remoterel,
+						Oid localidxoid,
 						TupleTableSlot *remoteslot,
 						TupleTableSlot **localslot)
 {
-	Oid			idxoid;
 	bool		found;
 
 	/*
@@ -2852,12 +2870,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
 
 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
 
-	idxoid = GetRelationIdentityOrPK(localrel);
-	Assert(OidIsValid(idxoid) ||
+	Assert(OidIsValid(localidxoid) ||
 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
 
-	if (OidIsValid(idxoid))
-		found = RelationFindReplTupleByIndex(localrel, idxoid,
+	if (OidIsValid(localidxoid))
+		found = RelationFindReplTupleByIndex(localrel, localidxoid,
 											 LockTupleExclusive,
 											 remoteslot, *localslot);
 	else
@@ -2978,6 +2995,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
 												&part_entry->remoterel,
+												part_entry->usableIndexOid,
 												remoteslot_part, &localslot);
 				if (!found)
 				{
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 9c34054bb7..55cbc4580a 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -13,6 +13,7 @@
 #define LOGICALRELATION_H
 
 #include "access/attmap.h"
+#include "catalog/index.h"
 #include "replication/logicalproto.h"
 
 typedef struct LogicalRepRelMapEntry
@@ -31,6 +32,7 @@ typedef struct LogicalRepRelMapEntry
 	Relation	localrel;		/* relcache entry (NULL when closed) */
 	AttrMap    *attrmap;		/* map of local attributes to remote ones */
 	bool		updatable;		/* Can apply updates/deletes? */
+	Oid			usableIndexOid; /* which index to use, or InvalidOid if none */
 
 	/* Sync state. */
 	char		state;
@@ -46,5 +48,8 @@ extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *r
 														Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
+extern bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo);
+extern Oid GetRelationIdentityOrPK(Relation rel);
+extern bool IdxIsRelationIdentityOrPK(Relation rel, Oid idxoid);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 3db0fdfd96..f85bf92b6f 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -38,6 +38,7 @@ tests += {
       't/029_on_error.pl',
       't/030_origin.pl',
       't/031_column_list.pl',
+      't/032_subscribe_use_index.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/032_subscribe_use_index.pl b/src/test/subscription/t/032_subscribe_use_index.pl
new file mode 100644
index 0000000000..e8c75ea9a1
--- /dev/null
+++ b/src/test/subscription/t/032_subscribe_use_index.pl
@@ -0,0 +1,933 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+# Test logical replication behavior with subscriber uses available index
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"wal_retrieve_retry_interval = 1ms");
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX
+#
+# Basic test where the subscriber uses index
+# and only updates 1 row and deletes
+# 1 other row
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates one row via index";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x = 20;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 2) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes one row via index";
+
+# make sure that the subscriber has the correct data
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(DISTINCT x) FROM test_replica_id_full");
+is($result, qq(20), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CREATE/DROP INDEX WORKS WITHOUT ISSUES
+#
+# This test ensures that after CREATE INDEX, the subscriber can automatically
+# use one of the indexes (provided that it fulfils the requirements).
+# Similarly, after DROP index, the subscriber can automatically switch to
+# sequential scan
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL, y int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL, y int)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i, i FROM generate_series(0,2100)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# now, create index and see that the index is used
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# wait until the index is created
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(*)=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for creating index test_replica_id_full_idx";
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates one row via index";
+
+
+# now, create index on column y as well
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idy ON test_replica_id_full(y)");
+
+# wait until the index is created
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(*)=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idy';}
+) or die "Timed out while waiting for creating index test_replica_id_full_idy";
+
+# now, the update could either use the test_replica_id_full_idy or test_replica_id_full_idy index
+# it is not possible for user to control which index to use
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET y = y + 1 WHERE y = 3000;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(idx_scan) = 2 from pg_stat_all_indexes where indexrelname IN ('test_replica_id_full_idy', 'test_replica_id_full_idx');}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes one row via index";
+
+# let's also test dropping test_replica_id_full_idy and
+# hence use test_replica_id_full_idx
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX test_replica_id_full_idy;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 25;");
+$node_publisher->wait_for_catchup($appname);
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full WHERE x = 15 OR x = 25 OR y = 3000;");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION CREATE/DROP INDEX WORKS WITHOUT ISSUES
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
+#
+# Basic test where the subscriber uses index
+# and updates 50 rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data within the range 0-19
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i%20 FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 50 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 50) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates 50 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x = 15;");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+#
+# Basic test where the subscriber uses index
+# and deletes 200 rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert some initial data within the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# deletes 200 rows
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x IN (5, 6);");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes 200 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x in (5, 6);");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+#
+# Basic test where the subscriber uses index
+# and updates multiple rows with a table that has
+# dropped columns
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert some initial data within the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 200 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates 200 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(x+y::int) from test_replica_id_full;");
+is($result, qq(9200), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_part_idx ON users_table_part(user_id, value_1)"
+);
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%100), (i%20), i FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates rows and moves between partitions
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE user_id = 4;");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=10 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for updates on partitioned table with index";
+
+# deletes rows from different partitions
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 1 and value_1 = 1;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 12 and value_1 = 12;");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=30 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates partitioned table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(user_id+value_1+value_2) from users_table_part;");
+is($result, qq(550070), 'ensure subscriber has the correct data at the end of the test');
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(DISTINCT(user_id,value_1, value_2)) from users_table_part;");
+is($result, qq(981), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full_part_index REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_part_idx ON test_replica_id_full_part_index(x) WHERE (x = 5);");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full_part_index SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full_part_index");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows, one of them is indexed
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 5;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# make sure that the index is not used
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates one row via seq. scan with with partial index');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full_part_index;");
+is($result, qq(22), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(DISTINCT x) FROM test_replica_id_full_part_index;");
+is($result, qq(20), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people ((firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0,200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_2' AND lastname = 'last_name_2';");
+
+# make sure the index is not used on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates two rows via seq. scan with index on expressions');
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_3';");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_4' AND lastname = 'last_name_4';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# make sure the index is not used on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates two rows via seq. scan with index on expressions');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people;");
+is($result, qq(199), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people (firstname, lastname, (firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0, 200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_3' AND lastname = 'last_name_3';");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on expressions and columns";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'Nan';");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=4 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes two rows via index scan with index on expressions and columns";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people;");
+is($result, qq(199), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people WHERE firstname = 'NaN';");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: Some NULL values
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y);"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full VALUES (1), (2), (3);");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 1;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 3;");
+
+# check if the index is used even when the index has NULL values
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates test_replica_id_full table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(x) from test_replica_id_full WHERE y IS NULL;");
+is($result, qq(8), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full WHERE y IS NULL;");
+is($result, qq(3), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Some NULL values
+# ====================================================================
+
+# ====================================================================
+# Testcase start: Unique index that is not primary key or replica identity
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_unique_idx ON test_replica_id_full(x);"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full (x, y) VALUES (NULL, 1), (NULL, 2), (NULL, 3);");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = 1 WHERE y = 2;");
+
+# check if the index is used even when the index has NULL values
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_unique_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates test_replica_id_full table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(y) from test_replica_id_full;");
+is($result, qq(6), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Unique index that is not primary key or replica identity
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN
+#
+# Even if enable_indexscan = false, we do use the primary keys, this
+# is the legacy behavior. However, we do not use non-primary/non replica
+# identity columns.
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SYSTEM SET enable_indexscan TO off;");
+$node_subscriber->safe_psql('postgres',
+	"SELECT pg_reload_conf();");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 10000 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# show that index is not used when enable_indexscan=false
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx'");
+is($result, qq(0), 'ensure subscriber has not used index with enable_indexscan=false');
+
+# we are done with this index, drop to simplify the tests
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX test_replica_id_full_idx");
+
+# now, create a unique index and set the replica
+$node_publisher->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_unique ON test_replica_id_full(x);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_unique ON test_replica_id_full(x);");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY USING INDEX test_replica_id_full_unique;");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY USING INDEX test_replica_id_full_unique;");
+
+# wait for the synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 10000 WHERE x = 14;");
+$node_publisher->wait_for_catchup($appname);
+
+# show that the unique index on replica identity is used even when enable_indexscan=false
+# this is a legacy behavior
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan=1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_unique'}
+) or die "Timed out while waiting ensuring subscriber used unique index as replica identity even with enable_indexscan=false";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full WHERE x IN (14,15)");
+is($result, qq(0), 'ensure the results are accurate even with enable_indexscan=false');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SYSTEM RESET enable_indexscan;");
+$node_subscriber->safe_psql('postgres',
+	"SELECT pg_reload_conf();");
+
+# Testcase end: SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH PUB/SUB different data
+#
+# The subscriber has duplicate tuples that publisher does not have.
+# When publsher updates/deletes 1 row, subscriber uses indexes and
+# updates/deletes exactly 1 row.
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+
+# duplicate the data in subscriber
+$node_subscriber->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+$node_publisher->wait_for_catchup($appname);
+
+# now, we update only 1 row on the publisher and expect
+# the subscriber to only update 1 row
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates one row via index";
+
+# similarly for delete
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x = 20;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 2) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes one row via index";
+
+# make sure that the subscriber has the correct data
+# we only deleted 1 row
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full");
+is($result, qq(43), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH PUB/SUB different data
+# ====================================================================
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.34.1

Reply via email to