On Mon, Apr 1, 2024 at 6:26 AM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
>
> On Friday, March 29, 2024 2:50 PM Amit Kapila <[email protected]> wrote:
> >
>
> >
> >
> > 2.
> > +extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto,
> > + bool *found_consistent_point);
> > +
> >
> > This API looks a bit awkward as the functionality doesn't match the name.
> > How about having a function with name
> > LogicalSlotAdvanceAndCheckReadynessForDecoding(moveto,
> > ready_for_decoding) with the same functionality as your patch has for
> > pg_logical_replication_slot_advance() and then invoke it both from
> > pg_logical_replication_slot_advance and slotsync.c. The function name is too
> > big, we can think of a shorter name. Any ideas?
>
> How about LogicalSlotAdvanceAndCheckDecodingState() or just
> LogicalSlotAdvanceAndCheckDecoding()?
>
It is about the snapbuild state, so how about naming the function
LogicalSlotAdvanceAndCheckSnapState()?

I have made quite a few cosmetic changes in comments and code; see the
attached patch, which applies atop your latest patch. Can you please
review and include these changes in the next version?
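
To make the shape under discussion concrete, here is a minimal sketch of one
shared helper that advances the slot and reports the snapbuild state through
an optional out-parameter, called from both the SQL-facing function and
slotsync.c. Only the helper's name comes from this thread; the types, the toy
state, and the two toy_* callers are simplified stand-ins (assumptions), not
PostgreSQL code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's XLogRecPtr (assumption: a plain 64-bit LSN). */
typedef uint64_t XLogRecPtr;

/* Hypothetical toy state: slot position and the LSN where the snapshot
 * builder would become consistent. */
static XLogRecPtr toy_confirmed_flush = 0;
static XLogRecPtr toy_consistent_at = 100;

/*
 * Shared helper: advance the slot to moveto (never backwards) and, if the
 * caller passed a non-NULL pointer, report whether a consistent snapshot
 * has been reached at the resulting position.
 */
static XLogRecPtr
LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *ready_for_decoding)
{
    assert(moveto != 0);        /* 0 plays the role of an invalid LSN here */

    if (ready_for_decoding)
        *ready_for_decoding = false;

    if (moveto > toy_confirmed_flush)
        toy_confirmed_flush = moveto;

    if (ready_for_decoding && toy_confirmed_flush >= toy_consistent_at)
        *ready_for_decoding = true;

    return toy_confirmed_flush;
}

/* SQL-facing caller (hypothetical): not interested in the snapshot state. */
static XLogRecPtr
toy_sql_slot_advance(XLogRecPtr moveto)
{
    return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
}

/* Slot-sync caller (hypothetical): only wants to know if decoding is ready. */
static bool
toy_slotsync_advance(XLogRecPtr moveto)
{
    bool ready = false;

    (void) LogicalSlotAdvanceAndCheckSnapState(moveto, &ready);
    return ready;
}
```

Keeping the out-parameter optional lets the SQL-callable wrapper stay a thin
call with NULL, while the slot sync code gets the extra state it needs.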
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index bbc7cdaf50..7d5884789c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -518,10 +518,9 @@ CreateDecodingContext(XLogRecPtr start_lsn,
errmsg("cannot use physical replication slot for logical decoding")));
/*
- * Do not allow decoding if the replication slot belongs to a different
- * database unless we are in fast-forward mode. In fast-forward mode, we
- * ignore storage-level changes and do not need to access the database
- * object.
+ * We need to access the system tables during decoding to build the logical
+ * changes unless we are in fast-forward mode where no changes are
+ * generated.
*/
if (slot->data.database != MyDatabaseId && !fast_forward)
ereport(ERROR,
@@ -530,9 +529,9 @@ CreateDecodingContext(XLogRecPtr start_lsn,
NameStr(slot->data.name))));
/*
- * Do not allow consumption of a "synchronized" slot until the standby gets
- * promoted unless we are syncing replication slots, in which case we need
- * to advance the LSN and xmin of the slot during decoding.
+ * The slots being synced from the primary can't be used for decoding as
+ * they are used after failover. However, we do allow advancing the LSNs
+ * during the synchronization of slots. See update_local_synced_slot.
*/
if (RecoveryInProgress() && slot->data.synced &&
!IsSyncingReplicationSlots())
ereport(ERROR,
@@ -2054,8 +2053,8 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
* WAL and removal of old catalog tuples. As decoding is done in fast_forward
* mode, no changes are generated anyway.
*
- * *ready_for_decoding will be set to true if the logical decoding reaches
- * the consistent point; Otherwise, it will be set to false.
+ * *ready_for_decoding will be true if the initial decoding snapshot has
+ * been built; otherwise, it will be false.
*/
XLogRecPtr
LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 886a9fcc7e..9f6b83d486 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -26,10 +26,13 @@
* pg_sync_replication_slots() periodically to perform the syncs.
*
* If synchronized slots fail to build a consistent snapshot from the
- * restart_lsn, they would become unreliable after promotion due to potential
- * data loss from changes before reaching a consistent point. So, we mark such
- * slots as RS_TEMPORARY. Once they successfully reach the consistent point,
- * they will be marked to RS_PERSISTENT.
+ * restart_lsn before reaching confirmed_flush_lsn, they would become
+ * unreliable after promotion due to potential data loss from changes
+ * before reaching a consistent point. This can happen because the slots can
+ * be synced at some random time and we may not reach the consistent point
+ * at the same WAL location as the primary. So, we mark such slots as
+ * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
+ * consistent point, they will be marked as RS_PERSISTENT.
*
* The slot sync worker waits for some time before the next synchronization,
* with the duration varying based on whether any slots were updated during
@@ -155,9 +158,9 @@ static void slotsync_failure_callback(int code, Datum arg);
* If no update was needed (the data of the remote slot is the same as the
* local slot) return false, otherwise true.
*
- * If the LSN of the slot is modified, the ready_for_decoding will be set to
- * true if the slot can reach a consistent point; otherwise, it will be set to
- * false.
+ * *ready_for_decoding will be true iff the remote slot's LSN or xmin is
+ * modified, and decoding from the corresponding LSNs can reach a
+ * consistent snapshot.
*/
static bool
update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
@@ -168,10 +171,21 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
Assert(slot->data.invalidated == RS_INVAL_NONE);
+ if (ready_for_decoding)
+ *ready_for_decoding = false;
+
if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
remote_slot->restart_lsn != slot->data.restart_lsn ||
remote_slot->catalog_xmin != slot->data.catalog_xmin)
{
+ /*
+ * We can't directly copy the remote slot's LSN or xmin unless there
+ * exists a consistent snapshot at that point. Otherwise, after
+ * promotion, the slots may not reach a consistent point before the
+ * confirmed_flush_lsn which can lead to a data loss. To avoid data
+ * loss, we let slot machinery advance the slot which ensures that
+ * snapbuilder/slot statuses are updated properly.
+ */
if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
{
/*
@@ -191,15 +205,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
}
else
{
- /*
- * By advancing the restart_lsn, confirmed_lsn, and xmin using
- * fast-forward logical decoding, we can verify whether a
- * consistent snapshot can be built. This process also involves
- * saving necessary snapshots to disk during decoding, ensuring
- * that logical decoding efficiently reaches a consistent point at
- * the restart_lsn without the potential loss of data during
- * snapshot creation.
- */
LogicalSlotAdvanceAndCheckReadynessForDecoding(remote_slot->confirmed_lsn,
ready_for_decoding);
}
@@ -489,11 +494,11 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/*
* Don't persist the slot if it cannot reach the consistent point from the
- * restart_lsn.
+ * restart_lsn. See comments atop this file.
*/
if (!ready_for_decoding)
{
- elog(DEBUG1, "The synced slot could not find consistent point from %X/%X",
+ elog(DEBUG1, "could not find consistent point for synced slot; restart_lsn = %X/%X",
LSN_FORMAT_ARGS(slot->data.restart_lsn));
return false;
}
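
As a reading aid for the slotsync.c hunks above, the following is a rough
model of the flow the comments describe: reset *ready_for_decoding, update
the local slot only when the remote values differ, and keep the slot
temporary until a consistent point is reachable at or before the confirmed
flush position. All Toy* names and the consistent_lsn parameter are
hypothetical simplifications, not the patch's actual code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's XLogRecPtr */

typedef enum { TOY_RS_TEMPORARY, TOY_RS_PERSISTENT } ToyPersistency;

typedef struct { XLogRecPtr restart_lsn; XLogRecPtr confirmed_lsn; } ToyRemoteSlot;

typedef struct
{
    XLogRecPtr      restart_lsn;
    XLogRecPtr      confirmed_flush;
    ToyPersistency  persistency;
} ToyLocalSlot;

/*
 * Returns true if the local slot was updated. consistent_lsn is an assumed
 * input: the LSN at which local decoding would first see a consistent
 * snapshot.
 */
static bool
toy_update_local_synced_slot(const ToyRemoteSlot *remote, ToyLocalSlot *local,
                             XLogRecPtr consistent_lsn, bool *ready_for_decoding)
{
    assert(ready_for_decoding != NULL);
    *ready_for_decoding = false;

    /* Nothing changed on the remote side: no update needed. */
    if (remote->confirmed_lsn == local->confirmed_flush &&
        remote->restart_lsn == local->restart_lsn)
        return false;

    local->restart_lsn = remote->restart_lsn;
    local->confirmed_flush = remote->confirmed_lsn;

    /* Ready only if the consistent point lies at or before confirmed_flush. */
    *ready_for_decoding = (consistent_lsn <= local->confirmed_flush);
    return true;
}

/* Persist the slot only once decoding can reach a consistent point. */
static bool
toy_update_and_persist(const ToyRemoteSlot *remote, ToyLocalSlot *local,
                       XLogRecPtr consistent_lsn)
{
    bool ready;

    (void) toy_update_local_synced_slot(remote, local, consistent_lsn, &ready);
    if (!ready)
        return false;           /* stays TOY_RS_TEMPORARY, retried next cycle */

    local->persistency = TOY_RS_PERSISTENT;
    return true;
}
```

In this model a slot synced before the consistent point stays temporary and
is simply retried on a later sync cycle, once the remote slot has moved past
that point.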