On Mon, Apr 1, 2024 at 6:26 AM Zhijie Hou (Fujitsu)
<houzj.f...@fujitsu.com> wrote:
>
> On Friday, March 29, 2024 2:50 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
>
> >
> >
> > 2.
> > +extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto,
> > +   bool *found_consistent_point);
> > +
> >
> > This API looks a bit awkward as the functionality doesn't match the name. How
> > about having a function with name
> > LogicalSlotAdvanceAndCheckReadynessForDecoding(moveto,
> > ready_for_decoding) with the same functionality as your patch has for
> > pg_logical_replication_slot_advance() and then invoke it both from
> > pg_logical_replication_slot_advance and slotsync.c. The function name is too
> > big, we can think of a shorter name. Any ideas?
>
> How about LogicalSlotAdvanceAndCheckDecodingState() Or just
> LogicalSlotAdvanceAndCheckDecoding()?
>

It is about snapbuild state, so how about naming the function as
LogicalSlotAdvanceAndCheckSnapState()?

I have made quite a few cosmetic changes in comments and code. See
attached. This is atop your latest patch. Can you please review and
include these changes in the next version?

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index bbc7cdaf50..7d5884789c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -518,10 +518,9 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                                 errmsg("cannot use physical replication slot for logical decoding")));
 
        /*
-        * Do not allow decoding if the replication slot belongs to a different
-        * database unless we are in fast-forward mode. In fast-forward mode, we
-        * ignore storage-level changes and do not need to access the database
-        * object.
+        * We need to access the system tables during decoding to build the logical
+        * changes unless we are in fast-forward mode where no changes are
+        * generated.
         */
        if (slot->data.database != MyDatabaseId && !fast_forward)
                ereport(ERROR,
@@ -530,9 +529,9 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                                                NameStr(slot->data.name))));
 
        /*
-        * Do not allow consumption of a "synchronized" slot until the standby gets
-        * promoted unless we are syncing replication slots, in which case we need
-        * to advance the LSN and xmin of the slot during decoding.
+        * The slots being synced from the primary can't be used for decoding as
+        * they are used after failover. However, we do allow advancing the LSNs
+        * during the synchronization of slots. See update_local_synced_slot.
         */
        if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
                ereport(ERROR,
@@ -2054,8 +2053,8 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
  *
- * *ready_for_decoding will be set to true if the logical decoding reaches
- * the consistent point; Otherwise, it will be set to false.
+ * *ready_for_decoding will be true if the initial decoding snapshot has
+ * been built; otherwise, it will be false.
  */
 XLogRecPtr
 LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 886a9fcc7e..9f6b83d486 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -26,10 +26,13 @@
  * pg_sync_replication_slots() periodically to perform the syncs.
  *
  * If synchronized slots fail to build a consistent snapshot from the
- * restart_lsn, they would become unreliable after promotion due to potential
- * data loss from changes before reaching a consistent point. So, we mark such
- * slots as RS_TEMPORARY. Once they successfully reach the consistent point,
- * they will be marked to RS_PERSISTENT.
+ * restart_lsn before reaching confirmed_flush_lsn, they would become
+ * unreliable after promotion due to potential data loss from changes
+ * before reaching a consistent point. This can happen because the slots can
+ * be synced at some random time and we may not reach the consistent point
+ * at the same WAL location as the primary. So, we mark such slots as
+ * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
+ * consistent point, they will be marked as RS_PERSISTENT.
  *
  * The slot sync worker waits for some time before the next synchronization,
  * with the duration varying based on whether any slots were updated during
@@ -155,9 +158,9 @@ static void slotsync_failure_callback(int code, Datum arg);
  * If no update was needed (the data of the remote slot is the same as the
  * local slot) return false, otherwise true.
  *
- * If the LSN of the slot is modified, the ready_for_decoding will be set to
- * true if the slot can reach a consistent point; otherwise, it will be set to
- * false.
+ * *ready_for_decoding will be true iff the remote slot's LSN or xmin is
+ * modified, and decoding from the corresponding LSNs can reach a
+ * consistent snapshot.
  */
 static bool
 update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
@@ -168,10 +171,21 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 
        Assert(slot->data.invalidated == RS_INVAL_NONE);
 
+       if (ready_for_decoding)
+               *ready_for_decoding = false;
+
        if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
                remote_slot->restart_lsn != slot->data.restart_lsn ||
                remote_slot->catalog_xmin != slot->data.catalog_xmin)
        {
+               /*
+                * We can't directly copy the remote slot's LSN or xmin unless there
+                * exists a consistent snapshot at that point. Otherwise, after
+                * promotion, the slots may not reach a consistent point before the
+                * confirmed_flush_lsn which can lead to data loss. To avoid data
+                * loss, we let the slot machinery advance the slot, which ensures
+                * that snapbuilder/slot statuses are updated properly.
+                */
                if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
                {
                        /*
@@ -191,15 +205,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                }
                else
                {
-                       /*
-                        * By advancing the restart_lsn, confirmed_lsn, and xmin using
-                        * fast-forward logical decoding, we can verify whether a
-                        * consistent snapshot can be built. This process also involves
-                        * saving necessary snapshots to disk during decoding, ensuring
-                        * that logical decoding efficiently reaches a consistent point at
-                        * the restart_lsn without the potential loss of data during
-                        * snapshot creation.
-                        */
                        LogicalSlotAdvanceAndCheckReadynessForDecoding(remote_slot->confirmed_lsn,
                                                                       ready_for_decoding);
                }
@@ -489,11 +494,11 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 
        /*
         * Don't persist the slot if it cannot reach the consistent point from the
-        * restart_lsn.
+        * restart_lsn. See comments atop this file.
         */
        if (!ready_for_decoding)
        {
-               elog(DEBUG1, "The synced slot could not find consistent point from %X/%X",
+               elog(DEBUG1, "could not find consistent point for synced slot; restart_lsn = %X/%X",
                         LSN_FORMAT_ARGS(slot->data.restart_lsn));
                return false;
        }
