On 11/21/24 14:59, Tomas Vondra wrote:
> 
> ...
>
> But then there's the SQL API - pg_logical_slot_get_changes(). And it
> turns out it ends up syncing the slot to disk pretty often, because for
> RUNNING_XACTS we call LogicalDecodingProcessRecord() + standby_decode(),
> which ends up calling SaveSlotToDisk(). And at the end we call
> LogicalConfirmReceivedLocation() for good measure, which saves the slot
> too, just to be sure.
> 
> FWIW I suspect this still is not perfectly safe, because we may still
> crash / restart before the updated data.restart_lsn makes it to disk,
> but after it was already used to remove old WAL, although that's
> probably harder to hit. With streaming the subscriber will still send us
> the new startpoint, so that should not fail I think. But with the SQL
> API we probably can get into the "segment already removed" issues.
> 
> I haven't tried reproducing this yet, I guess it should be possible
> using the injection points. Not sure when I get to this, though.
> 
I kept pulling on this loose thread, and the deeper I look the more I'm
convinced ReplicationSlotsComputeRequiredLSN() is fundamentally unsafe.
I may be missing something, of course, in which case I'd be grateful if
someone could correct me.

I believe the main problem is that ReplicationSlotsComputeRequiredLSN()
operates on data that may not be on-disk yet. It just iterates over
slots in shared memory, looks at the data.restart_lsn, and rolls with
that. So some of the data may be lost after a crash or "immediate"
restart, which for restart_lsn means it can move backwards by some
unknown amount.
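
For reference, this is roughly what the function does (a simplified
sketch, paraphrased from memory, not the exact source):

void
ReplicationSlotsComputeRequiredLSN(void)
{
	XLogRecPtr	min_required = InvalidXLogRecPtr;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (int i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		XLogRecPtr	restart_lsn;

		if (!s->in_use)
			continue;

		/* in-memory value, possibly not written to disk yet */
		SpinLockAcquire(&s->mutex);
		restart_lsn = s->data.restart_lsn;
		SpinLockRelease(&s->mutex);

		if (!XLogRecPtrIsInvalid(restart_lsn) &&
			(XLogRecPtrIsInvalid(min_required) || restart_lsn < min_required))
			min_required = restart_lsn;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* the checkpointer uses this to decide which WAL segments to keep */
	XLogSetReplicationSlotMinimumLSN(min_required);
}

Nothing in there cares whether the values it reads are durable.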

Unfortunately, some of the callers use the value as if it was durable,
and do irreversible actions based on it. This whole thread is about
checkpointer using the value to discard WAL supposedly not required by
any slot, only to find out we're missing WAL.

That seems like a rather fundamental problem, and the only reason why we
don't see this causing trouble more often is that (a) abrupt restarts
are not very common, (b) most slots are likely not lagging very much,
and thus not in danger of actually losing WAL, and (c) the streaming
replication tracks startpoint, which masks the issue.

But with the SQL API it's quite simple to cause issues with the right
timing, as I'll show in a bit.

There's an interesting difference in how different places update the
slot. For example LogicalConfirmReceivedLocation() does this:

1) update slot->data.restart_lsn
2) mark slot dirty: ReplicationSlotMarkDirty()
3) save slot to disk: ReplicationSlotSave()
4) recalculate required LSN: ReplicationSlotsComputeRequiredLSN()

while pg_replication_slot_advance() does only this:

1) update slot->data.restart_lsn
2) mark slot dirty: ReplicationSlotMarkDirty()
3) recalculate required LSN: ReplicationSlotsComputeRequiredLSN()

That is, it doesn't save the slot to disk. It just updates the LSN and
then proceeds to recalculate the "required LSN" for all slots. That
makes it very easy to hit the issue, as demonstrated by Vitaly.

However, it doesn't mean LogicalConfirmReceivedLocation() is safe. It
would be safe without concurrency, but it can happen that the logical
decoding does (1) and maybe (2), but before the slot gets persisted,
some other session gets to call ReplicationSlotsComputeRequiredLSN().

It might be logical decoding on another slot, or advance of a physical
slot. I haven't checked what else can trigger that.

So ultimately logical slots have exactly the same issue.
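
Concretely, an interleaving like this should be enough (simplified):

1) session A (logical decoding) updates slot->data.restart_lsn in memory

2) before A marks the slot dirty / saves it, session B calls
   ReplicationSlotsComputeRequiredLSN() and picks up the new value

3) a checkpoint removes the WAL segments that are no longer "required"

4) the server crashes (or restarts in "immediate" mode) before A's
   ReplicationSlotSave() makes it to disk

5) after restart the slot is back at the old on-disk restart_lsn, which
   now points into WAL that was already removed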

Attached are two patches, demonstrating the issue. 0001 adds injection
points into two places - before (2) in LogicalConfirmReceivedLocation,
and before removal of old WAL in a checkpoint. 0002 then adds a simple
TAP test triggering the issue in pg_logical_slot_get_changes(), leading to:

  ERROR:  requested WAL segment pg_wal/000000010000000000000001 has
          already been removed

The same issue could be demonstrated on a physical slot - it would
actually be simpler, I think.


I've been unable to cause issues for streaming replication (both
physical and logical), because the subscriber sends startpoint which
adjusts the restart_lsn to a "good" value. But I'm not sure if that's
reliable in all cases, or if the replication could break too.


It's entirely possible this behavior is common knowledge, but it was a
surprise to me. Even if streaming replication is safe, this does seem to
make the SQL functions less reliable (not that they don't have other
challenges, e.g. with Ctrl-C). But maybe it could be made safer?

I don't have a great idea how to improve this. It seems wrong for
ReplicationSlotsComputeRequiredLSN() to calculate the LSN using values
from dirty slots, so maybe it should simply retry if any slot is dirty?
Or retry on that one slot? But various places update the restart_lsn
before marking the slot as dirty, so right now this won't work.
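
Just to illustrate the general direction (purely hypothetical - the
last_saved_restart_lsn field does not exist, it stands for "the
restart_lsn that last made it to disk"), the loop in the sketch above
might instead do something like:

		SpinLockAcquire(&s->mutex);
		/* use the value known to be on disk, not data.restart_lsn */
		restart_lsn = s->last_saved_restart_lsn;	/* hypothetical field */
		SpinLockRelease(&s->mutex);

That would sidestep the ordering problem with ReplicationSlotMarkDirty(),
at the cost of the required LSN lagging until the slot actually gets
saved, and slots that were never saved would need special handling.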


regards

-- 
Tomas Vondra
From 79a045728b09237234f23130a2a710e1bdde7870 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Thu, 21 Nov 2024 23:07:22 +0100
Subject: [PATCH 2/2] TAP test

---
 src/test/modules/test_required_lsn/Makefile   |  18 +++
 .../modules/test_required_lsn/meson.build     |  15 +++
 .../test_required_lsn/t/001_logical_slot.pl   | 126 ++++++++++++++++++
 3 files changed, 159 insertions(+)
 create mode 100644 src/test/modules/test_required_lsn/Makefile
 create mode 100644 src/test/modules/test_required_lsn/meson.build
 create mode 100644 src/test/modules/test_required_lsn/t/001_logical_slot.pl

diff --git a/src/test/modules/test_required_lsn/Makefile b/src/test/modules/test_required_lsn/Makefile
new file mode 100644
index 00000000000..3eb2b02d38f
--- /dev/null
+++ b/src/test/modules/test_required_lsn/Makefile
@@ -0,0 +1,18 @@
+# src/test/modules/test_required_lsn/Makefile
+
+EXTRA_INSTALL=src/test/modules/injection_points \
+	contrib/test_decoding
+
+export enable_injection_points
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_required_lsn
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_required_lsn/meson.build b/src/test/modules/test_required_lsn/meson.build
new file mode 100644
index 00000000000..99ef3a60a4e
--- /dev/null
+++ b/src/test/modules/test_required_lsn/meson.build
@@ -0,0 +1,15 @@
+# Copyright (c) 2022-2024, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_required_lsn',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'env': {
+       'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
+    },
+    'tests': [
+      't/001_logical_slot.pl'
+    ],
+  },
+}
diff --git a/src/test/modules/test_required_lsn/t/001_logical_slot.pl b/src/test/modules/test_required_lsn/t/001_logical_slot.pl
new file mode 100644
index 00000000000..41261f4aa6b
--- /dev/null
+++ b/src/test/modules/test_required_lsn/t/001_logical_slot.pl
@@ -0,0 +1,126 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# This test demonstrates how ReplicationSlotsComputeRequiredLSN() can pick
+# up a not-yet-durable restart_lsn, allowing a checkpoint to remove WAL
+# that a logical slot still needs. After an immediate restart the slot's
+# restart_lsn moves backwards, and pg_logical_slot_get_changes() fails
+# with "requested WAL segment ... has already been removed".
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+my ($node, $result);
+
+$node = PostgreSQL::Test::Cluster->new('mike');
+$node->init;
+$node->append_conf('postgresql.conf',
+	"shared_preload_libraries = 'injection_points'");
+$node->append_conf('postgresql.conf',
+	"wal_level = 'logical'");
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION injection_points));
+
+# create a simple table to generate data into
+$node->safe_psql('postgres',
+	q{create table t (id serial primary key, b text)});
+
+# create the two slots we'll need
+$node->safe_psql('postgres',
+	q{select pg_create_logical_replication_slot('slot_logical', 'test_decoding')});
+$node->safe_psql('postgres',
+	q{select pg_create_physical_replication_slot('slot_physical', true)});
+
+# advance both to current position, just to have everything "valid"
+$node->safe_psql('postgres',
+	q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null)});
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())});
+
+# run checkpoint, to flush current state to disk and set a baseline
+$node->safe_psql('postgres', q{checkpoint});
+
+# generate transactions to get RUNNING_XACTS
+my $xacts = $node->background_psql('postgres');
+$xacts->query_until(qr/run_xacts/,
+q(\echo run_xacts
+SELECT 1 \watch 0.1
+\q
+));
+
+# insert 1M rows, generating enough WAL to span multiple segments
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)});
+
+# run another checkpoint, to move the redo LSN forward
+$node->safe_psql('postgres', q{checkpoint});
+
+# another 1M rows, again enough WAL to span multiple segments
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)});
+
+# run another checkpoint, this time in the background, and make it wait
+# on the injection point, so that the checkpoint stops right before
+# removing old WAL segments
+print("starting checkpoint\n");
+
+my $checkpoint = $node->background_psql('postgres');
+$checkpoint->query_safe(q(select injection_points_attach('checkpoint-before-old-wal-removal','wait')));
+$checkpoint->query_until(qr/starting_checkpoint/,
+q(\echo starting_checkpoint
+checkpoint;
+\q
+));
+
+print("waiting for injection_point\n");
+# wait until the checkpoint stops right before removing WAL segments
+$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal');
+
+
+# try to advance the logical slot, but make it stop when it moves to the
+# next WAL segment (has to happen in the background too)
+my $logical = $node->background_psql('postgres');
+$logical->query_safe(q{select injection_points_attach('logical-replication-slot-advance-segment','wait');});
+$logical->query_until(qr/get_changes/,
+q(
+\echo get_changes
+select count(*) from pg_logical_slot_get_changes('slot_logical', null, null) \watch 1
+\q
+));
+
+
+# wait until the logical decoding stops right before saving the advanced restart_lsn
+$node->wait_for_event('client backend', 'logical-replication-slot-advance-segment');
+
+
+# OK, we're in the right situation: time to advance the physical slot,
+# which recalculates the required LSN, and then unblock the checkpoint,
+# which removes the WAL still needed by the logical slot
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())});
+
+$node->safe_psql('postgres',
+	q{select injection_points_wakeup('checkpoint-before-old-wal-removal')});
+
+# abruptly stop the server (1 second should be enough for the checkpoint
+# to finish; an explicit wait would be better)
+$node->stop('immediate');
+
+$node->start;
+
+$node->safe_psql('postgres', q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null);});
+
+$node->stop;
+
+# If we reached this point - everything is OK.
+ok(1);
+done_testing();
-- 
2.47.0

From eef5f02a5c22ccc520c20623d70eaf093a039f09 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Thu, 21 Nov 2024 20:37:00 +0100
Subject: [PATCH 1/2] injection points

---
 src/backend/access/transam/xlog.c         |  4 ++++
 src/backend/replication/logical/logical.c | 18 ++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f58412bcab..8f9629866c3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7310,6 +7310,10 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+#ifdef USE_INJECTION_POINTS
+	INJECTION_POINT("checkpoint-before-old-wal-removal");
+#endif
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e941bb491d8..569c1925ecc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -30,6 +30,7 @@
 
 #include "access/xact.h"
 #include "access/xlogutils.h"
+#include "access/xlog_internal.h"
 #include "fmgr.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -41,6 +42,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
 
@@ -1844,9 +1846,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		bool		updated_xmin = false;
 		bool		updated_restart = false;
+		XLogRecPtr	restart_lsn;
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
+		/* remember the old restart lsn */
+		restart_lsn = MyReplicationSlot->data.restart_lsn;
+
 		MyReplicationSlot->data.confirmed_flush = lsn;
 
 		/* if we're past the location required for bumping xmin, do so */
@@ -1888,6 +1894,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		/* first write new xmin to disk, so we know what's up after a crash */
 		if (updated_xmin || updated_restart)
 		{
+#ifdef USE_INJECTION_POINTS
+			XLogSegNo	seg1,
+						seg2;
+
+			XLByteToSeg(restart_lsn, seg1, wal_segment_size);
+			XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
+
+			/* trigger injection point, but only if segment changes */
+			if (seg1 != seg2)
+				INJECTION_POINT("logical-replication-slot-advance-segment");
+#endif
+
 			ReplicationSlotMarkDirty();
 			ReplicationSlotSave();
 			elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
-- 
2.47.0
