On Tue, Jun 10, 2025 at 11:55 AM Zhijie Hou (Fujitsu)
<houzj.f...@fujitsu.com> wrote:
>
> Here is the V35 patch set which includes the following changes:
>

A few minor comments:
===================
1.
+Â * Check if the subscriber's configuration is adequate to enable the
+Â * retain_conflict_info option.

I see some funny characters in patch 0003.

2.
+static void
+drop_conflict_slot_if_exists(void)
+{
+ /*
+ * Avoid the overhead of scanning shared memory for a replication slot
+ * that is known to have been dropped.
+ */
+ if (conflict_slot_dropped)
+ return;

This new variable used here looks odd to me. Do you think we can avoid this?

Apart from this, I have made a number of cosmetic changes in the
attached patch. Kindly include these in the next version if they look
okay to you. Also, I think we can combine 0001 and 0002 at this stage,
as both are in good shape now.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index 494b8de9ef9..74facd24a61 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1195,6 +1195,13 @@ ApplyLauncherMain(Datum main_arg)
 
                        if (!sub->enabled)
                        {
+                               /*
+                                * This is required to ensure that we don't 
advance the xmin
+                                * of CONFLICT_DETECTION_SLOT even if one of 
the subscriptions
+                                * is not enabled. Otherwise, we won't be able 
to detect
+                                * conflicts reliably for such a subscription 
even though it
+                                * has set the retain_conflict_info option.
+                                */
                                compute_min_nonremovable_xid(NULL, 
sub->retainconflictinfo,
                                                                                
         &xmin, &can_advance_xmin);
                                continue;
@@ -1204,6 +1211,10 @@ ApplyLauncherMain(Datum main_arg)
                        w = logicalrep_worker_find(sub->oid, InvalidOid, false);
                        LWLockRelease(LogicalRepWorkerLock);
 
+                       /*
+                        * Compute the minimum xmin required to protect deleted 
tuples
+                        * required for conflict detection.
+                        */
                        compute_min_nonremovable_xid(w, 
sub->retainconflictinfo, &xmin,
                                                                                
 &can_advance_xmin);
 
@@ -1243,9 +1254,10 @@ ApplyLauncherMain(Datum main_arg)
                }
 
                /*
-                * Maintain the xmin value of the replication slot for conflict
-                * detection if needed. Otherwise, drop the slot if we're no 
longer
-                * retaining information useful for conflict detection.
+                * Drop the CONFLICT_DETECTION_SLOT slot if there is no 
subscription
+                * that requires us to retain the conflict information. 
Otherwise, if
+                * required, advance the slot's xmin to protect deleted tuples
+                * needed for conflict detection.
                 */
                if (!retain_conflict_info)
                        drop_conflict_slot_if_exists();
@@ -1280,11 +1292,13 @@ ApplyLauncherMain(Datum main_arg)
 }
 
 /*
- * Compute the minimum non-removable transaction ID from all apply workers for
- * subscriptions with retain_conflict_info enabled. Store the result in *xmin.
+ * Determine the minimum non-removable transaction ID across all apply workers
+ * for subscriptions that have retain_conflict_info enabled. Store the result
+ * in *xmin.
  *
- * If the slot cannot be advanced during this cycle, due to either a disabled
- * subscription or an inactive worker, *can_advance_xmin is set to false.
+ * If the replication slot cannot be advanced during this cycle, due to either
+ * a disabled subscription or an inactive worker, set *can_advance_xmin to
+ * false.
  */
 static void
 compute_min_nonremovable_xid(LogicalRepWorker *worker,
@@ -1299,8 +1313,8 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker,
                TransactionId nonremovable_xid;
 
                /*
-                * Assume the replication slot for conflict detection is created
-                * before the worker starts.
+                * The replication slot for conflict detection must be created 
before
+                * the worker starts.
                 */
                Assert(MyReplicationSlot);
 
@@ -1321,21 +1335,23 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker,
        else
        {
                /*
-                * Create a replication slot to retain information (e.g., dead 
tuples,
-                * commit timestamps, and origins) useful for conflict 
detection if
-                * any subscription requests it.
+                * Create a replication slot to retain information necessary for
+                * conflict detection such as dead tuples, commit timestamps, 
and
+                * origins if requested by any subscription.
                 *
                 * The slot is created before starting the apply worker to 
prevent it
-                * from unnecessarily maintaining its oldest_nonremovable_xid. 
It is
-                * created even for a disabled subscription to ensure 
information is
-                * available for detecting conflicts during the application of 
remote
-                * changes that occur before the subscription is enabled.
+                * from unnecessarily maintaining its oldest_nonremovable_xid.
+                *
+                * The slot is created even for a disabled subscription to 
ensure that
+                * conflict-related information is available when applying 
remote
+                * changes that occurred before the subscription was enabled.
                 */
                create_conflict_slot_if_not_exists();
 
                /*
-                * Only collect xmin when all workers for subscriptions with
-                * retain_conflict_info enabled are running.
+                * Can't advance xmin of the slot unless all the subscriptions 
with
+                * retain_conflict_info are enabled and the corresponding 
workers are
+                * running.
                 */
                *can_advance_xmin = false;
        }
@@ -1350,19 +1366,19 @@ create_conflict_slot_if_not_exists(void)
 {
        TransactionId xmin_horizon;
 
-       /* Exit early if the replication slot is already created and acquired */
+       /* Exit early, if the replication slot is already created and acquired 
*/
        if (MyReplicationSlot)
                return;
 
-       /* If the replication slot exists, acquire it and exit */
+       /* If the replication slot exists, acquire it, and exit */
        if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
        {
                ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
                return;
        }
 
-       ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false,
-                                                 RS_PERSISTENT, false, false, 
false);
+       ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, 
false,
+                                                 false, false);
 
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
@@ -1385,23 +1401,21 @@ create_conflict_slot_if_not_exists(void)
 }
 
 /*
- * Attempt to advance the xmin value of the replication slot used to retain
- * information useful for conflict detection.
+ * Advance the xmin of the replication slot used to retain information
+ * required for conflict detection.
  */
 static void
 advance_conflict_slot_xmin(TransactionId new_xmin)
 {
        Assert(MyReplicationSlot);
        Assert(TransactionIdIsValid(new_xmin));
-       Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin,
-                                                                               
 new_xmin));
+       Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, 
new_xmin));
 
        SpinLockAcquire(&MyReplicationSlot->mutex);
        MyReplicationSlot->data.xmin = new_xmin;
        SpinLockRelease(&MyReplicationSlot->mutex);
 
        /* first write new xmin to disk, so we know what's up after a crash */
-
        ReplicationSlotMarkDirty();
        ReplicationSlotSave();
        elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
@@ -1428,10 +1442,6 @@ advance_conflict_slot_xmin(TransactionId new_xmin)
 static void
 drop_conflict_slot_if_exists(void)
 {
-       /*
-        * Avoid the overhead of scanning shared memory for a replication slot
-        * that is known to have been dropped.
-        */
        if (conflict_slot_dropped)
                return;
 

Reply via email to