Antonin Houska <[email protected]> wrote:

> Antonin Houska <[email protected]> wrote:
> 
> > Antonin Houska <[email protected]> wrote:
> > 
> > > Srinath Reddy Sadipiralla <[email protected]> wrote:
> > > 
> > > > The concurrency test failed once. I tried to reproduce the scenario
> > > > below but had no luck. I think the assert failure happened because
> > > > after the speculative insert there might be no spec CONFIRM or ABORT
> > > > record. Thoughts?
> > > 
> > > Perhaps, I'll try. I'm not sure the REPACK decoding worker does anything
> > > special regarding decoding. If you happen to see the problem again, please
> > > try to preserve the related WAL segments: if this is a bug in the PG
> > > executor, pg_waldump might reveal that.
> > 
> > I could not reproduce the failure, and have no idea how a speculative insert
> > can remain without a CONFIRM / ABORT record. The only problem I could imagine
> > is that change_useless_for_repack() accidentally filters out the CONFIRM /
> > ABORT record, but neither code review nor the debugger supports that theory.
> > (Actually, if this were the problem, the test failure probably wouldn't be
> > that rare.)
> 
> I can confirm that I was able to reproduce the crash using a debugger and your
> more recent diagnosis [1]. Indeed, filtering was the problem.
> 
> Unfortunately, I wasn't able to make the crash easily reproducible using the
> isolation tester. The problem is that logical decoding is performed by a
> background worker: when the backend executing REPACK waits for the background
> worker, which in turn waits on an injection point, the isolation tester does
> not recognize that it is effectively the backend that is waiting on the
> injection point. Therefore the isolation tester does not proceed to the next
> step.

I could not resist digging into it deeper :-) Attached is a test that reproduces
the crash; it includes the isolation tester enhancement that I posted
separately [1]. It crashes reliably with v43 [2] if your fix v43-0005 is
omitted.
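
To make the failing sequence easier to follow, here is a deliberately
simplified C model of it. Everything in it is hypothetical: the record kinds,
toy_replay() and the REL_A / REL_T ids (named after tables "a" and "t" in the
spec) are not PostgreSQL code, and the filter condition is only an assumed
stand-in for change_useless_for_repack(). The point is the ordering: the
speculative insert into "t" is queued before relation filtering is enabled,
the CONFIRM record arrives after that and is dropped as useless, so the commit
finds a speculative insert that was never resolved.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical relation ids, named after the tables in filtering.spec. */
enum
{
	REL_NONE = 0,
	REL_A = 1,					/* table being repacked */
	REL_T = 2					/* table receiving the speculative insert */
};

/* Hypothetical, heavily simplified WAL record kinds. */
typedef enum
{
	REC_SPEC_INSERT,			/* speculative heap insert */
	REC_SPEC_CONFIRM,			/* speculative insert confirmed */
	REC_SPEC_ABORT,				/* speculative insert aborted */
	REC_COMMIT
} RecKind;

typedef struct
{
	RecKind		kind;
	int			relid;			/* REL_NONE for COMMIT */
} Record;

/*
 * Replay the records of one transaction.  Relation filtering is off for
 * records before index filter_from and on from there onwards (filter_from < 0
 * means never).  Returns false if COMMIT is reached while a speculative
 * insert is still unresolved.
 */
static bool
toy_replay(const Record *recs, int nrecs, int filter_from, int target_relid)
{
	bool		spec_pending = false;

	for (int i = 0; i < nrecs; i++)
	{
		const Record *rec = &recs[i];
		bool		filtering = filter_from >= 0 && i >= filter_from;

		/* Assumed stand-in for change_useless_for_repack(). */
		if (filtering && rec->kind != REC_COMMIT &&
			rec->relid != target_relid)
			continue;

		switch (rec->kind)
		{
			case REC_SPEC_INSERT:
				spec_pending = true;
				break;
			case REC_SPEC_CONFIRM:
			case REC_SPEC_ABORT:
				spec_pending = false;
				break;
			case REC_COMMIT:
				/* In this model, this is where the reported Assert fires. */
				if (spec_pending)
					return false;
				break;
		}
	}
	return true;
}
```

With REL_A as the target, replaying INSERT, CONFIRM, COMMIT on REL_T returns
true when filtering is never enabled or is enabled from the first record, and
false when filtering starts exactly at the CONFIRM record.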

[1] https://www.postgresql.org/message-id/4703.1774250534%40localhost
[2] https://www.postgresql.org/message-id/202603191855.fzsgsnyzfvpt%40alvherre.pgsql

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

From f3a1371656da372e26cb56bf543cbaa0c99720a8 Mon Sep 17 00:00:00 2001
From: Antonin Houska <[email protected]>
Date: Mon, 23 Mar 2026 12:50:15 +0100
Subject: [PATCH] Reproduce filtering issue.

---
 contrib/test_decoding/expected/filtering.out |  74 ++++++++++++++
 contrib/test_decoding/specs/filtering.spec   | 101 +++++++++++++++++++
 src/backend/executor/nodeModifyTable.c       |   2 +
 src/backend/replication/logical/snapbuild.c  |   3 +
 src/test/isolation/isolationtester.c         |   9 +-
 5 files changed, 188 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/filtering.out
 create mode 100644 contrib/test_decoding/specs/filtering.spec

diff --git a/contrib/test_decoding/expected/filtering.out b/contrib/test_decoding/expected/filtering.out
new file mode 100644
index 00000000000..6ba9690509f
--- /dev/null
+++ b/contrib/test_decoding/expected/filtering.out
@@ -0,0 +1,74 @@
+Parsed test spec with 5 sessions
+
+starting permutation: s1_assign_xid s3_repack s2_assign_xid s1_rollback s4_insert s5_wakeup_snapbuild s2_rollback s5_wakeup_insert_speculative s4_insert_commit s5_wakeup_repack
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s1_assign_xid: 
+    BEGIN;
+    CREATE TABLE c(i int);
+
+step s3_repack: 
+	REPACK (CONCURRENTLY) a;
+ <waiting ...>
+step s2_assign_xid: 
+    BEGIN;
+    CREATE TABLE d(i int);
+
+step s1_rollback: 
+    ROLLBACK;
+
+step s4_insert: 
+	BEGIN;
+	INSERT INTO t(i)
+	SELECT max(i) + 1 FROM t ON CONFLICT (i) DO UPDATE SET i=EXCLUDED.i;
+ <waiting ...>
+step s5_wakeup_snapbuild: 
+	SELECT injection_points_wakeup('snapbuild-full');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step s2_rollback: 
+    ROLLBACK;
+
+step s5_wakeup_insert_speculative: 
+	SELECT injection_points_wakeup('insert-speculative-before-confirm');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step s4_insert: <... completed>
+step s4_insert_commit: 
+	COMMIT;
+
+step s5_wakeup_repack: 
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step s3_repack: <... completed>
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
diff --git a/contrib/test_decoding/specs/filtering.spec b/contrib/test_decoding/specs/filtering.spec
new file mode 100644
index 00000000000..a02c9f8facb
--- /dev/null
+++ b/contrib/test_decoding/specs/filtering.spec
@@ -0,0 +1,101 @@
+setup
+{
+	CREATE TABLE a(i int primary key, j int) WITH (autovacuum_enabled = off);
+	INSERT INTO a(i, j) VALUES (1, 1), (2, 2);
+	CREATE TABLE t(i int primary key);
+	INSERT INTO t(i) VALUES (1);
+	CREATE EXTENSION injection_points;
+}
+
+session s1
+step s1_assign_xid
+{
+    BEGIN;
+    CREATE TABLE c(i int);
+}
+step s1_rollback
+{
+    ROLLBACK;
+}
+
+session s2
+step s2_assign_xid
+{
+    BEGIN;
+    CREATE TABLE d(i int);
+}
+step s2_rollback
+{
+    ROLLBACK;
+}
+
+session s3
+setup
+{
+	SELECT injection_points_attach('snapbuild-full', 'wait');
+	SELECT injection_points_attach('repack-concurrently-before-lock', 'wait');
+}
+step s3_repack
+{
+	REPACK (CONCURRENTLY) a;
+}
+teardown
+{
+	SELECT injection_points_detach('repack-concurrently-before-lock');
+	SELECT injection_points_detach('snapbuild-full');
+}
+
+session s4
+setup
+{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('insert-speculative-before-confirm', 'wait');
+}
+step s4_insert
+{
+	BEGIN;
+	INSERT INTO t(i)
+	SELECT max(i) + 1 FROM t ON CONFLICT (i) DO UPDATE SET i=EXCLUDED.i;
+}
+step s4_insert_commit
+{
+	COMMIT;
+}
+teardown
+{
+	SELECT injection_points_detach('insert-speculative-before-confirm');
+}
+
+session s5
+step s5_wakeup_snapbuild
+{
+	SELECT injection_points_wakeup('snapbuild-full');
+}
+step s5_wakeup_insert_speculative
+{
+	SELECT injection_points_wakeup('insert-speculative-before-confirm');
+}
+step s5_wakeup_repack
+{
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+}
+
+permutation
+# Bring the snapshot builder to the FULL_SNAPSHOT state.
+s1_assign_xid
+s3_repack
+s2_assign_xid
+s1_rollback
+# Perform the speculative insert, but do not confirm it yet. The snapshot
+# builder should decode it.
+s4_insert
+# Let the snapshot builder reach the CONSISTENT state and finish the setup.
+s5_wakeup_snapbuild
+s2_rollback
+# While REPACK is waiting on repack-concurrently-before-lock, let the insert
+# get confirmed. Relation filtering is now enabled.
+s5_wakeup_insert_speculative
+s4_insert_commit
+# REPACK should now decode the speculative insert, with the confirmation
+# record filtered out.
+s5_wakeup_repack
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 680c29f35d5..6d5482e5746 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1232,6 +1232,8 @@ ExecInsert(ModifyTableContext *context,
 												   slot, arbiterIndexes,
 												   &specConflict);
 
+			INJECTION_POINT("insert-speculative-before-confirm", NULL);
+
 			/* adjust the tuple's state accordingly */
 			table_tuple_complete_speculative(resultRelationDesc, slot,
 											 specToken, !specConflict);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index fbdd4600a2b..883e5b6e18f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -141,6 +141,7 @@
 #include "storage/procarray.h"
 #include "storage/standby.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/snapshot.h"
@@ -1390,6 +1391,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		builder->state = SNAPBUILD_FULL_SNAPSHOT;
 		builder->next_phase_at = running->nextXid;
 
+		INJECTION_POINT("snapbuild-full", NULL);
+
 		ereport(LOG,
 				errmsg("logical decoding found initial consistent point at %X/%08X",
 					   LSN_FORMAT_ARGS(lsn)),
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index 440c875b8ac..8f17ee412c9 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -216,15 +216,22 @@ main(int argc, char **argv)
 	 * exactly expect concurrent use of test tables.  However, autovacuum will
 	 * occasionally take AccessExclusiveLock to truncate a table, and we must
 	 * ignore that transient wait.
+	 *
+	 * If the session's backend is blocked, and if its background worker is
+	 * waiting on an injection point, we assume that the injection point is
+	 * the reason for the backend to be blocked. That's what we check in the
+	 * second query of the UNION. XXX Should we use a separate query for that?
 	 */
 	initPQExpBuffer(&wait_query);
 	appendPQExpBufferStr(&wait_query,
+						 "WITH blocking(res) AS ("
 						 "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
 	/* The spec syntax requires at least one session; assume that here. */
 	appendPQExpBufferStr(&wait_query, conns[1].backend_pid_str);
 	for (i = 2; i < nconns; i++)
 		appendPQExpBuffer(&wait_query, ",%s", conns[i].backend_pid_str);
-	appendPQExpBufferStr(&wait_query, "}')");
+	appendPQExpBufferStr(&wait_query, "}') UNION "
+						 "SELECT pg_catalog.pg_isolation_test_session_is_blocked(pid, '{}') FROM pg_stat_activity WHERE leader_pid=$1) SELECT bool_or(res) FROM blocking");
 
 	res = PQprepare(conns[0].conn, PREP_WAITING, wait_query.data, 0, NULL);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-- 
2.47.3
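
The string concatenation in the isolationtester.c hunk makes the resulting
wait query hard to read. The following standalone sketch rebuilds the same
query text with a plain char buffer instead of PQExpBuffer; build_wait_query()
and append_str() are hypothetical helpers, not part of isolationtester.c. Note
that the second UNION branch finds the worker through
pg_stat_activity.leader_pid, so it only helps if the decoding worker shows up
there with leader_pid set to the session's backend PID.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Append s to buf (current length *len); false if it would not fit. */
static bool
append_str(char *buf, size_t bufsize, size_t *len, const char *s)
{
	size_t		slen = strlen(s);

	if (*len + slen + 1 > bufsize)
		return false;
	memcpy(buf + *len, s, slen + 1);	/* copies the terminating NUL too */
	*len += slen;
	return true;
}

/*
 * Build the wait-detection query text from the session backend PIDs (given
 * as strings), mirroring the patched isolationtester.c.  Returns false if
 * npids < 1 or buf is too small.
 */
static bool
build_wait_query(char *buf, size_t bufsize, const char *const *pids, int npids)
{
	size_t		len = 0;

	if (bufsize == 0 || npids < 1)
		return false;
	buf[0] = '\0';

	/* First branch: is the session's own backend ($1) blocked? */
	if (!append_str(buf, bufsize, &len,
					"WITH blocking(res) AS ("
					"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{"))
		return false;
	for (int i = 0; i < npids; i++)
	{
		if (i > 0 && !append_str(buf, bufsize, &len, ","))
			return false;
		if (!append_str(buf, bufsize, &len, pids[i]))
			return false;
	}

	/* Second branch: is any process whose leader_pid is $1 blocked? */
	return append_str(buf, bufsize, &len,
					  "}') UNION "
					  "SELECT pg_catalog.pg_isolation_test_session_is_blocked(pid, '{}') "
					  "FROM pg_stat_activity WHERE leader_pid=$1) "
					  "SELECT bool_or(res) FROM blocking");
}
```

For session PIDs "101" and "102", the first branch checks $1 against the
array '{101,102}', and bool_or() reports the session as waiting if either
branch returns true.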
