On Tue, 5 Sept 2023 at 09:10, Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Fri, Sep 1, 2023 at 12:16 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > On Fri, 1 Sept 2023 at 10:06, Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Thu, Aug 31, 2023 at 6:12 PM Ashutosh Bapat
> > > <ashutosh.bapat....@gmail.com> wrote:
> > > >
> > > > On Thu, Aug 31, 2023 at 2:52 PM Amit Kapila <amit.kapil...@gmail.com> 
> > > > wrote:
> > > > >
> > > > > All but one. Normally, the idea of marking dirty is to indicate that
> > > > > we will actually write/flush the contents at a later point (except
> > > > > when required for correctness) as even indicated in the comments atop
> > > > > ReplicatioinSlotMarkDirty(). However, I see your point that we use
> > > > > that protocol at all the current places including CreateSlotOnDisk().
> > > > > So, we can probably do it here as well.
> > > >
> > > > yes
> > > >
> > >
> > > I think we should also ensure that slots are not invalidated
> > > (slot.data.invalidated != RS_INVAL_NONE) before marking them dirty
> > > because we don't allow decoding from such slots, so we shouldn't
> > > include those.
> >
> > Added this check.
> >
> > Apart from this I have also fixed the following issues that were
> > agreed on: a) Setting slots to dirty in CheckPointReplicationSlots
> > instead of setting it in SaveSlotToPath b) The comments were moved
> > from ReplicationSlot and moved to CheckPointReplicationSlots c) Tests
> > will be run in autovacuum = off d) updating last_saved_confirmed_flush
> > based on cp.slotdata.confirmed_flush rather than
> > slot->data.confirmed_flush.
> > I have also added the commitfest entry for this at [1].
>
> The overall idea looks fine to me
>
> +
> + /*
> + * We won't ensure that the slot is persisted after the
> + * confirmed_flush LSN is updated as that could lead to frequent
> + * writes.  However, we need to ensure that we do persist the slots at
> + * the time of shutdown whose confirmed_flush LSN is changed since we
> + * last saved the slot to disk. This will help in avoiding retreat of
> + * the confirmed_flush LSN after restart.
> + */
> + if (is_shutdown && SlotIsLogical(s))
> + {
> + SpinLockAcquire(&s->mutex);
> + if (s->data.invalidated == RS_INVAL_NONE &&
> + s->data.confirmed_flush != s->last_saved_confirmed_flush)
> + s->dirty = true;
> + SpinLockRelease(&s->mutex);
> + }
>
> The comments don't mention anything about why we are just flushing at
> the shutdown checkpoint.  I mean the checkpoint is not that frequent
> and we already perform a lot of I/O during checkpoints so isn't it
> wise to flush during every checkpoint.  We may argue that there is no
> extra advantage of that as we are not optimizing for crash recovery
> but OTOH there is no reason for not doing so for other checkpoints or
> we are worried about the concurrency with parallel walsender running
> during non shutdown checkpoint if so then better we explain that as
> well?  If it is already discussed in the thread and we have a
> conclusion on this then maybe we can mention this in comments?

I felt it is better to do this only during the shutdown checkpoint as
in other cases it is being saved  periodically as and when the
replication happens. Added comments for the same.

> /*
>   * Flush all replication slots to disk.
>   *
> - * This needn't actually be part of a checkpoint, but it's a convenient
> - * location.
> + * is_shutdown is true in case of a shutdown checkpoint.
>   */
>  void
> -CheckPointReplicationSlots(void)
> +CheckPointReplicationSlots(bool is_shutdown)
>
> It seems we have removed two lines from the function header comments,
> is this intentional or accidental?

Modified.
The updated v8 version patch has the changes for the same.

Regards,
Vignesh
From 1e498fb2f13cebe49e8e187c664caf71da2aed5e Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouh...@free.fr>
Date: Fri, 14 Apr 2023 13:49:09 +0800
Subject: [PATCH v8] Persist logical slots to disk during a shutdown checkpoint
 if required.

It's entirely possible for a logical slot to have a confirmed_flush LSN
higher than the last value saved on disk while not being marked as dirty.
Currently, it is not a major problem but a later patch adding support for
the upgrade of slots relies on that value being properly persisted to disk.

It can also help avoid processing the same transactions again in some
boundary cases after the clean shutdown and restart. Say, we process some
transactions for which we didn't send anything downstream (the changes got
filtered) but the confirm_flush LSN is updated due to keepalives. As we
don't flush the latest value of confirm_flush LSN, it may lead to
processing the same changes again without this patch.

Author: Vignesh C, Julien Rouhaud, Kuroda Hayato based on suggestions by
Ashutosh Bapat
Reviewed-by: Amit Kapila, Peter Smith, Ashutosh Bapat
Discussion: http://postgr.es/m/CAA4eK1JzJagMmb_E8D4au=gyqkxox0afnbm1fbp7sy7t4yw...@mail.gmail.com
Discussion: http://postgr.es/m/tyapr01mb58664c81887b3af2eb6b16e3f5...@tyapr01mb5866.jpnprd01.prod.outlook.com
---
 src/backend/access/transam/xlog.c             |   2 +-
 src/backend/replication/slot.c                |  34 +++++-
 src/include/replication/slot.h                |   5 +-
 src/test/recovery/meson.build                 |   1 +
 .../t/038_save_logical_slots_shutdown.pl      | 102 ++++++++++++++++++
 5 files changed, 139 insertions(+), 5 deletions(-)
 create mode 100644 src/test/recovery/t/038_save_logical_slots_shutdown.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f6f8adc72a..f26c8d18a6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7039,7 +7039,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
-	CheckPointReplicationSlots();
+	CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
 	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index bb09c4010f..6559a61753 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -321,6 +321,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -1573,10 +1574,10 @@ restart:
  * Flush all replication slots to disk.
  *
  * This needn't actually be part of a checkpoint, but it's a convenient
- * location.
+ * location. is_shutdown is true in case of a shutdown checkpoint.
  */
 void
-CheckPointReplicationSlots(void)
+CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
 
@@ -1601,6 +1602,31 @@ CheckPointReplicationSlots(void)
 
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+
+		/*
+		 * We won't ensure that the slot is persisted after the
+		 * confirmed_flush LSN is updated as that could lead to frequent
+		 * writes.  However, we need to ensure that we do persist the slots at
+		 * the time of shutdown whose confirmed_flush LSN is changed since we
+		 * last saved the slot to disk. This will help in avoiding retreat of
+		 * the confirmed_flush LSN after restart. At other times, the walsender
+		 * keeps saving the slot from time to time as the replication
+		 * progresses, so there is no clear advantage of flushing additional
+		 * slots at the time of checkpoint.
+		 */
+		if (is_shutdown && SlotIsLogical(s))
+		{
+			SpinLockAcquire(&s->mutex);
+			if (s->data.invalidated == RS_INVAL_NONE &&
+				s->data.confirmed_flush != s->last_saved_confirmed_flush)
+			{
+				s->just_dirtied = true;
+				s->dirty = true;
+			}
+
+			SpinLockRelease(&s->mutex);
+		}
+
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
@@ -1873,11 +1899,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 
 	/*
 	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
-	 * already.
+	 * already and remember the confirmed_flush LSN value.
 	 */
 	SpinLockAcquire(&slot->mutex);
 	if (!slot->just_dirtied)
 		slot->dirty = false;
+	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
 	SpinLockRelease(&slot->mutex);
 
 	LWLockRelease(&slot->io_in_progress_lock);
@@ -2074,6 +2101,7 @@ RestoreSlotFromDisk(const char *name)
 		/* initialize in memory state */
 		slot->effective_xmin = cp.slotdata.xmin;
 		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
+		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
 
 		slot->candidate_catalog_xmin = InvalidTransactionId;
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..da8978342a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -178,6 +178,9 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/* This is used to track the last saved confirmed_flush LSN value */
+	XLogRecPtr	last_saved_confirmed_flush;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -241,7 +244,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
-extern void CheckPointReplicationSlots(void);
+extern void CheckPointReplicationSlots(bool is_shutdown);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index e7328e4894..646d6ffde4 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -43,6 +43,7 @@ tests += {
       't/035_standby_logical_decoding.pl',
       't/036_truncated_dropped.pl',
       't/037_invalid_database.pl',
+      't/038_save_logical_slots_shutdown.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/038_save_logical_slots_shutdown.pl b/src/test/recovery/t/038_save_logical_slots_shutdown.pl
new file mode 100644
index 0000000000..224a840a61
--- /dev/null
+++ b/src/test/recovery/t/038_save_logical_slots_shutdown.pl
@@ -0,0 +1,102 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test logical replication slots are always persisted to disk during a shutdown
+# checkpoint.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+sub compare_confirmed_flush
+{
+	my ($node, $confirmed_flush_from_log) = @_;
+
+	# Fetch Latest checkpoint location from the control file
+	my ($stdout, $stderr) =
+	  run_command([ 'pg_controldata', $node->data_dir ]);
+	my @control_data      = split("\n", $stdout);
+	my $latest_checkpoint = undef;
+	foreach (@control_data)
+	{
+		if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg)
+		{
+			$latest_checkpoint = $1;
+			last;
+		}
+	}
+	die "Latest checkpoint location not found in control file\n"
+	  unless defined($latest_checkpoint);
+
+	# Is it same as the value read from log?
+	ok( $latest_checkpoint eq $confirmed_flush_from_log,
+		"Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location"
+	);
+
+	return;
+}
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('pub');
+$node_publisher->init(allows_streaming => 'logical');
+# Avoid checkpoint during the test, otherwise, the latest checkpoint location
+# will change.
+$node_publisher->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('sub');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create tables
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+);
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+my $offset = -s $node_publisher->logfile;
+
+# Restart the publisher to ensure that the slot will be persisted if required
+$node_publisher->restart();
+
+# Wait until the walsender creates decoding context
+$node_publisher->wait_for_log(
+	qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./,
+	$offset);
+
+# Extract confirmed_flush from the logfile
+my $log_contents = slurp_file($node_publisher->logfile, $offset);
+$log_contents =~
+  qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./
+  or die "could not get confirmed_flush_lsn";
+
+# Ensure that the slot's confirmed_flush LSN is the same as the
+# latest_checkpoint location.
+compare_confirmed_flush($node_publisher, $1);
+
+done_testing();
-- 
2.34.1

Reply via email to