On Tue, Jun 05, 2018 at 01:00:30PM +0200, Petr Jelinek wrote:
> I didn't say anything about CreateDecodingContext though. I am talking
> about the fact that we then use the same variable as input to
> XLogReadRecord later in the logical slot code path. The whole point of
> having restart_lsn for logical slot is to have correct start point for
> XLogReadRecord so using the confirmed_flush there is wrong (and has been
> wrong since the original commit).

OK, after a couple of hours brainstorming about it I finally see the
point you are getting at: using confirmed_lsn there is indeed logically
incorrect, and the current code is inconsistent with what
DecodingContextFindStartpoint() does.  We should rely on restart_lsn to
scan all the records, while the decoder state makes sure that the
slot's confirmed_lsn is updated to a consistent position.  I have added
your feedback to the attached version (indented and such), which results
in some simplifications in a couple of routines.

I am also attaching the patch I sent yesterday.  0001 is a candidate
for a back-patch, 0002 is for HEAD to fix the slot advance stuff.

There is another open item related to slot advancing here:
https://www.postgresql.org/message-id/2840048a-1184-417a-9da8-3299d207a1d7%40postgrespro.ru
And per my tests the attached patch solves this item as well.  I have
just used the test mentioned in the thread, which:
1) creates a slot
2) does some activity to generate a couple of WAL pages
3) advances the slot at a page boundary
4) moves the slot again
This test crashes on HEAD at step 4, but not with the attached patch.
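For reference, the sequence I used looks roughly like this (slot and
table names are mine, and hitting an exact page boundary may need some
fiddling with the amount of WAL generated):

```sql
-- Create a logical slot using the test_decoding plugin.
SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');
-- Generate a couple of WAL pages worth of activity.
CREATE TABLE tab_advance (a int);
INSERT INTO tab_advance SELECT generate_series(1, 10000);
-- Advance the slot once...
SELECT pg_replication_slot_advance('test_slot', pg_current_wal_lsn());
-- ...then generate more WAL and move the slot again.
INSERT INTO tab_advance SELECT generate_series(1, 10000);
SELECT pg_replication_slot_advance('test_slot', pg_current_wal_lsn());
```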

What do you think?
--
Michael
From 0cd02e359cbf5b74c8231f6619bc479a314213bc Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication
 slot data

While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, a close look at the code surrounding in-memory slot data
has shown that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.

The code paths involved in those problems concern logical decoding
initialization (down to 9.4) and WAL reservation for slots (new as of
10).

A set of comments documenting all the lock handling, particularly the
interaction between the LWLock for slots, the in_use flag and the
internal mutex, is added, based on a suggestion by Simon Riggs.

Backpatch down to 9.4, where logical decoding was introduced.

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com
---
 src/backend/replication/logical/logical.c | 13 +++++++++----
 src/backend/replication/slot.c            |  4 ++++
 src/include/replication/slot.h            | 13 +++++++++++++
 3 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
 
+	SpinLockAcquire(&slot->mutex);
 	slot->effective_catalog_xmin = xmin_horizon;
 	slot->data.catalog_xmin = xmin_horizon;
 	if (need_full_snapshot)
 		slot->effective_xmin = xmin_horizon;
+	SpinLockRelease(&slot->mutex);
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
@@ -445,13 +447,14 @@ void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
 	XLogRecPtr	startptr;
+	ReplicationSlot *slot = ctx->slot;
 
 	/* Initialize from where to start reading WAL. */
-	startptr = ctx->slot->data.restart_lsn;
+	startptr = slot->data.restart_lsn;
 
 	elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
-		 (uint32) (ctx->slot->data.restart_lsn >> 32),
-		 (uint32) ctx->slot->data.restart_lsn);
+		 (uint32) (slot->data.restart_lsn >> 32),
+		 (uint32) slot->data.restart_lsn);
 
 	/* Wait for a consistent starting point */
 	for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockAcquire(&slot->mutex);
+	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockRelease(&slot->mutex);
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
 			XLogRecPtr	flushptr;
 
 			/* start at current insert position */
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetXLogInsertRecPtr();
+			SpinLockRelease(&slot->mutex);
 
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
 		}
 		else
 		{
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetRedoRecPtr();
+			SpinLockRelease(&slot->mutex);
 		}
 
 		/* prevent WAL removal as fast as possible */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..2af2a14994 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData
 
 /*
  * Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use flag is switched when the slot is created
+ * or dropped using the LWLock ReplicationSlotControlLock, which needs to be
+ * held in exclusive mode by the backend owning the slot and doing the
+ * operation when updating the flag, while readers (concurrent backends not
+ * owning the slot) need to hold it in shared mode when looking at slot data.
+ * - Individual fields are protected by the slot's mutex; only the backend
+ * owning the slot is authorized to update the fields of its own slot.  The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take
+ * the lock when reading this slot's data.
  */
 typedef struct ReplicationSlot
 {
-- 
2.17.1

From 98ffc5a3705f3cf99f958c696541896825fb2a01 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 1 Jun 2018 14:37:50 -0400
Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing
 feature

A review of the code has turned up a couple of issues fixed by this
commit:
- Physical slots have been using the confirmed LSN position as a start
comparison point, which is always 0/0; use the restart LSN position
instead (logical slots need to use the confirmed LSN position, which
was correct).
- The actual slot update was incorrect for both physical and logical
slots.  Physical slots need to use their restart_lsn as base comparison
point (confirmed_flush was used because of the previous point), and
logical slots need to begin reading WAL from restart_lsn
(confirmed_flush was used as well), while confirmed_flush is computed
depending on the decoding context and the records read.
- Never return 0/0 if a slot cannot be advanced.  This way, if a slot is
advanced while the activity is idle, the same position is returned to
the caller over and over without raising an error.  Instead return the
LSN the slot has been advanced to; with repetitive calls, the same
position is returned.

Note that as the slot is owned by the backend advancing it, reading
those fields lockless is fine, while updates need to happen with the
slot's mutex held, so fix that on the way.

Author: Michael Paquier
Reviewed-by: Petr Jelinek, Simon Riggs

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/2840048a-1184-417a-9da8-3299d207a1d7%40postgrespro.ru
---
 src/backend/replication/slotfuncs.c | 50 +++++++++++++++++++++--------
 1 file changed, 36 insertions(+), 14 deletions(-)

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..63cada0786 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 /*
  * Helper function for advancing physical replication slot forward.
+ * The LSN position to move to is compared simply to the slot's
+ * restart_lsn, knowing that any position older than that would be
+ * removed by successive checkpoints.
  */
 static XLogRecPtr
-pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_physical_replication_slot_advance(XLogRecPtr moveto)
 {
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
+	XLogRecPtr	retlsn = startlsn;
 
-	SpinLockAcquire(&MyReplicationSlot->mutex);
-	if (MyReplicationSlot->data.restart_lsn < moveto)
+	if (startlsn < moveto)
 	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.restart_lsn = moveto;
+		SpinLockRelease(&MyReplicationSlot->mutex);
 		retlsn = moveto;
 	}
-	SpinLockRelease(&MyReplicationSlot->mutex);
 
 	return retlsn;
 }
 
 /*
  * Helper function for advancing logical replication slot forward.
+ * The slot's restart_lsn is used as start point for reading records,
+ * while confirmed_lsn is used as base point for the decoding context.
+ * The LSN position to move to is checked by doing a per-record scan and
+ * logical decoding which makes sure that confirmed_lsn is updated to an
+ * LSN which allows the future slot consumer to get consistent logical
+ * changes.
  */
 static XLogRecPtr
-pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
+	XLogRecPtr	retlsn = startlsn;
 
 	PG_TRY();
 	{
@@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 			if (record != NULL)
 				LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-			/* check limits */
+			/* Stop once the moving point wanted by caller has been reached */
 			if (moveto <= ctx->reader->EndRecPtr)
 				break;
 
@@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	Name		slotname = PG_GETARG_NAME(0);
 	XLogRecPtr	moveto = PG_GETARG_LSN(1);
 	XLogRecPtr	endlsn;
-	XLogRecPtr	startlsn;
+	XLogRecPtr	minlsn;
 	TupleDesc	tupdesc;
 	Datum		values[2];
 	bool		nulls[2];
@@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true);
 
-	startlsn = MyReplicationSlot->data.confirmed_flush;
-	if (moveto < startlsn)
+	/*
+	 * Check that the slot is not being moved backwards.  Physical slots rely
+	 * simply on restart_lsn as a minimum point, while logical slots have
+	 * confirmed consumption up to confirmed_flush, meaning that in both
+	 * cases data older than that is not available anymore.
+	 */
+	if (OidIsValid(MyReplicationSlot->data.database))
+		minlsn = MyReplicationSlot->data.confirmed_flush;
+	else
+		minlsn = MyReplicationSlot->data.restart_lsn;
+
+	if (moveto < minlsn)
 	{
 		ReplicationSlotRelease();
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot move slot to %X/%X, minimum is %X/%X",
 						(uint32) (moveto >> 32), (uint32) moveto,
-						(uint32) (startlsn >> 32), (uint32) startlsn)));
+						(uint32) (minlsn >> 32), (uint32) minlsn)));
 	}
 
+	/* Do the actual slot update, depending on the slot type */
 	if (OidIsValid(MyReplicationSlot->data.database))
-		endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+		endlsn = pg_logical_replication_slot_advance(moveto);
 	else
-		endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+		endlsn = pg_physical_replication_slot_advance(moveto);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
-- 
2.17.1
