On Thu, Aug 28, 2025 at 4:39 PM Amit Kapila <[email protected]> wrote:
>
> On Thu, Aug 28, 2025 at 8:02 AM Zhijie Hou (Fujitsu)
> <[email protected]> wrote:
> >
> > I noticed that Cfbot failed to compile the document due to a typo after 
> > renaming
> > the subscription option. Here are the updated V67 patches to fix that, only 
> > the doc
> > in 0001 is modified.
> >
>
> I have made a number of cosmetic and comment changes in the attached
> atop 0001 patch. Kindly include in next version, if you think they are
> good to include.
>

Sorry, by mistake, I attached the wrong file. Please find the correct one now.

-- 
With Regards,
Amit Kapila.
diff --git a/doc/src/sgml/ref/create_subscription.sgml 
b/doc/src/sgml/ref/create_subscription.sgml
index 52ac1183e44..8cf90eda787 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -526,12 +526,11 @@ CREATE SUBSCRIPTION <replaceable 
class="parameter">subscription_name</replaceabl
         <term><literal>max_retention_duration</literal> 
(<type>integer</type>)</term>
         <listitem>
          <para>
-          Maximum duration for which this subscription's apply worker is 
allowed
-          to retain the information useful for conflict detection when
-          <literal>retain_dead_tuples</literal> is enabled for the associated
-          subscriptions. The default value is <literal>0</literal>, indicating
-          that the information is retained until it is no longer needed for
-          detection purposes. This value is taken as milliseconds.
+          Maximum duration in milliseconds for which this subscription's apply 
worker
+          is allowed to retain the information useful for conflict detection 
when
+          <literal>retain_dead_tuples</literal> is enabled. The default value
+          is <literal>0</literal>, indicating that the information is retained
+          until it is no longer needed for detection purposes.
          </para>
          <para>
           The information useful for conflict detection is no longer retained 
if
@@ -541,12 +540,13 @@ CREATE SUBSCRIPTION <replaceable 
class="parameter">subscription_name</replaceabl
           <literal>max_retention_duration</literal> set within the
           corresponding subscription. To re-enable retention manually, you can
           disable <literal>retain_dead_tuples</literal> for all subscriptions 
and
-          re-enable it after confirming this replication slot has been dropped.
+          re-enable it after confirming replication slot
+          <quote><literal>pg_conflict_detection</literal></quote> has been 
dropped.
          </para>
          <para>
-          Note that overall retention will not stop if other subscriptions
-          specify a greater value and have not exceeded it, or if they set this
-          option to 0.
+          Note that overall retention will not stop if other subscriptions that
+          have a value greater than 0 for this parameter have not exceeded it,
+          or if they set this option to 0.
          </para>
          <para>
           This option is effective only when
diff --git a/src/backend/catalog/system_views.sql 
b/src/backend/catalog/system_views.sql
index fbd3d1900f4..c77fa0234bb 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1389,7 +1389,7 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
                          subpasswordrequired, subrunasowner, subfailover,
-                         subretaindeadtuples, submaxretention, 
subretentionactive,
+              subretaindeadtuples, submaxretention, subretentionactive,
               subslotname, subsynccommit, subpublications, suborigin)
     ON pg_subscription TO public;
 
diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c
index f9d92702434..aeb0c413067 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -662,8 +662,8 @@ CreateSubscription(ParseState *pstate, 
CreateSubscriptionStmt *stmt,
        }
 
        /*
-        * Ensure that the configurations for retain_dead_tuples and
-        * max_retention_duration is appropriate.
+        * Ensure that system configuration paramters are set appropriately to
+        * support retain_dead_tuples and max_retention_duration.
         */
        CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
                                                           
opts.retaindeadtuples, opts.retaindeadtuples,
@@ -1490,8 +1490,8 @@ AlterSubscription(ParseState *pstate, 
AlterSubscriptionStmt *stmt,
                                }
 
                                /*
-                                * Ensure that the configurations for 
retain_dead_tuples and
-                                * max_retention_duration is appropriate.
+                                * Ensure that system configuration paramters 
are set appropriately to
+                                * support retain_dead_tuples and 
max_retention_duration.
                                 */
                                if (IsSet(opts.specified_opts, 
SUBOPT_RETAIN_DEAD_TUPLES) ||
                                        IsSet(opts.specified_opts, 
SUBOPT_MAX_RETENTION_DURATION))
diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index c0211867881..969d48a1d5d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -23,7 +23,6 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/pg_subscription.h"
-#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
 #include "lib/dshash.h"
@@ -1258,7 +1257,7 @@ ApplyLauncherMain(Datum main_arg)
                                /*
                                 * Compute the minimum xmin required to protect 
dead tuples
                                 * required for conflict detection among all 
running apply
-                                * workers that enables retain_dead_tuples.
+                                * workers.
                                 */
                                if (sub->retaindeadtuples &&
                                        sub->retentionactive &&
@@ -1328,11 +1327,10 @@ ApplyLauncherMain(Datum main_arg)
                 * advance the slot's xmin to protect dead tuples required for 
the
                 * conflict detection.
                 *
-                * However, if all apply workers for subscriptions with
-                * retain_dead_tuples enabled have requested to cease retention,
-                * marking it as inactive, the new xmin will be set to
-                * InvalidTransactionId. We then update slot.xmin accordingly to
-                * permit the removal of dead tuples.
+                * Additionally, if all apply workers for subscriptions with
+                * retain_dead_tuples enabled have requested to stop retention,
+                * the slot's xmin will be set to InvalidTransactionId allowing 
the
+                * removal of dead tuples.
                 */
                if (MyReplicationSlot)
                {
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index d378eb08c71..df16cbe3716 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -173,6 +173,14 @@
  *   Advance the non-removable transaction ID if the current flush location has
  *   reached or surpassed the last received WAL position.
  *
+ * - RDT_STOP_CONFLICT_INFO_RETENTION:
+ *   This phase is required only when max_retention_duration is defined. We
+ *   enter this phase if the wait time in either the
+ *   RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
+ *   configured max_retention_duration. In this phase,
+ *   pg_subscription.subretentionactive is updated to false within a new
+ *   transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
+ *
  * The overall state progression is: GET_CANDIDATE_XID ->
  * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
  * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
@@ -190,16 +198,6 @@
  * update_deleted is necessary, as the UPDATEs in remote transactions should be
  * ignored if their timestamp is earlier than that of the dead tuples.
  *
- * If max_retention_duration is defined, one additional phase is
- * involved:
- *
- * - RDT_STOP_CONFLICT_INFO_RETENTION:
- *   This phase is triggered when the wait time in either the
- *   RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
- *   max_retention_duration. During this phase,
- *   pg_subscription.subretentionactive is updated to false within a new
- *   transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
- *
  * Note that advancing the non-removable transaction ID is not supported if the
  * publisher is also a physical standby. This is because the logical walsender
  * on the standby can only get the WAL replay position but there may be more
@@ -569,9 +567,9 @@ static void request_publisher_status(RetainDeadTuplesData 
*rdt_data);
 static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
                                                                          bool 
status_received);
 static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static bool should_stop_conflict_info_retention(RetainDeadTuplesData* 
rdt_data);
 static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
 static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
-static bool should_stop_conflict_info_retention(RetainDeadTuplesData 
*rdt_data);
 static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
                                                                                
bool new_xid_found);
 
@@ -3247,16 +3245,16 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid 
localidxoid,
        /*
         * For conflict detection, we use the leader worker's
         * oldest_nonremovable_xid value instead of invoking
-        * GetOldestNonRemovableTransactionId() or relying on the conflict
-        * detection slot's xmin. The oldest_nonremovable_xid acts as a 
threshold
-        * to identify tuples that were recently deleted. These tuples are not
-        * visible to concurrent transactions, but we log an update_deleted
-        * conflict if such a tuple matches the remote update being applied.
+        * GetOldestNonRemovableTransactionId() or using the conflict detection
+        * slot's xmin. The oldest_nonremovable_xid acts as a threshold to 
identify
+        * tuples that were recently deleted. These deleted tuples are no longer
+        * visible to concurrent transactions. However, if a remote update 
matches
+        * such a tuple, we log an update_deleted conflict.
         *
-        * Although GetOldestNonRemovableTransactionId() and slot.xmin can 
return
-        * a value older than the oldest_nonremovable_xid, for our current 
purpose
-        * it is acceptable to treat tuples deleted by transactions prior to
-        * oldest_nonremovable_xid as update_missing conflicts.
+        * While GetOldestNonRemovableTransactionId() and slot.xmin may return
+        * transaction IDs older than oldest_nonremovable_xid, for our current
+        * purpose, it is acceptable to treat tuples deleted by transactions 
prior
+        * to oldest_nonremovable_xid as update_missing conflicts.
         */
        if (am_leader_apply_worker())
        {
@@ -4669,6 +4667,55 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
        process_rdt_phase_transition(rdt_data, false);
 }
 
+/*
+ * Check whether conflict information retention should be stopped due to
+ * exceeding the maximum wait time (max_retention_duration).
+ *
+ * If retention should be stopped, transition to the
+ * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
+ * false.
+ *
+ * Note: Retention won't be resumed automatically. The user must manually
+ * disable retain_dead_tuples and re-enable it after confirming that the
+ * replication slot maintained by the launcher has been dropped.
+ */
+static bool
+should_stop_conflict_info_retention(RetainDeadTuplesData* rdt_data)
+{
+       TimestampTz now;
+
+       Assert(TransactionIdIsValid(rdt_data->candidate_xid));
+       Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
+                  rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
+
+       if (!MySubscription->maxretention)
+               return false;
+
+       /*
+        * Use last_recv_time when applying changes in the loop to avoid
+        * unnecessary system time retrieval. If last_recv_time is not 
available,
+        * obtain the current timestamp.
+        */
+       now = rdt_data->last_recv_time ? rdt_data->last_recv_time : 
GetCurrentTimestamp();
+
+       /*
+        * Return early if the wait time has not exceeded the configured maximum
+        * (max_retention_duration). Time spent waiting for table 
synchronization
+        * is excluded from this calculation, as it occurs infrequently.
+        */
+       if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+                                                                       
MySubscription->maxretention +
+                                                                       
rdt_data->table_sync_wait_time))
+               return false;
+
+       rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
+
+       /* process the next phase */
+       process_rdt_phase_transition(rdt_data, false);
+
+       return true;
+}
+
 /*
  * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
  */
@@ -4732,67 +4779,22 @@ reset_retention_data_fields(RetainDeadTuplesData 
*rdt_data)
        rdt_data->table_sync_wait_time = 0;
 }
 
-/*
- * Check whether conflict information retention should be stopped due to
- * exceeding the maximum wait time (max_retention_duration).
- *
- * If retention should be stopped, transition to the
- * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
- * false.
- *
- * Note: Retention won't be resumed automatically. The user must manually
- * disable retain_dead_tuples and re-enable it after confirming that the
- * replication slot maintained by the launcher has been dropped.
- */
-static bool
-should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
-{
-       TimestampTz now;
-
-       Assert(TransactionIdIsValid(rdt_data->candidate_xid));
-       Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
-                  rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
-
-       if (!MySubscription->maxretention)
-               return false;
-
-       /*
-        * Use last_recv_time when applying changes in the loop to avoid
-        * unnecessary system time retrieval. If last_recv_time is not 
available,
-        * obtain the current timestamp.
-        */
-       now = rdt_data->last_recv_time ? rdt_data->last_recv_time : 
GetCurrentTimestamp();
-
-       /*
-        * Return early if the wait time has not exceeded the configured maximum
-        * (max_retention_duration). Time spent waiting for table 
synchronization
-        * is excluded from this calculation, as it occurs infrequently.
-        */
-       if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
-                                                                       
MySubscription->maxretention +
-                                                                       
rdt_data->table_sync_wait_time))
-               return false;
-
-       rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
-
-       /* process the next phase */
-       process_rdt_phase_transition(rdt_data, false);
-
-       return true;
-}
-
 /*
  * Adjust the interval for advancing non-removable transaction IDs.
  *
- * We double the interval to try advancing the non-removable transaction IDs if
- * there is no activity on the node. The maximum value of the interval is 
capped
- * by wal_receiver_status_interval if it is not zero, otherwise to a 3 minutes
- * which should be sufficient to avoid using CPU or network resources without
- * much benefit. However, this maximum interval will not exceed
- * max_retention_duration.
+ * If there is no activity on the node, we progressively double the interval
+ * used to advance non-removable transaction ID. This helps conserve CPU
+ * and network resources when there's little benefit to frequent updates.
+ *
+ * The interval is capped by the lowest of the following:
+ * - wal_receiver_status_interval (if set),
+ * - a default maximum of 3 minutes,
+ * - max_retention_duration.
  *
- * The interval is reset to the lesser of 100ms and
- * max_retention_duration once there is some activity on the node.
+ * This ensures the interval never exceeds the retention boundary, even if
+ * other limits are higher. Once activity resumes on the node, the interval
+ * is reset to lesser of 100ms and max_retention_duration, allowing timely
+ * advancement of non-removable transaction ID.
  *
  * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
  * consider the other interval or a separate GUC if the need arises.

Reply via email to