On Fri, 2 May 2025 at 09:23, vignesh C <[email protected]> wrote:
>
> On Fri, 2 May 2025 at 06:30, Tom Lane <[email protected]> wrote:
> >
> > vignesh C <[email protected]> writes:
> > > I agree with your analysis. I was able to reproduce the issue by
> > > delaying the invalidation of the subscription until the walsender
> > > finished decoding the INSERT operation following the ALTER
> > > SUBSCRIPTION through a debugger and using the lsn from the pg_waldump
> > > of the INSERT after the ALTER SUBSCRIPTION.
> >
> > Can you be a little more specific about how you reproduced this?
> > I tried inserting sleep() calls in various likely-looking spots
> > and could not get a failure that way.
>
> Test Steps:
> 1) Set up logical replication:
> Create a publication on the publisher
> Create a subscription on the subscriber
> 2) Create the following table on the publisher:
> CREATE TABLE tab_3 (a int);
> 3) Create the same table on the subscriber:
> CREATE TABLE tab_3 (a int);
> 4) On the subscriber, alter the subscription to refer to a
> non-existent publication:
> ALTER SUBSCRIPTION sub1 SET PUBLICATION tap_pub_3;
> 5) Insert data on the publisher:
> INSERT INTO tab_3 VALUES (1);
>
> In the normal case, the publisher logs the following warning, as expected:
> 2025-05-02 08:56:45.350 IST [516197] WARNING: skipped loading publication: tap_pub_3
> 2025-05-02 08:56:45.350 IST [516197] DETAIL: The publication does not exist at this point in the WAL.
> 2025-05-02 08:56:45.350 IST [516197] HINT: Create the publication if it does not exist.
>
> To simulate a delay in subscription invalidation, I modified the
> maybe_reread_subscription() function as follows:
> diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
> index 4151a4b2a96..0831784aca3 100644
> --- a/src/backend/replication/logical/worker.c
> +++ b/src/backend/replication/logical/worker.c
> @@ -3970,6 +3970,10 @@ maybe_reread_subscription(void)
> MemoryContext oldctx;
> Subscription *newsub;
> bool started_tx = false;
> + bool test = true;
> +
> + if (test)
> + return;
>
> While test is true, maybe_reread_subscription() returns early, so the
> apply worker does not detect the subscription change until the flag
> is cleared.
>
> With the patch applied, repeat steps 1–5.
> Using pg_waldump, identify the LSN of the insert:
> rmgr: Heap len (rec/tot): 59/ 59, tx: 756, lsn: 0/01711848, prev 0/01711810, desc: INSERT+INIT off: 1
> rmgr: Transaction len (rec/tot): 46/ 46, tx: 756, lsn: 0/01711888, prev 0/01711848, desc: COMMIT 2025-05-02 09:06:09.400926 IST
>
> Attach gdb to the walsender process and check the slot's confirmed
> flush LSN:
> (gdb) p *MyReplicationSlot
> ...
> confirmed_flush = 24241928
> (gdb) p /x 24241928
> $4 = 0x171e708
>
> Now attach to the apply worker, set a breakpoint at
> maybe_reread_subscription, and continue execution. Once execution
> stops in the function, set test = false. The apply worker then detects
> that the subscription has been invalidated and restarts.
>
> Because the walsender's confirmed_flush (0/171E708) is already past
> the insert (0/01711848) and its commit (0/01711888), the newly started
> apply worker misses the inserted row entirely, which leads to the CI
> failure. This issue can arise whenever the walsender advances
> confirmed_flush before the apply worker is able to detect and react to
> the subscription change.
>
> I could not find a simpler way to reproduce this.
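A simpler way to reproduce the issue is to add a 1-second sleep in
LogicalRepApplyLoop(), just before the call to WaitLatchOrSocket().
This reproduces the test failure consistently for me, and the cause of
the failure is the same as in [1]. The attached patch adds that sleep
and also raises the feedback and keepalive messages to LOG level so the
LSNs involved show up in the server logs.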
[1] - https://www.postgresql.org/message-id/CALDaNm2Q_pfwiCkaV920iXEbh4D%3D5MmD_tNQm_GRGX6-MsLxoQ%40mail.gmail.com
Regards,
Vignesh
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..d0056f5655c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3702,6 +3702,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (last_received < end_lsn)
last_received = end_lsn;
+ elog(LOG, "Send feedback from 1");
send_feedback(last_received, reply_requested, false);
UpdateWorkerStats(last_received, timestamp, true);
}
@@ -3714,6 +3715,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
}
+ elog(LOG, "Send feedback from 2");
/* confirm all writes so far */
send_feedback(last_received, false, false);
@@ -3739,6 +3741,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (endofstream)
break;
+ sleep(1);
+
/*
* Wait for more data or latch. If we have unflushed transactions,
* wake up after WalWriterDelay to see if they've been flushed yet (in
@@ -3812,6 +3816,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
}
+ elog(LOG, "Send feedback from 3");
send_feedback(last_received, requestReply, requestReply);
/*
@@ -3910,7 +3915,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
- elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+ elog(LOG, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
force,
LSN_FORMAT_ARGS(recvpos),
LSN_FORMAT_ARGS(writepos),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..9896a8d74d5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -4062,7 +4062,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
static void
WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
{
- elog(DEBUG2, "sending replication keepalive");
+ elog(LOG, "sending replication keepalive - writePtr %X/%X",
+ LSN_FORMAT_ARGS(writePtr));
/* construct the message... */
resetStringInfo(&output_message);