On Tue, Dec 3, 2024 at 9:17 AM Zhijie Hou (Fujitsu)
<houzj.f...@fujitsu.com> wrote:
>
> Also, I removed a change in wait_for_local_flush() which was mis-added in
> V13_2 patch.
>

1.
+ if (can_advance_nonremovable_xid(&data, last_recv_timestamp))
+ maybe_advance_nonremovable_xid(&data);

In can_advance_nonremovable_xid(), we determine whether to advance the
oldest xid based on 'last_recv_timestamp' and then again in
maybe_advance_nonremovable_xid()->get_candidate_xid(), we compare it
with the current time. How does that make sense? Shall we use
'last_recv_timestamp' directly in get_candidate_xid() as that will
avoid the additional time check in can_advance_nonremovable_xid()?

2.
+ TimestampTz next_attempt_time; /* when to attempt to advance the xid during
+ * change application */
+} RetainConflictInfoData;

This new variable introduced in this version is not used in the patch.
Any reason or just a leftover?

Apart from the above, I have made a few updates in the comments in the
attached. Please include those after review.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 97f4d9fcba..06ba6d3a64 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3736,9 +3736,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                                                /*
                                                 * Attempt to advance the non-removable transaction ID
-                                                * during change application to prevent it from
-                                                * remaining unchanged for long periods when the worker
-                                                * is busy.
+                                                * to avoid accumulating dead rows when the worker is
+                                                * busy.
                                                 */
                                                if (can_advance_nonremovable_xid(&data, last_recv_timestamp))
                                                        maybe_advance_nonremovable_xid(&data);
@@ -3770,6 +3769,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                                data.remote_epoch = pq_getmsgint(&s, 4);
                                                data.reply_time = pq_getmsgint64(&s);
 
+                                               /*
+                                                * This should never happen, see
+                                                * ProcessStandbyPSRequestMessage. But if it happens
+                                                * due to a bug, we don't want to proceed as it can
+                                                * incorrectly advance oldest_nonremovable_xid.
+                                                */
                                                if (XLogRecPtrIsInvalid(data.remote_lsn))
                                                        elog(ERROR, "cannot get the latest WAL position from the publisher");
 
@@ -4164,8 +4169,8 @@ request_publisher_status(RetainConflictInfoData *data)
        data->phase = RCI_WAIT_FOR_PUBLISHER_STATUS;
 
        /*
-        * Skip calling maybe_advance_nonremovable_xid() since further actions
-        * cannot proceed until the publisher status is received.
+        * Skip calling maybe_advance_nonremovable_xid() since further transition
+        * is possible only once we receive the publisher status message.
         */
 }
 
@@ -4244,7 +4249,14 @@ wait_for_local_flush(RetainConflictInfoData *data)
        if (!AllTablesyncsReady())
                return;
 
-       /* Return to wait for the changes to be applied */
+       /*
+        * Return to wait for the changes to be applied.
+        *
+        * XXX The remote flush location (last_flushpos) is updated only when
+        * feedback is sent to the server. So, the advancement of
+        * oldest_nonremovable_xid may be delayed. We can always update
+        * last_flushpos here if we notice such a delay.
+        */
        if (last_flushpos < data->remote_lsn)
                return;
 
@@ -4268,7 +4280,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
 }
 
 /*
- * Determine if the next round of transaction ID advancement can be attempted.
+ * Determine if we can attempt to advance transaction ID.
  *
  * TODO: The remote flush location (last_flushpos) is currently not updated
  * during change application, making it impossible to satisfy the condition of
