Attached is a patch to mark a logical replication slot as dirty if its confirmed lsn is changed.
Aesthetically I'm not sure if it's better to do it per this patch and call ReplicationSlotMarkDirty after we release the spinlock, add a new ReplicationSlotMarkDirtyLocked() that skips the spinlock acquisition, or add a bool is_locked arg to ReplicationSlotMarkDirty and update all call sites. All will have the same result. I've confirmed this works as expected as part of the failover slots test suite but it'd be pretty seriously cumbersome to extract. If anyone feels strongly about it I can add a test that shows that - start master - create slot - do some stuff - replay from slot - fast-stop master - start master - replay from slot doesn't see the same data a second time, but if we repeat it using immediate stop it will see the same data when replaying again. Also attached is another patch to amend the docs to reflect the fact that a slot can actually replay the same change more than once. I avoided the strong temptation to update other parts of the docs nearby. Both these are fixes that should IMO be applied to 9.6. -- Craig Ringer http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
From e2bda9fa26f9d92059490d2b9ea7c37ae45f81b1 Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Wed, 16 Mar 2016 15:12:34 +0800 Subject: [PATCH] Dirty replication slots when confirm_lsn is changed --- src/backend/replication/logical/logical.c | 62 +++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2c7b749..d3fb1a5 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -440,6 +440,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) } ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr; + ReplicationSlotMarkDirty(); } /* @@ -850,10 +851,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) { bool updated_xmin = false; bool updated_restart = false; + bool updated_confirm = false; SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.confirmed_flush = lsn; + if (MyReplicationSlot->data.confirmed_flush != lsn) + { + MyReplicationSlot->data.confirmed_flush = lsn; + updated_confirm = true; + } /* if were past the location required for bumping xmin, do so */ if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr && @@ -891,34 +897,50 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); - /* first write new xmin to disk, so we know whats up after a crash */ - if (updated_xmin || updated_restart) + if (updated_xmin || updated_restart || updated_confirm) { ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart); - } - /* - * Now the new xmin is safely on disk, we can let the global value - * advance. We do not take ProcArrayLock or similar since we only - * advance xmin here and there's not much harm done by a concurrent - * computation missing that. - */ - if (updated_xmin) - { - SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin; - SpinLockRelease(&MyReplicationSlot->mutex); + /* + * first write new xmin to disk, so we know whats up + * after a crash. + */ + if (updated_xmin || updated_restart) + { + ReplicationSlotSave(); + elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart); + } - ReplicationSlotsUpdateRequiredXmin(false); - ReplicationSlotsUpdateRequiredLSN(); + /* + * Now the new xmin is safely on disk, we can let the global value + * advance. We do not take ProcArrayLock or similar since we only + * advance xmin here and there's not much harm done by a concurrent + * computation missing that. + */ + if (updated_xmin) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsUpdateRequiredXmin(false); + ReplicationSlotsUpdateRequiredLSN(); + } } } else { + bool dirtied = false; + SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.confirmed_flush = lsn; + if (MyReplicationSlot->data.confirmed_flush != lsn) + { + MyReplicationSlot->data.confirmed_flush = lsn; + dirtied = true; + } SpinLockRelease(&MyReplicationSlot->mutex); + + if (dirtied) + ReplicationSlotMarkDirty(); } } -- 2.1.0
From 1f8d9319ef9c934c414cebf1f5223c3b3023bf7f Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Wed, 16 Mar 2016 15:45:16 +0800 Subject: [PATCH 2/2] Correct incorrect claim that slots output a change "exactly once" Replication slots may actually emit a change more than once if the master crashes before flushing the slot. See http://www.postgresql.org/message-id/camsr+ygsatrgqpcx9qx4eocizwsa27xjkeipsotaje8ofix...@mail.gmail.com for details. --- doc/src/sgml/logicaldecoding.sgml | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index c7b43ed..a971918 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -12,7 +12,6 @@ <para> Changes are sent out in streams identified by logical replication slots. - Each stream outputs each change exactly once. </para> <para> @@ -204,8 +203,7 @@ $ pg_recvlogical -d postgres --slot test --drop-slot In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single - database, sending each change exactly once (except when peeking forward - in the stream). + database. </para> <note> @@ -218,7 +216,17 @@ $ pg_recvlogical -d postgres --slot test --drop-slot <para> A replication slot has an identifier that is unique across all databases in a <productname>PostgreSQL</productname> cluster. Slots persist - independently of the connection using them and are crash-safe. + independently of the connection using them. Slot creation and drop is + crash-safe, and slots will never be corrupted by a crash. + </para> + + <para> + A logical slot outputs each database change at least once. A slot will + usually only emit a change once, but recently-sent changes may be sent + again if the server server crashes and restarts. Clients should remember + the last LSN they saw when decoding and skip over any repeated data or + (when using the replication protocol) request that decoding start from + that LSN rather than letting the server determine the start point. </para> <para> -- 2.1.0
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers