On Tue, Jun 10, 2025 at 11:55 AM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
>
> Here is the V35 patch set which includes the following changes:
>
A few minor comments:
===================
1.
+Â * Check if the subscriber's configuration is adequate to enable the
+Â * retain_conflict_info option.
I see some funny characters in patch 0003.
2.
+static void
+drop_conflict_slot_if_exists(void)
+{
+ /*
+ * Avoid the overhead of scanning shared memory for a replication slot
+ * that is known to have been dropped.
+ */
+ if (conflict_slot_dropped)
+ return;
This new variable used here looks odd to me. Do you think we can avoid this?
Apart from this, I have made a number of cosmetic changes in the
attached. Kindly include these in the next version, if these look okay
to you. Also, I think we can combine 0001 and 0002 at this stage, as
both are looking in good shape now.
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 494b8de9ef9..74facd24a61 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1195,6 +1195,13 @@ ApplyLauncherMain(Datum main_arg)
if (!sub->enabled)
{
+ /*
+ * This is required to ensure that we don't advance the xmin
+ * of CONFLICT_DETECTION_SLOT even if one of the subscriptions
+ * is not enabled. Otherwise, we won't be able to detect
+ * conflicts reliably for such a subscription even though it
+ * has set the retain_conflict_info option.
+ */
compute_min_nonremovable_xid(NULL,
sub->retainconflictinfo,
&xmin, &can_advance_xmin);
continue;
@@ -1204,6 +1211,10 @@ ApplyLauncherMain(Datum main_arg)
w = logicalrep_worker_find(sub->oid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
+ /*
+ * Compute the minimum xmin required to protect deleted tuples
+ * required for conflict detection.
+ */
compute_min_nonremovable_xid(w,
sub->retainconflictinfo, &xmin,
&can_advance_xmin);
@@ -1243,9 +1254,10 @@ ApplyLauncherMain(Datum main_arg)
}
/*
- * Maintain the xmin value of the replication slot for conflict
- * detection if needed. Otherwise, drop the slot if we're no longer
- * retaining information useful for conflict detection.
+ * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
+ * that requires us to retain the conflict information. Otherwise, if
+ * required, advance the slot's xmin to protect deleted tuples
+ * required for the conflict detection.
*/
if (!retain_conflict_info)
drop_conflict_slot_if_exists();
@@ -1280,11 +1292,13 @@ ApplyLauncherMain(Datum main_arg)
}
/*
- * Compute the minimum non-removable transaction ID from all apply workers for
- * subscriptions with retain_conflict_info enabled. Store the result in *xmin.
+ * Determine the minimum non-removable transaction ID across all apply workers
+ * for subscriptions that have retain_conflict_info enabled. Store the result
+ * in *xmin.
*
- * If the slot cannot be advanced during this cycle, due to either a disabled
- * subscription or an inactive worker, *can_advance_xmin is set to false.
+ * If the replication slot cannot be advanced during this cycle, due to either
+ * a disabled subscription or an inactive worker, set *can_advance_xmin to
+ * false.
*/
static void
compute_min_nonremovable_xid(LogicalRepWorker *worker,
@@ -1299,8 +1313,8 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker,
TransactionId nonremovable_xid;
/*
- * Assume the replication slot for conflict detection is created
- * before the worker starts.
+ * The replication slot for conflict detection must be created before
+ * the worker starts.
*/
Assert(MyReplicationSlot);
@@ -1321,21 +1335,23 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker,
else
{
/*
- * Create a replication slot to retain information (e.g., dead tuples,
- * commit timestamps, and origins) useful for conflict detection if
- * any subscription requests it.
+ * Create a replication slot to retain information necessary for
+ * conflict detection such as dead tuples, commit timestamps, and
+ * origins if requested by any subscription.
*
* The slot is created before starting the apply worker to prevent it
- * from unnecessarily maintaining its oldest_nonremovable_xid. It is
- * created even for a disabled subscription to ensure information is
- * available for detecting conflicts during the application of remote
- * changes that occur before the subscription is enabled.
+ * from unnecessarily maintaining its oldest_nonremovable_xid.
+ *
+ * The slot is created even for a disabled subscription to ensure that
+ * conflict-related information is available when applying remote
+ * changes that occurred before the subscription was enabled.
*/
create_conflict_slot_if_not_exists();
/*
- * Only collect xmin when all workers for subscriptions with
- * retain_conflict_info enabled are running.
+ * Can't advance xmin of the slot unless all the subscriptions with
+ * retain_conflict_info are enabled and the corresponding workers are
+ * running.
*/
*can_advance_xmin = false;
}
@@ -1350,19 +1366,19 @@ create_conflict_slot_if_not_exists(void)
{
TransactionId xmin_horizon;
- /* Exit early if the replication slot is already created and acquired */
+ /* Exit early, if the replication slot is already created and acquired */
if (MyReplicationSlot)
return;
- /* If the replication slot exists, acquire it and exit */
+ /* If the replication slot exists, acquire it, and exit */
if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
{
ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
return;
}
- ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false,
- RS_PERSISTENT, false, false,
false);
+ ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT,
false,
+ false, false);
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -1385,23 +1401,21 @@ create_conflict_slot_if_not_exists(void)
}
/*
- * Attempt to advance the xmin value of the replication slot used to retain
- * information useful for conflict detection.
+ * Advance the xmin of the replication slot used to retain information required
+ * for conflict detection.
*/
static void
advance_conflict_slot_xmin(TransactionId new_xmin)
{
Assert(MyReplicationSlot);
Assert(TransactionIdIsValid(new_xmin));
- Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin,
-
new_xmin));
+ Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin,
new_xmin));
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.xmin = new_xmin;
SpinLockRelease(&MyReplicationSlot->mutex);
/* first write new xmin to disk, so we know what's up after a crash */
-
ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
@@ -1428,10 +1442,6 @@ advance_conflict_slot_xmin(TransactionId new_xmin)
static void
drop_conflict_slot_if_exists(void)
{
- /*
- * Avoid the overhead of scanning shared memory for a replication slot
- * that is known to have been dropped.
- */
if (conflict_slot_dropped)
return;