Hi Hackers,

I noticed by accident that, for a physical slot, pg_replication_slot_advance only changes the in-memory state of the slot. The new value does not survive a server restart.

Reproduction steps:

1) Create a new slot and note its restart_lsn

SELECT pg_create_physical_replication_slot('slot1', true);
SELECT * from pg_replication_slots;

2) Generate some dummy WAL

CHECKPOINT;
SELECT pg_switch_wal();
CHECKPOINT;
SELECT pg_switch_wal();

3) Advance the slot to the current value of pg_current_wal_insert_lsn() (0/160001A0 in my case)

SELECT pg_replication_slot_advance('slot1', '0/160001A0');

4) Check that restart_lsn has been updated

SELECT * from pg_replication_slots;

5) Restart the server and check restart_lsn again. It is back to the value from step 1, i.e. the advance has been lost.


I dug into the code, and the problem is caused by this if statement in pg_replication_slot_advance:

    /* Update the on disk state when lsn was updated. */
    if (XLogRecPtrIsInvalid(endlsn))
    {
        ReplicationSlotMarkDirty();
        ReplicationSlotsComputeRequiredXmin(false);
        ReplicationSlotsComputeRequiredLSN();
        ReplicationSlotSave();
    }

The condition is inverted. endlsn is always a valid LSN after the slot-advance internals have run, so this block is never executed. It works for logical slots only by chance, because LogicalConfirmReceivedLocation() implicitly calls ReplicationSlotMarkDirty().

Attached is a small patch that fixes this bug. I tried to keep the
intent of the 'if (XLogRecPtrIsInvalid(endlsn))' check. The patch
negates the condition, and pg_logical_replication_slot_advance and
pg_physical_replication_slot_advance now return InvalidXLogRecPtr
when the advance is a no-op.

What do you think?


Regards
--
Alexey Kondratov

Postgres Professional https://www.postgrespro.com
Russian Postgres Company

P.S. CCed Simon and Michael, as they were the last to make significant changes to the pg_replication_slot_advance code.

>From 36d1fa2a89b3fb354a813354496df475ee11b62e Mon Sep 17 00:00:00 2001
From: Alexey Kondratov <kondratov.alek...@gmail.com>
Date: Tue, 24 Dec 2019 18:21:50 +0300
Subject: [PATCH v1] Make physical replslot advance persistent

---
 src/backend/replication/slotfuncs.c | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 46e6dd4d12..826708d3f6 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -358,12 +358,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
  * The LSN position to move to is compared simply to the slot's restart_lsn,
  * knowing that any position older than that would be removed by successive
  * checkpoints.
+ *
+ * Returns InvalidXLogRecPtr if no-op.
  */
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr moveto)
 {
 	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
-	XLogRecPtr	retlsn = startlsn;
+	XLogRecPtr	retlsn = InvalidXLogRecPtr;
 
 	if (startlsn < moveto)
 	{
@@ -386,6 +388,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * because we need to digest WAL to advance restart_lsn allowing to recycle
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
+ *
+ * Returns InvalidXLogRecPtr if no-op.
  */
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
@@ -393,7 +397,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	XLogRecPtr	startlsn;
-	XLogRecPtr	retlsn;
+	XLogRecPtr	retlsn = InvalidXLogRecPtr;
 
 	PG_TRY();
 	{
@@ -414,9 +418,6 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		 */
 		startlsn = MyReplicationSlot->data.restart_lsn;
 
-		/* Initialize our return value in case we don't do anything */
-		retlsn = MyReplicationSlot->data.confirmed_flush;
-
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
@@ -480,9 +481,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			 * better than always losing the position even on clean restart.
 			 */
 			ReplicationSlotMarkDirty();
-		}
 
-		retlsn = MyReplicationSlot->data.confirmed_flush;
+			retlsn = MyReplicationSlot->data.confirmed_flush;
+		}
 
 		/* free context, call shutdown callback */
 		FreeDecodingContext(ctx);
@@ -575,7 +576,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	nulls[0] = false;
 
 	/* Update the on disk state when lsn was updated. */
-	if (XLogRecPtrIsInvalid(endlsn))
+	if (!XLogRecPtrIsInvalid(endlsn))
 	{
 		ReplicationSlotMarkDirty();
 		ReplicationSlotsComputeRequiredXmin(false);
-- 
2.17.1

Reply via email to