Hello, Amit!

> Now, I
> would like to know the opinion of others who were involved in the
> initial commit, so added Peter E. to see what he thinks of the same.

Peter answered in [0]:
> I don’t remember. I was just the committer.

I’ve attached a new version of the proposed solution.
The first commit includes tests, some README updates, and an
additional pgbench test that reproduces the issue without explicitly
simulating the wait/resume race. This last test is heavy and isn't
intended to be committed.

Instead of adding extra locking in btree, this version takes a more
lightweight approach: since we already call GetLatestSnapshot before
table_tuple_lock, we can simply call it before each scan attempt and
use that snapshot for the scan.
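
For reference, here is the resulting retry loop in
RelationFindReplTupleByIndex, condensed from the attached patch
(RelationFindReplTupleSeq is analogous):

    scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);

retry:
    /* Take a fresh MVCC snapshot for this attempt and scan with it */
    PushActiveSnapshot(GetLatestSnapshot());
    scan->xs_snapshot = GetActiveSnapshot();
    index_rescan(scan, skey, skey_attoff, NULL, 0);

    /* ... find the tuple and, if found, try to lock it ... */

    res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
                           outslot, GetCurrentCommandId(false), lockmode,
                           LockWaitBlock, 0 /* don't follow updates */ ,
                           &tmfd);
    if (should_refetch_tuple(res, &tmfd))
    {
        PopActiveSnapshot();
        goto retry;         /* the next attempt gets a newer snapshot */
    }

    index_endscan(scan);
    PopActiveSnapshot();

table_tuple_lock already waits for the concurrent transaction to
finish, so the separate XactLockTableWait() handling that the
DirtyScan version needed is gone.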

As a result:
* an MVCC scan will not miss updated tuples, while a DirtyScan may
* in both cases, table_tuple_lock will wait for the updating
transaction to commit before retrying
* an MVCC scan cannot see not-yet-committed new rows, while a
DirtyScan can. However, this does not give DirtyScan any stronger
guarantee: in the INSERT vs. INSERT case, two parallel inserts are
still possible. DirtyScan only slightly reduces their probability;
if the scan does not find the row, there is still no guarantee that
it won't be inserted immediately afterward.

Therefore, the MVCC version appears to provide the same guarantees as
the DirtyScan version, without missing tuples, and with the same
performance.

Best regards,
Mikhail.

[0]:
https://discord.com/channels/1258108670710124574/1407753138991009913/1411303541900841090
From 60dca743bf755b068ccdff4cb2f35467167f592a Mon Sep 17 00:00:00 2001
From: nkey <[email protected]>
Date: Wed, 3 Sep 2025 19:08:55 +0200
Subject: [PATCH v11 2/2] Fix logical replication conflict detection during
 tuple lookup

SNAPSHOT_DIRTY scans could miss concurrently updated tuples, so conflict
detection with concurrent transactions could fail during logical
replication.  Replace the dirty-snapshot scans in
RelationFindReplTupleByIndex and RelationFindReplTupleSeq with scans that
take a fresh snapshot from GetLatestSnapshot on each attempt.
---
 src/backend/executor/execReplication.c | 63 ++++++++------------------
 1 file changed, 18 insertions(+), 45 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b409d4ecbf5..0de40aec733 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -186,8 +186,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 	ScanKeyData skey[INDEX_MAX_KEYS];
 	int			skey_attoff;
 	IndexScanDesc scan;
-	SnapshotData snap;
-	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
 	TypeCacheEntry **eq = NULL;
@@ -198,17 +196,17 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 
 	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
 
-	InitDirtySnapshot(snap);
-
 	/* Build scan key. */
 	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
 
-	/* Start an index scan. */
-	scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+	/* Start an index scan. The placeholder snapshot is replaced below. */
+	scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
 
 retry:
 	found = false;
-
+	/* Take a fresh snapshot for each scan attempt */
+	PushActiveSnapshot(GetLatestSnapshot());
+	scan->xs_snapshot = GetActiveSnapshot();
 	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
@@ -229,19 +227,6 @@ retry:
 
 		ExecMaterializeSlot(outslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
 		/* Found our tuple and it's not locked */
 		found = true;
 		break;
@@ -253,8 +238,6 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
@@ -263,13 +246,15 @@ retry:
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
-		PopActiveSnapshot();
-
 		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
 			goto retry;
+		}
 	}
 
 	index_endscan(scan);
+	PopActiveSnapshot();
 
 	/* Don't release lock until commit. */
 	index_close(idxrel, NoLock);
@@ -370,9 +355,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 {
 	TupleTableSlot *scanslot;
 	TableScanDesc scan;
-	SnapshotData snap;
 	TypeCacheEntry **eq;
-	TransactionId xwait;
 	bool		found;
 	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
 
@@ -380,13 +363,15 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 	eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 
-	/* Start a heap scan. */
-	InitDirtySnapshot(snap);
-	scan = table_beginscan(rel, &snap, 0, NULL);
+	/* Start a heap scan. The placeholder snapshot is replaced below. */
+	scan = table_beginscan(rel, SnapshotAny, 0, NULL);
 	scanslot = table_slot_create(rel, NULL);
 
 retry:
 	found = false;
+	/* Take a fresh snapshot for each scan attempt */
+	PushActiveSnapshot(GetLatestSnapshot());
+	scan->rs_snapshot = GetActiveSnapshot();
 
 	table_rescan(scan, NULL);
 
@@ -399,19 +384,6 @@ retry:
 		found = true;
 		ExecCopySlot(outslot, scanslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
 		/* Found our tuple and it's not locked */
 		break;
 	}
@@ -422,8 +394,6 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
@@ -432,13 +402,15 @@ retry:
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
-		PopActiveSnapshot();
-
 		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
 			goto retry;
+		}
 	}
 
 	table_endscan(scan);
+	PopActiveSnapshot();
 	ExecDropSingleTupleTableSlot(scanslot);
 
 	return found;
-- 
2.48.1

From e73046a0da7213332a3701123e040de1fd3f2f54 Mon Sep 17 00:00:00 2001
From: nkey <[email protected]>
Date: Sat, 23 Nov 2024 13:25:11 +0100
Subject: [PATCH v11 1/2] Add injection points and TAP tests to reproduce
 conflict detection issues caused by SNAPSHOT_DIRTY index scans in logical
 replication

---
 src/backend/access/index/indexam.c            |   9 ++
 src/backend/access/nbtree/README              |   9 ++
 src/backend/executor/execIndexing.c           |   7 +-
 src/backend/replication/logical/worker.c      |   4 +
 src/include/utils/snapshot.h                  |  14 ++
 src/test/subscription/Makefile                |   1 +
 src/test/subscription/meson.build             |   9 +-
 .../subscription/t/036_delete_missing_race.pl | 137 +++++++++++++++++
 .../subscription/t/037_update_missing_race.pl | 139 +++++++++++++++++
 .../t/038_update_missing_with_retain.pl       | 141 ++++++++++++++++++
 .../t/039_update_missing_simulation.pl        | 123 +++++++++++++++
 11 files changed, 591 insertions(+), 2 deletions(-)
 create mode 100644 src/test/subscription/t/036_delete_missing_race.pl
 create mode 100644 src/test/subscription/t/037_update_missing_race.pl
 create mode 100644 src/test/subscription/t/038_update_missing_with_retain.pl
 create mode 100644 src/test/subscription/t/039_update_missing_simulation.pl

diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 86d11f4ec79..a503fa02ac5 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -52,11 +52,13 @@
 #include "catalog/pg_type.h"
 #include "nodes/execnodes.h"
 #include "pgstat.h"
+#include "replication/logicalworker.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#include "utils/injection_point.h"
 
 
 /* ----------------------------------------------------------------
@@ -751,6 +753,13 @@ index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *
 		 * the index.
 		 */
 		Assert(ItemPointerIsValid(&scan->xs_heaptid));
+#ifdef USE_INJECTION_POINTS
+		if (!IsCatalogRelation(scan->heapRelation) && IsLogicalWorker())
+		{
+			INJECTION_POINT("index_getnext_slot_before_fetch_apply_dirty", NULL);
+		}
+#endif
+
 		if (index_fetch_heap(scan, slot))
 			return true;
 	}
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 53d4a61dc3f..634a3d10bb1 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -103,6 +103,15 @@ We also remember the left-link, and follow it when the scan moves backwards
 (though this requires extra handling to account for concurrent splits of
 the left sibling; see detailed move-left algorithm below).
 
+Despite the mechanics described above, non-MVCC scans (SnapshotDirty and
+SnapshotSelf) may still produce inconsistent results.  This can happen when a
+concurrent transaction deletes a tuple and inserts a new version with a new
+TID on the same page, or to the left/right of the current scan position
+(depending on the scan direction).  If the scan has already visited the page
+and cached its contents in backend-local storage, it may skip the old tuple
+because it was deleted, yet never see the new one because the page is not
+re-read.  Note that this affects not only btree scans but also heap scans.
+
 In most cases we release our lock and pin on a page before attempting
 to acquire pin and lock on the page we are moving to.  In a few places
 it is necessary to lock the next page before releasing the current one.
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index ca33a854278..61a5097f789 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -117,6 +117,7 @@
 #include "utils/multirangetypes.h"
 #include "utils/rangetypes.h"
 #include "utils/snapmgr.h"
+#include "utils/injection_point.h"
 
 /* waitMode argument to check_exclusion_or_unique_constraint() */
 typedef enum
@@ -780,7 +781,9 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	/*
 	 * Search the tuples that are in the index for any violations, including
 	 * tuples that aren't visible yet.
-	 */
+	 * A dirty snapshot may miss some tuples in the case of concurrent
+	 * updates, but that is acceptable here.
+	 */
 	InitDirtySnapshot(DirtySnapshot);
 
 	for (i = 0; i < indnkeyatts; i++)
@@ -943,6 +946,8 @@ retry:
 
 	ExecDropSingleTupleTableSlot(existing_slot);
 
+	if (!conflict)
+		INJECTION_POINT("check_exclusion_or_unique_constraint_no_conflict", NULL);
 	return !conflict;
 }
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 22ad9051db3..bb3aaf21d65 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -270,6 +270,7 @@
 #include "utils/acl.h"
 #include "utils/dynahash.h"
 #include "utils/guc.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -2932,7 +2933,10 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 			conflicttuple.origin != replorigin_session_origin)
 			type = CT_UPDATE_DELETED;
 		else
+		{
+			INJECTION_POINT("apply_handle_update_internal_update_missing", NULL);
 			type = CT_UPDATE_MISSING;
+		}
 
 		/* Store the new tuple for conflict reporting */
 		slot_store_data(newslot, relmapentry, newtup);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 0e546ec1497..189dfd71103 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -53,6 +53,13 @@ typedef enum SnapshotType
 	 * - previous commands of this transaction
 	 * - changes made by the current command
 	 *
+	 * Note: such a snapshot may miss an existing logical tuple when a
+	 * concurrent update is in progress: if a new tuple version is
+	 * inserted into a page the scan has already processed while the old
+	 * version is marked with a committed xmax, the scan will skip the
+	 * old version and never reach the new one, missing the logical
+	 * tuple entirely.
+	 *
 	 * Does _not_ include:
 	 * - in-progress transactions (as of the current instant)
 	 * -------------------------------------------------------------------------
@@ -82,6 +89,13 @@ typedef enum SnapshotType
 	 * transaction and committed/aborted xacts are concerned.  However, it
 	 * also includes the effects of other xacts still in progress.
 	 *
+	 * Note: such a snapshot may miss an existing logical tuple when a
+	 * concurrent update is in progress: if a new tuple version is
+	 * inserted into a page the scan has already processed while the old
+	 * version is marked with a committed or in-progress xmax, the scan
+	 * will skip the old version and never reach the new one, missing
+	 * the logical tuple entirely.
+	 *
 	 * A special hack is that when a snapshot of this type is used to
 	 * determine tuple visibility, the passed-in snapshot struct is used as an
 	 * output argument to return the xids of concurrent xacts that affected
diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile
index 50b65d8f6ea..51d28eca091 100644
--- a/src/test/subscription/Makefile
+++ b/src/test/subscription/Makefile
@@ -16,6 +16,7 @@ include $(top_builddir)/src/Makefile.global
 EXTRA_INSTALL = contrib/hstore
 
 export with_icu
+export enable_injection_points
 
 check:
 	$(prove_check)
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 586ffba434e..8b24d76a247 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -5,7 +5,10 @@ tests += {
   'sd': meson.current_source_dir(),
   'bd': meson.current_build_dir(),
   'tap': {
-    'env': {'with_icu': icu.found() ? 'yes' : 'no'},
+    'env': {
+      'with_icu': icu.found() ? 'yes' : 'no',
+      'enable_injection_points': get_option('injection_points') ? 'yes' : 'no'
+    },
     'tests': [
       't/001_rep_changes.pl',
       't/002_types.pl',
@@ -42,6 +45,10 @@ tests += {
       't/033_run_as_table_owner.pl',
       't/034_temporal.pl',
       't/035_conflicts.pl',
+      't/036_delete_missing_race.pl',
+      't/037_update_missing_race.pl',
+      't/038_update_missing_with_retain.pl',
+      't/039_update_missing_simulation.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/036_delete_missing_race.pl b/src/test/subscription/t/036_delete_missing_race.pl
new file mode 100644
index 00000000000..a319513fd60
--- /dev/null
+++ b/src/test/subscription/t/036_delete_missing_race.pl
@@ -0,0 +1,137 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection in logical replication: remote DELETE vs. a concurrent local UPDATE
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation (the test then passes even without the fix); TODO: remove before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create a similar table on the subscriber, with an additional index to disable HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);
+	 CREATE INDEX data_index ON conf_tab(data);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Delete tuple on publisher
+$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE a=1;");
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to start searching for the tuple using the index
+	$node_subscriber->wait_for_event('logical replication apply worker',
+		'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update the tuple on the subscriber
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET data = 'fromsubnew' WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up apply worker
+	$node_subscriber->safe_psql('postgres',"
+		SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+		SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+		");
+}
+
+# The tuple was updated concurrently, so a conflict is expected
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+# But the tuple should still be deleted on the subscriber
+is($node_subscriber->safe_psql('postgres', 'SELECT count(*) from conf_tab'), 0, 'record deleted on subscriber');
+
+ok(!$node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=delete_missing/,
+		$log_offset), 'no spurious delete_missing conflict');
+
+ok($node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=delete_origin_differs/,
+		$log_offset), 'correct conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/037_update_missing_race.pl b/src/test/subscription/t/037_update_missing_race.pl
new file mode 100644
index 00000000000..b71fdc0c136
--- /dev/null
+++ b/src/test/subscription/t/037_update_missing_race.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection in logical replication: remote UPDATE vs. a concurrent local UPDATE
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation (the test then passes even without the fix); TODO: remove before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create a similar table on the subscriber, with an additional index to disable HOT updates and an additional column
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text, i int DEFAULT 0);
+	 CREATE INDEX i_index ON conf_tab(i);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Update tuple on publisher
+$node_publisher->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to start searching for the tuple using the index
+	$node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update only the additional column on the subscriber
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET i = 1 WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up apply worker
+	$node_subscriber->safe_psql('postgres',"
+		SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+		SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+		");
+}
+
+# The tuple was updated concurrently, so a conflict is expected
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+# The new value from the publisher must be applied on the subscriber
+is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber');
+# And the additional column must keep its locally updated value
+is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'column record updated on subscriber');
+
+ok(!$node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_missing/,
+		$log_offset), 'no spurious update_missing conflict');
+
+ok($node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/,
+		$log_offset), 'correct conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/038_update_missing_with_retain.pl b/src/test/subscription/t/038_update_missing_with_retain.pl
new file mode 100644
index 00000000000..6f7dfd28d37
--- /dev/null
+++ b/src/test/subscription/t/038_update_missing_with_retain.pl
@@ -0,0 +1,141 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection in logical replication: remote UPDATE vs. a concurrent local UPDATE, with retain_dead_tuples enabled
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation (the test then passes even without the fix); TODO: remove before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->append_conf('postgresql.conf',
+	qq(wal_level = 'replica'));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create a similar table on the subscriber, with an additional index to disable HOT updates and an additional column
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text, i int DEFAULT 0);
+	 CREATE INDEX i_index ON conf_tab(i);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub WITH (retain_dead_tuples = true)");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Update tuple on publisher
+$node_publisher->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to start searching for the tuple using the index
+	$node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update only the additional column on the subscriber
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET i = 1 WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up apply worker
+	$node_subscriber->safe_psql('postgres',"
+		SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+		SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+		");
+}
+
+# The tuple was updated concurrently, so a conflict is expected
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+# The new value from the publisher must be applied on the subscriber
+is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber');
+# And the additional column must keep its locally updated value
+is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'column record updated on subscriber');
+
+ok(!$node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_deleted/,
+		$log_offset), 'no spurious update_deleted conflict');
+
+ok($node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/,
+		$log_offset), 'correct conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/039_update_missing_simulation.pl b/src/test/subscription/t/039_update_missing_simulation.pl
new file mode 100644
index 00000000000..322e931c171
--- /dev/null
+++ b/src/test/subscription/t/039_update_missing_simulation.pl
@@ -0,0 +1,123 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Stress-test conflict detection in logical replication using concurrent pgbench workloads
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use IPC::Run qw(start finish);
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE tbl(a int PRIMARY key, data_pub int);");
+
+# Create a similar table on the subscriber, with an additional index to disable HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE tbl(a int PRIMARY key, data_pub int, data_sub int default 0);
+	 CREATE INDEX data_index ON tbl(data_pub);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE tbl");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub");
+
+my $num_rows = 10;
+my $num_updates = 10000;
+my $num_clients = 10;
+$node_publisher->safe_psql('postgres', "INSERT INTO tbl SELECT i, i * i FROM generate_series(1,$num_rows) i");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Prepare small pgbench scripts as files
+my $sub_sql = $node_subscriber->basedir . '/sub_update.sql';
+my $pub_sql = $node_publisher->basedir . '/pub_delete.sql';
+
+open my $fh1, '>', $sub_sql or die $!;
+print $fh1 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_sub = data_sub + 1 WHERE a = :num;\n";
+close $fh1;
+
+open my $fh2, '>', $pub_sql or die $!;
+print $fh2 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_pub = data_pub + 1 WHERE a = :num;\n";
+close $fh2;
+
+my @sub_cmd = (
+	'pgbench',
+	'--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates",
+	'-p', $node_subscriber->port, '-h', $node_subscriber->host, '-f', $sub_sql, 'postgres'
+);
+
+my @pub_cmd = (
+	'pgbench',
+	'--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates",
+	'-p', $node_publisher->port, '-h', $node_publisher->host, '-f', $pub_sql, 'postgres'
+);
+
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+# The update_missing conflict should never be reported; make it an error if it is
+$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_attach('apply_handle_update_internal_update_missing', 'error')");
+my $log_offset = -s $node_subscriber->logfile;
+
+# Start both concurrently
+my ($sub_out, $sub_err, $pub_out, $pub_err) = ('', '', '', '');
+my $sub_h = start \@sub_cmd, '>', \$sub_out, '2>', \$sub_err;
+my $pub_h = start \@pub_cmd, '>', \$pub_out, '2>', \$pub_err;
+
+# Wait for completion
+finish $sub_h;
+finish $pub_h;
+
+like($sub_out, qr/actually processed/, 'subscriber pgbench completed');
+like($pub_out, qr/actually processed/, 'publisher pgbench completed');
+
+# Let subscription catch up, then check expectations
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+ok(!$node_subscriber->log_contains(
+		qr/ERROR:  error triggered for injection point apply_handle_update_internal_update_missing/,
+		$log_offset), 'no spurious update_missing conflict');
+
+done_testing();
-- 
2.48.1
