On Thu, Jan 16, 2020 at 08:09:09PM +0300, Alexey Kondratov wrote:
> OK, I have definitely overthought that, thanks. This looks like a minimal
> subset of changes that actually solves the bug. I would only prefer to keep
> some additional comments (something like the attached), otherwise after half
> a year it will be unclear again, why we save slot unconditionally here.

Since this email, Andres has sent an email that did not reach the
community lists, but where all the participants of this thread were in
CC.  Here is a summary of the points raised (please correct me if that
does not sound right to you, Andres):
1) The slot advancing has to mark the slot as dirty, but should we
make the change persistent at the end of the function or should we
wait for a checkpoint to do the work, meaning that any update done to
the slot would be lost if a crash occurs in-between?  Note that we
have this commit in slotfuncs.c for
pg_logical_replication_slot_advance():
 * Dirty the slot so it's written out at the next checkpoint.
 * We'll still lose its position on crash, as documented, but it's
 * better than always losing the position even on clean restart.

This comment refers to the documentation for the logical decoding
section (see logicaldecoding-replication-slots in
logicaldecoding.sgml), and even if nothing can be done for changes lost
before the slot advance function finishes its work, we ought to make
the data persistent if we can.

The original commit that introduced slot advancing is 9c7d06d.  Here
is the thread, where this point was not really mentioned by the way:
https://www.postgresql.org/message-id/5c26ff40-8452-fb13-1bea-56a0338a8...@2ndquadrant.com

2) pg_replication_slot_advance() includes this code, which is broken:
    /* Update the on disk state when lsn was updated. */
    if (XLogRecPtrIsInvalid(endlsn))
    {
        ReplicationSlotMarkDirty();
        ReplicationSlotsComputeRequiredXmin(false);
        ReplicationSlotsComputeRequiredLSN();
        ReplicationSlotSave();
    }
Here the deal is that endlsn, aka the LSN the slot has been advanced
to (or its current position if no progress has been made), never gets
set to InvalidXLogRecPtr as of f731cfa, so this on-disk update should
instead be done only when the slot has actually made some progress.
It seems to me that this should have been the opposite to begin with
in 9c7d06d, aka do the save if endlsn is valid.

3) The amount of testing related to slot advancing could be better
with cluster-wide operations.

@@ -370,6 +370,11 @@ pg_physical_replication_slot_advance(XLogRecPtr
moveto)
    MyReplicationSlot->data.restart_lsn = moveto;

    SpinLockRelease(&MyReplicationSlot->mutex);
    retlsn = moveto;
+
+   ReplicationSlotMarkDirty();
+
+   /* We moved restart_lsn, update the global value. */
+   ReplicationSlotsComputeRequiredLSN();
I think that the proposed patch is missing a call to
ReplicationSlotsComputeRequiredXmin() here for physical slots.

So, I have been looking at this patch by myself, and updated it so as
the extra slot save is done only if any advancing has been done, on
top of the other computations that had better be around for
consistency.  The patch includes TAP tests for physical and logical
slots' durability across restarts.

Thoughts?
--
Michael
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index bb69683e2a..af3e114fc9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -359,17 +359,20 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
  * checkpoints.
  */
 static XLogRecPtr
-pg_physical_replication_slot_advance(XLogRecPtr moveto)
+pg_physical_replication_slot_advance(XLogRecPtr moveto, bool *advance_done)
 {
 	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
 	XLogRecPtr	retlsn = startlsn;
 
+	*advance_done = false;
+
 	if (startlsn < moveto)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.restart_lsn = moveto;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 		retlsn = moveto;
+		*advance_done = true;
 	}
 
 	return retlsn;
@@ -387,13 +390,15 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * mode, no changes are generated anyway.
  */
 static XLogRecPtr
-pg_logical_replication_slot_advance(XLogRecPtr moveto)
+pg_logical_replication_slot_advance(XLogRecPtr moveto, bool *advance_done)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	XLogRecPtr	startlsn;
 	XLogRecPtr	retlsn;
 
+	*advance_done = false;
+
 	PG_TRY();
 	{
 		/*
@@ -475,13 +480,16 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			 * effort to save it for them.
 			 *
 			 * Dirty the slot so it's written out at the next checkpoint.
-			 * We'll still lose its position on crash, as documented, but it's
-			 * better than always losing the position even on clean restart.
+			 * We'll still lose its position on crash until slot advancing
+			 * is done, as documented in the logical decoding section of the
+			 * docs, but it's better than always losing the position even on
+			 * clean restart.
 			 */
 			ReplicationSlotMarkDirty();
 		}
 
 		retlsn = MyReplicationSlot->data.confirmed_flush;
+		*advance_done = true;
 
 		/* free context, call shutdown callback */
 		FreeDecodingContext(ctx);
@@ -515,6 +523,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	bool		nulls[2];
 	HeapTuple	tuple;
 	Datum		result;
+	bool		advance_done;
 
 	Assert(!MyReplicationSlot);
 
@@ -566,15 +575,15 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 
 	/* Do the actual slot update, depending on the slot type */
 	if (OidIsValid(MyReplicationSlot->data.database))
-		endlsn = pg_logical_replication_slot_advance(moveto);
+		endlsn = pg_logical_replication_slot_advance(moveto, &advance_done);
 	else
-		endlsn = pg_physical_replication_slot_advance(moveto);
+		endlsn = pg_physical_replication_slot_advance(moveto, &advance_done);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
 
-	/* Update the on disk state when lsn was updated. */
-	if (XLogRecPtrIsInvalid(endlsn))
+	/* Update the on disk state when the slot position has been moved. */
+	if (advance_done)
 	{
 		ReplicationSlotMarkDirty();
 		ReplicationSlotsComputeRequiredXmin(false);
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 3c743d7d7c..12b36bc824 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 32;
+use Test::More tests => 34;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -344,3 +344,25 @@ is($catalog_xmin, '',
 is($xmin, '', 'xmin of cascaded slot null with hs feedback reset');
 is($catalog_xmin, '',
 	'catalog xmin of cascaded slot still null with hs_feedback reset');
+
+# Test physical slot advancing and its durability.  Create a new slot on
+# the primary, not used by any of the standbys. This reserves WAL at creation.
+my $phys_slot = 'phys_slot';
+$node_master->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('$phys_slot', true);");
+$node_master->psql('postgres', "
+	CREATE TABLE tab_phys_slot (a int);
+	INSERT INTO tab_phys_slot VALUES (generate_series(1,10));");
+my $psql_rc = $node_master->psql('postgres',
+	"SELECT pg_replication_slot_advance('$phys_slot', 'FF/FFFFFFFF');");
+is($psql_rc, '0', 'slot advancing works with physical slot');
+my $phys_restart_lsn_pre = $node_master->safe_psql('postgres',
+    "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$phys_slot';");
+chomp($phys_restart_lsn_pre);
+# Slot advance should persist across restarts.
+$node_master->restart;
+my $phys_restart_lsn_post = $node_master->safe_psql('postgres',
+    "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$phys_slot';");
+chomp($phys_restart_lsn_post);
+ok(($phys_restart_lsn_pre cmp $phys_restart_lsn_post) == 0,
+	"physical slot advance persists across restarts");
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index c23cc4dda7..1dcc98b418 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 12;
 use Config;
 
 # Initialize master node
@@ -135,5 +135,26 @@ is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
 is($node_master->slot('otherdb_slot')->{'slot_name'},
 	undef, 'logical slot was actually dropped with DB');
 
+# Test logical slot advancing and its durability.
+my $logical_slot = 'logical_slot';
+$node_master->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);");
+$node_master->psql('postgres', "
+	CREATE TABLE tab_logical_slot (a int);
+	INSERT INTO tab_logical_slot VALUES (generate_series(1,10));");
+my $psql_rc = $node_master->psql('postgres',
+	"SELECT pg_replication_slot_advance('$logical_slot', 'FF/FFFFFFFF');");
+is($psql_rc, '0', 'slot advancing works with logical slot');
+my $logical_restart_lsn_pre = $node_master->safe_psql('postgres',
+    "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';");
+chomp($logical_restart_lsn_pre);
+# Slot advance should persist across restarts.
+$node_master->restart;
+my $logical_restart_lsn_post = $node_master->safe_psql('postgres',
+    "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';");
+chomp($logical_restart_lsn_post);
+ok(($logical_restart_lsn_pre cmp $logical_restart_lsn_post) == 0,
+	"logical slot advance persists across restarts");
+
 # done with the node
 $node_master->stop;

Attachment: signature.asc
Description: PGP signature

Reply via email to