On Wed, Dec 4, 2024 at 4:29 PM Amit Kapila <[email protected]> wrote:
>
> 1.
> + if (can_advance_nonremovable_xid(&data, last_recv_timestamp))
> + maybe_advance_nonremovable_xid(&data);
>
> In can_advance_nonremovable_xid(), we determine whether to advance the
> oldest xid based on 'last_recv_timestamp' and then again in
> maybe_advance_nonremovable_xid()->get_candidate_xid(), we compare it
> with the current time. How does that make sense? Shall we use
> 'last_recv_timestamp' directly in get_candidate_xid() as that will
> avoid the additional time check in can_advance_nonremovable_xid()?
>
> 2.
> + TimestampTz next_attempt_time; /* when to attemp to advance the xid during
> + * change application */
> +} RetainConflictInfoData;
>
> This new variable introduced in this version is not used in the patch.
> Any reason or just a leftover?
>
> Apart from the above, I have made a few updates in the comments in the
> attached. Please include those after review.
>
A few more comments:
1.
+static void
+wait_for_local_flush(RetainConflictInfoData *data)
{
...
+
+ data->phase = RCI_GET_CANDIDATE_XID;
+
+ maybe_advance_nonremovable_xid(data);
+}
Isn't it better to reset all the fields of data before the next round
of the RCI_GET_CANDIDATE_XID phase? If we do that, then we don't need to
reset data->remote_lsn = InvalidXLogRecPtr; and data->last_phase_at =
InvalidFullTransactionId; individually in request_publisher_status()
and get_candidate_xid() respectively. It also looks cleaner and more
logical to me, unless I am missing something.
2.
+ /*
+ * Issue a warning if there is a detected clock skew between the publisher
+ * and subscriber.
+ *
+ * XXX Consider waiting for the publisher's clock to catch up with the
+ * subscriber's before proceeding to the next phase.
+ */
+ if (TimestampDifferenceExceeds(data->reply_time,
+ data->candidate_xid_time, 0))
+ ereport(WARNING,
+ errmsg("non-removable transaction ID may be advanced prematurely"),
+ errdetail("The clock on the publisher is behind that of the subscriber."));
Shouldn't this be an ERROR, as this will lead to the removal of rows
required to detect update_delete conflicts?
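To make the condition in comment 2 concrete, here is a rough standalone sketch, not the patch's code. TimestampTz is a stand-in typedef, timestamp_difference_exceeds() mirrors my understanding of TimestampDifferenceExceeds() (true when stop - start >= msec), and publisher_clock_is_behind() is a hypothetical helper name used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's TimestampTz (microseconds); an assumption. */
typedef int64_t TimestampTz;

/*
 * Stand-in for TimestampDifferenceExceeds(): returns true when
 * (stop - start) is at least msec milliseconds.
 */
static bool
timestamp_difference_exceeds(TimestampTz start, TimestampTz stop, int msec)
{
    assert(msec >= 0);
    return (stop - start) >= (TimestampTz) msec * 1000;
}

/*
 * Hypothetical helper: with a threshold of 0, the check fires whenever the
 * publisher's reply_time is not later than the subscriber's
 * candidate_xid_time, including when the two are equal.
 */
static bool
publisher_clock_is_behind(TimestampTz reply_time,
                          TimestampTz candidate_xid_time)
{
    return timestamp_difference_exceeds(reply_time, candidate_xid_time, 0);
}
```

When this returns true, the publisher status may have been sampled against a clock that lags the subscriber's, which is the case where advancing the xid looks unsafe to me.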
Apart from the above, I have made a few more updates in the comments
in the attached. Please include those after review.
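For comment 1, something along the following lines is what I have in mind. This is only a sketch with stand-in types: the field names come from the patch, but the typedefs, the two wait-phase enum names, and the helper name reset_retain_conflict_info() are assumptions for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins for the PostgreSQL types; assumptions, not the real headers. */
typedef uint64_t XLogRecPtr;
typedef int64_t TimestampTz;
typedef struct { uint64_t value; } FullTransactionId;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef enum
{
    RCI_GET_CANDIDATE_XID,
    RCI_REQUEST_PUBLISHER_STATUS,
    RCI_WAIT_FOR_PUBLISHER_STATUS,  /* assumed name */
    RCI_WAIT_FOR_LOCAL_FLUSH        /* assumed name */
} RetainConflictInfoPhase;

typedef struct
{
    RetainConflictInfoPhase phase;
    XLogRecPtr  remote_lsn;
    FullTransactionId last_phase_at;
    FullTransactionId candidate_xid;
    TimestampTz reply_time;
    TimestampTz candidate_xid_time;
} RetainConflictInfoData;

/*
 * Hypothetical helper: clear every field in one place before starting the
 * next round, instead of resetting individual fields in each phase function.
 * Relies on the invalid LSN and invalid full transaction ID being all zeros.
 */
static void
reset_retain_conflict_info(RetainConflictInfoData *data)
{
    assert(data != NULL);
    memset(data, 0, sizeof(*data));
    data->phase = RCI_GET_CANDIDATE_XID;
}
```

wait_for_local_flush() would then call this once at the end, in place of setting data->phase alone.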
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 06ba6d3a64..e89e811c51 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4135,6 +4135,7 @@ get_candidate_xid(RetainConflictInfoData *data)
data->last_phase_at = InvalidFullTransactionId;
data->phase = RCI_REQUEST_PUBLISHER_STATUS;
+ /* process the next phase */
maybe_advance_nonremovable_xid(data);
}
@@ -4156,6 +4157,10 @@ request_publisher_status(RetainConflictInfoData *data)
else
resetStringInfo(request_message);
+ /*
+ * We send the current time to update the remote walsender's latest reply
+ * message received time.
+ */
pq_sendbyte(request_message, 'S');
pq_sendint64(request_message, GetCurrentTimestamp());
@@ -4213,6 +4218,7 @@ wait_for_publisher_status(RetainConflictInfoData *data)
else
data->phase = RCI_REQUEST_PUBLISHER_STATUS;
+ /* process the next phase */
maybe_advance_nonremovable_xid(data);
}
@@ -4226,8 +4232,10 @@ wait_for_local_flush(RetainConflictInfoData *data)
FullTransactionIdIsValid(data->candidate_xid));
/*
- * Issue a warning if there is a detected clock skew between the publisher
- * and subscriber.
+ * We expect the publisher and subscriber clocks to be in sync using a
+ * time sync service like NTP. Otherwise, we will advance this worker's
+ * oldest_nonremovable_xid prematurely, leading to the removal of rows
+ * required to detect update_delete conflicts.
*
* XXX Consider waiting for the publisher's clock to catch up with the
* subscriber's before proceeding to the next phase.
@@ -4235,7 +4243,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
if (TimestampDifferenceExceeds(data->reply_time,
data->candidate_xid_time, 0))
ereport(WARNING,
- errmsg("non-removable transaction ID may be advanced prematurely"),
+ errmsg("oldest_nonremovable_xid transaction ID may be advanced prematurely"),
errdetail("The clock on the publisher is behind that of the subscriber."));
/*
@@ -4276,6 +4284,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
data->phase = RCI_GET_CANDIDATE_XID;
+ /* process the next phase */
maybe_advance_nonremovable_xid(data);
}