On Wed, Jun 24, 2026 at 12:53 PM Dilip Kumar <[email protected]> wrote:
>
> On Tue, Jun 23, 2026 at 2:22 PM vignesh C <[email protected]> wrote:
> >
> > Few comments:
> > 1) Currently we are storing these in shared memory. Looking at the
> > implementation, these fields are purely worker-private state used to
> > ferry data across the error boundary from prepare_conflict_log_tuple()
> > (inside the PG_TRY block) to ProcessPendingConflictLogTuple() (inside
> > the PG_CATCH block).
Good point.
> > If it is not required by another process, should
> > it be moved out of shared memory?
> > + /* A conflict log tuple that is prepared but not yet inserted. */
> > + HeapTuple conflict_log_tuple;
> > +
> > + /*
> > + * Error-context string describing the conflict above, used to
> > annotate any
> > + * error raised while inserting conflict_log_tuple into the
> > conflict log
> > + * table. Allocated, like conflict_log_tuple, in ApplyContext.
> > + */
> > + char *conflict_log_errcontext;
>
> Yeah, there is no need for them to be in shared memory, but do we have
> any other data structure where these fit naturally, or can we make
> them global variables?
>
Or we can have a file-local struct PendingConflictLogData similar to
FlushPosition. See the attached top-up patch. As the comment
("Allocated, like conflict_log_tuple, in ApplyContext") says, both fields
point to memory allocated in the process-local ApplyContext, so it is not
safe to keep them in shared memory.
>
> > 4) Is the condition remote_commit_ts > 0 done intentionally?
> > + if (remote_commit_ts > 0)
> > + values[attno++] = TimestampTzGetDatum(remote_commit_ts);
> > + else
> > + nulls[attno++] = true;
> >
> > As I had seen some negative values for certain timestamps. Shouldn't
> > the check be != 0?
> > SELECT extract(epoch FROM '1969-12-31 23:59:59+00'::timestamptz);
> > extract
> > -----------
> > -1.000000
> > (1 row)
>
> I think a value of 0 can also be generated for timestamptz, but since we
> are initializing the `timestamptz` with 0, checking != 0 seems correct, but
> I need to put more thought into this.
>
I think either way (>0 or !=0) is fine, as both will serve the desired
purpose, but I think !=0 will be more robust because we are using 0 as
the sentinel value for remote_commit_ts.
--
With Regards,
Amit Kapila.
From 773671f06bf3c2e577d0ad99d40a23b84cae9258 Mon Sep 17 00:00:00 2001
From: Amit Kapila <[email protected]>
Date: Wed, 24 Jun 2026 16:08:18 +0530
Subject: [PATCH v1_amit] Move pending conflict log state out of shared memory
The prepared conflict tuple and its error-context string are purely
worker-private data ferried across the apply error boundary, so keep them in a
process-local static struct in conflict.c instead of the shared
LogicalRepWorker.
---
src/backend/replication/logical/conflict.c | 67 ++++++++++++++--------
src/backend/replication/logical/launcher.c | 1 -
src/include/replication/worker_internal.h | 10 ----
3 files changed, 43 insertions(+), 35 deletions(-)
diff --git a/src/backend/replication/logical/conflict.c
b/src/backend/replication/logical/conflict.c
index 983cdf94cf0..e927c152028 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -101,6 +101,25 @@ static const char *const ConflictTypeNames[] = {
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
+/*
+ * Worker-private state for a conflict that has been prepared for the conflict
+ * log table but not yet inserted. It carries the prepared tuple, and a
+ * description of the conflict used for error context, from
+ * prepare_conflict_log_tuple() across the apply error boundary to
+ * ProcessPendingConflictLogTuple()/InsertConflictLogTuple(). Both pointers
+ * reference memory allocated in ApplyContext.
+ *
+ * This is purely process-local state, so it lives here rather than in the
+ * shared LogicalRepWorker struct.
+ */
+typedef struct PendingConflictLogData
+{
+ HeapTuple tuple; /* prepared, not-yet-inserted
conflict tuple */
+ char *errcontext_str; /* conflict description for error
context */
+} PendingConflictLog;
+
+static PendingConflictLog pending_conflict_log = {0};
+
static int errcode_apply_conflict(ConflictType type);
static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
@@ -427,15 +446,15 @@ ReportApplyConflict(EState *estate, ResultRelInfo
*relinfo, int elevel,
* insertion path
(ProcessPendingConflictLogTuple) does not retry
* this same failing insert.
*/
- if (MyLogicalRepWorker->conflict_log_tuple !=
NULL)
+ if (pending_conflict_log.tuple != NULL)
{
-
heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
- MyLogicalRepWorker->conflict_log_tuple
= NULL;
+
heap_freetuple(pending_conflict_log.tuple);
+ pending_conflict_log.tuple = NULL;
}
- if (MyLogicalRepWorker->conflict_log_errcontext
!= NULL)
+ if (pending_conflict_log.errcontext_str != NULL)
{
-
pfree(MyLogicalRepWorker->conflict_log_errcontext);
-
MyLogicalRepWorker->conflict_log_errcontext = NULL;
+
pfree(pending_conflict_log.errcontext_str);
+ pending_conflict_log.errcontext_str =
NULL;
}
PG_RE_THROW();
}
@@ -468,7 +487,7 @@ ProcessPendingConflictLogTuple(void)
Relation conflictlogrel;
/* Nothing to do */
- if (MyLogicalRepWorker->conflict_log_tuple == NULL)
+ if (pending_conflict_log.tuple == NULL)
return;
/*
@@ -499,12 +518,12 @@ ProcessPendingConflictLogTuple(void)
* ReportApplyConflict(). Nothing more to do; just discard the
prepared
* tuple and its context string.
*/
- heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
- MyLogicalRepWorker->conflict_log_tuple = NULL;
- if (MyLogicalRepWorker->conflict_log_errcontext)
+ heap_freetuple(pending_conflict_log.tuple);
+ pending_conflict_log.tuple = NULL;
+ if (pending_conflict_log.errcontext_str)
{
- pfree(MyLogicalRepWorker->conflict_log_errcontext);
- MyLogicalRepWorker->conflict_log_errcontext = NULL;
+ pfree(pending_conflict_log.errcontext_str);
+ pending_conflict_log.errcontext_str = NULL;
}
}
@@ -634,7 +653,7 @@ InsertConflictLogTuple(Relation conflictlogrel)
ErrorContextCallback errcallback;
/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
- Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+ Assert(pending_conflict_log.tuple != NULL);
/*
* Set up an error context so that a failure to insert (e.g. the
conflict
@@ -642,22 +661,22 @@ InsertConflictLogTuple(Relation conflictlogrel)
* identifying the conflict we were trying to log.
*/
errcallback.callback = conflict_log_insert_errcontext;
- errcallback.arg = MyLogicalRepWorker->conflict_log_errcontext;
+ errcallback.arg = pending_conflict_log.errcontext_str;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
- heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+ heap_insert(conflictlogrel, pending_conflict_log.tuple,
GetCurrentCommandId(true),
HEAP_INSERT_NO_LOGICAL, NULL);
error_context_stack = errcallback.previous;
/* Free the conflict log tuple and its context string. */
- heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
- MyLogicalRepWorker->conflict_log_tuple = NULL;
- if (MyLogicalRepWorker->conflict_log_errcontext)
+ heap_freetuple(pending_conflict_log.tuple);
+ pending_conflict_log.tuple = NULL;
+ if (pending_conflict_log.errcontext_str)
{
- pfree(MyLogicalRepWorker->conflict_log_errcontext);
- MyLogicalRepWorker->conflict_log_errcontext = NULL;
+ pfree(pending_conflict_log.errcontext_str);
+ pending_conflict_log.errcontext_str = NULL;
}
}
@@ -1394,7 +1413,7 @@ build_local_conflicts_json_array(EState *estate, Relation
rel,
*
* This routine prepares a tuple detailing a conflict encountered during
* logical replication. The prepared tuple will be stored in
- * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * pending_conflict_log.tuple which should be inserted into the
* conflict log table by calling InsertConflictLogTuple.
*/
static void
@@ -1411,7 +1430,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
char *remote_origin = NULL;
MemoryContext oldctx;
- Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+ Assert(pending_conflict_log.tuple == NULL);
/* Populate the values and nulls arrays. */
attno = 0;
@@ -1475,7 +1494,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
Assert(attno + 1 == NUM_CONFLICT_ATTRS);
oldctx = MemoryContextSwitchTo(ApplyContext);
- MyLogicalRepWorker->conflict_log_tuple =
+ pending_conflict_log.tuple =
heap_form_tuple(RelationGetDescr(conflictlogrel), values,
nulls);
/*
@@ -1483,7 +1502,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
* the tuple into the conflict log table fails, the resulting error
carries
* enough context to identify the conflict (see InsertConflictLogTuple).
*/
- MyLogicalRepWorker->conflict_log_errcontext =
+ pending_conflict_log.errcontext_str =
psprintf("while logging conflict \"%s\" detected on relation
\"%s\"",
ConflictTypeNames[conflict_type],
RelationGetRelationName(rel));
diff --git a/src/backend/replication/logical/launcher.c
b/src/backend/replication/logical/launcher.c
index 05a30342f69..313e31ff2e3 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -487,7 +487,6 @@ retry:
worker->oldest_nonremovable_xid = retain_dead_tuples
? MyReplicationSlot->data.xmin
: InvalidTransactionId;
- worker->conflict_log_tuple = NULL;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/include/replication/worker_internal.h
b/src/include/replication/worker_internal.h
index 79c90dddd89..00ad0d86a79 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -100,16 +100,6 @@ typedef struct LogicalRepWorker
*/
TransactionId oldest_nonremovable_xid;
- /* A conflict log tuple that is prepared but not yet inserted. */
- HeapTuple conflict_log_tuple;
-
- /*
- * Error-context string describing the conflict above, used to annotate
any
- * error raised while inserting conflict_log_tuple into the conflict log
- * table. Allocated, like conflict_log_tuple, in ApplyContext.
- */
- char *conflict_log_errcontext;
-
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
--
2.54.0