On Thu, Aug 28, 2025 at 4:39 PM Amit Kapila wrote:
>
> On Thu, Aug 28, 2025 at 8:02 AM Zhijie Hou (Fujitsu) wrote:
> >
> > I noticed that Cfbot failed to compile the document due to a typo after
> > renaming the subscription option. Here are the updated V67 patches to fix
> > that; only the doc in 0001 is modified.
> >
>
> I have made a number of cosmetic and comment changes in the attached
> patch, atop 0001. Kindly include them in the next version if you think
> they are good to include.
>
Sorry, by mistake, I attached the wrong file. Please find the correct one now.
--
With Regards,
Amit Kapila.
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 52ac1183e44..8cf90eda787 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -526,12 +526,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<term><literal>max_retention_duration</literal> (<type>integer</type>)</term>
<listitem>
<para>
- Maximum duration for which this subscription's apply worker is allowed
- to retain the information useful for conflict detection when
- <literal>retain_dead_tuples</literal> is enabled for the associated
- subscriptions. The default value is <literal>0</literal>, indicating
- that the information is retained until it is no longer needed for
- detection purposes. This value is taken as milliseconds.
+ Maximum duration in milliseconds for which this subscription's apply worker
+ is allowed to retain the information useful for conflict detection when
+ <literal>retain_dead_tuples</literal> is enabled. The default value
+ is <literal>0</literal>, indicating that the information is retained
+ until it is no longer needed for detection purposes.
</para>
<para>
The information useful for conflict detection is no longer retained if
@@ -541,12 +540,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<literal>max_retention_duration</literal> set within the
corresponding subscription. To re-enable retention manually, you can
disable <literal>retain_dead_tuples</literal> for all subscriptions and
- re-enable it after confirming this replication slot has been dropped.
+ re-enable it after confirming that the replication slot
+ <quote><literal>pg_conflict_detection</literal></quote> has been dropped.
</para>
<para>
- Note that overall retention will not stop if other subscriptions
- specify a greater value and have not exceeded it, or if they set this
- option to 0.
+ Note that overall retention will not stop if other subscriptions that
+ have a value greater than 0 for this parameter have not exceeded it,
+ or if they set this option to 0.
</para>
<para>
This option is effective only when
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fbd3d1900f4..c77fa0234bb 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1389,7 +1389,7 @@ REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr,
subpasswordrequired, subrunasowner, subfailover,
- subretaindeadtuples, submaxretention,
subretentionactive,
+ subretaindeadtuples, submaxretention, subretentionactive,
subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f9d92702434..aeb0c413067 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -662,8 +662,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
}
/*
- * Ensure that the configurations for retain_dead_tuples and
- * max_retention_duration is appropriate.
+ * Ensure that system configuration parameters are set appropriately to
+ * support retain_dead_tuples and max_retention_duration.
*/
CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
opts.retaindeadtuples, opts.retaindeadtuples,
@@ -1490,8 +1490,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
}
/*
- * Ensure that the configurations for retain_dead_tuples and
- * max_retention_duration is appropriate.
+ * Ensure that system configuration parameters are set appropriately to
+ * support retain_dead_tuples and max_retention_duration.
*/
if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
    IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index c0211867881..969d48a1d5d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -23,7 +23,6 @@
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/pg_subscription.h"
-#include "catalog/pg_subscription_d.h"
#include "catalog/pg_subscription_rel.h"
#include "funcapi.h"
#include "lib/dshash.h"
@@ -1258,7 +1257,7 @@ ApplyLauncherMain(Datum main_arg)
/*
* Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply
- * workers that enables retain_dead_tuples.
+ * workers.
*/
if (sub->retaindeadtuples &&
sub->retentionactive &&
@@ -1328,11 +1327,10 @@ ApplyLauncherMain(Datum main_arg)
* advance the slot's xmin to protect dead tuples required for the
* conflict detection.
*
- * However, if all apply workers for subscriptions with
- * retain_dead_tuples enabled have requested to cease retention,
- * marking it as inactive, the new xmin will be set to
- * InvalidTransactionId. We then update slot.xmin accordingly to
- * permit the removal of dead tuples.
+ * Additionally, if all apply workers for subscriptions with
+ * retain_dead_tuples enabled have requested to stop retention,
+ * the slot's xmin will be set to InvalidTransactionId, allowing the
+ * removal of dead tuples.
*/
if (MyReplicationSlot)
{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d378eb08c71..df16cbe3716 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -173,6 +173,14 @@
* Advance the non-removable transaction ID if the current flush location has
* reached or surpassed the last received WAL position.
*
+ * - RDT_STOP_CONFLICT_INFO_RETENTION:
+ * This phase is required only when max_retention_duration is defined. We
+ * enter this phase if the wait time in either the
+ * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
+ * the configured max_retention_duration. In this phase,
+ * pg_subscription.subretentionactive is updated to false within a new
+ * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
+ *
* The overall state progression is: GET_CANDIDATE_XID ->
* REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
* REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
@@ -190,16 +198,6 @@
* update_deleted is necessary, as the UPDATEs in remote transactions should be
* ignored if their timestamp is earlier than that of the dead tuples.
*
- * If max_retention_duration is defined, one additional phase is
- * involved:
- *
- * - RDT_STOP_CONFLICT_INFO_RETENTION:
- * This phase is triggered when the wait time in either the
- * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
- * max_retention_duration. During this phase,
- * pg_subscription.subretentionactive is updated to false within a new
- * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
- *
* Note that advancing the non-removable transaction ID is not supported if the
* publisher is also a physical standby. This is because the logical walsender
* on the standby can only get the WAL replay position but there may be more
@@ -569,9 +567,9 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data);
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
bool status_received);
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
-static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
bool new_xid_found);
@@ -3247,16 +3245,16 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
/*
* For conflict detection, we use the leader worker's
* oldest_nonremovable_xid value instead of invoking
- * GetOldestNonRemovableTransactionId() or relying on the conflict
- * detection slot's xmin. The oldest_nonremovable_xid acts as a threshold
- * to identify tuples that were recently deleted. These tuples are not
- * visible to concurrent transactions, but we log an update_deleted
- * conflict if such a tuple matches the remote update being applied.
+ * GetOldestNonRemovableTransactionId() or using the conflict detection
+ * slot's xmin. The oldest_nonremovable_xid acts as a threshold to identify
+ * tuples that were recently deleted. These deleted tuples are no longer
+ * visible to concurrent transactions. However, if a remote update matches
+ * such a tuple, we log an update_deleted conflict.
*
- * Although GetOldestNonRemovableTransactionId() and slot.xmin can return
- * a value older than the oldest_nonremovable_xid, for our current purpose
- * it is acceptable to treat tuples deleted by transactions prior to
- * oldest_nonremovable_xid as update_missing conflicts.
+ * While GetOldestNonRemovableTransactionId() and slot.xmin may return
+ * transaction IDs older than oldest_nonremovable_xid, for our current
+ * purpose, it is acceptable to treat tuples deleted by transactions prior
+ * to oldest_nonremovable_xid as update_missing conflicts.
*/
if (am_leader_apply_worker())
{
@@ -4669,6 +4667,55 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
process_rdt_phase_transition(rdt_data, false);
}
+/*
+ * Check whether conflict information retention should be stopped due to
+ * exceeding the maximum wait time (max_retention_duration).
+ *
+ * If retention should be stopped, transition to the
+ * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
+ * false.
+ *
+ * Note: Retention won't be resumed automatically. The user must manually
+ * disable retain_dead_tuples and re-enable it after confirming that the
+ * replication slot maintained by the launcher has been dropped.
+ */
+static bool
+should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+ TimestampTz now;
+
+ Assert(TransactionIdIsValid(rdt_data->candidate_xid));
+ Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
+ rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
+
+ if (!MySubscription->maxretention)
+ return false;
+
+ /*
+ * Use last_recv_time when applying changes in the loop to avoid
+ * unnecessary system time retrieval. If last_recv_time is not available,
+ * obtain the current timestamp.
+ */
+ now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Return early if the wait time has not exceeded the configured maximum
+ * (max_retention_duration). Time spent waiting for table synchronization
+ * is excluded from this calculation, as it occurs infrequently.
+ */
+ if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+                                 MySubscription->maxretention +
+                                 rdt_data->table_sync_wait_time))
+ return false;
+
+ rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+
+ return true;
+}
+
/*
* Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
*/
@@ -4732,67 +4779,22 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
rdt_data->table_sync_wait_time = 0;
}
-/*
- * Check whether conflict information retention should be stopped due to
- * exceeding the maximum wait time (max_retention_duration).
- *
- * If retention should be stopped, transition to the
- * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
- * false.
- *
- * Note: Retention won't be resumed automatically. The user must manually
- * disable retain_dead_tuples and re-enable it after confirming that the
- * replication slot maintained by the launcher has been dropped.
- */
-static bool
-should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
-{
- TimestampTz now;
-
- Assert(TransactionIdIsValid(rdt_data->candidate_xid));
- Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
- rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
-
- if (!MySubscription->maxretention)
- return false;
-
- /*
- * Use last_recv_time when applying changes in the loop to avoid
- * unnecessary system time retrieval. If last_recv_time is not available,
- * obtain the current timestamp.
- */
- now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
-
- /*
- * Return early if the wait time has not exceeded the configured maximum
- * (max_retention_duration). Time spent waiting for table synchronization
- * is excluded from this calculation, as it occurs infrequently.
- */
- if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
-                                 MySubscription->maxretention +
-                                 rdt_data->table_sync_wait_time))
- return false;
-
- rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
-
- /* process the next phase */
- process_rdt_phase_transition(rdt_data, false);
-
- return true;
-}
-
/*
* Adjust the interval for advancing non-removable transaction IDs.
*
- * We double the interval to try advancing the non-removable transaction IDs if
- * there is no activity on the node. The maximum value of the interval is capped
- * by wal_receiver_status_interval if it is not zero, otherwise to a 3 minutes
- * which should be sufficient to avoid using CPU or network resources without
- * much benefit. However, this maximum interval will not exceed
- * max_retention_duration.
+ * If there is no activity on the node, we progressively double the interval
+ * used to advance the non-removable transaction ID. This helps conserve CPU
+ * and network resources when there's little benefit to frequent updates.
+ *
+ * The interval is capped by wal_receiver_status_interval if it is set,
+ * otherwise by a default maximum of 3 minutes. Regardless of which of these
+ * applies, the interval is further capped by max_retention_duration (if
+ * set).
*
- * The interval is reset to the lesser of 100ms and
- * max_retention_duration once there is some activity on the node.
+ * This ensures the interval never exceeds the retention boundary, even if
+ * other limits are higher. Once activity resumes on the node, the interval
+ * is reset to the lesser of 100ms and max_retention_duration, allowing
+ * timely advancement of the non-removable transaction ID.
*
* XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
* consider the other interval or a separate GUC if the need arises.