On Wed, Jun 24, 2026 at 12:53 PM Dilip Kumar <[email protected]> wrote:
>
> On Tue, Jun 23, 2026 at 2:22 PM vignesh C <[email protected]> wrote:
> >
> > Few comments:
> > 1) Currently we are storing these in shared memory. Looking at the
> > implementation, these fields are purely worker-private state used to
> > ferry data across the error boundary from prepare_conflict_log_tuple()
> > (inside the PG_TRY block) to ProcessPendingConflictLogTuple() (inside
> > the PG_CATCH block).

Good point.

> > If it is not required by another process, should
> > it be moved out of shared memory.
> > +       /* A conflict log tuple that is prepared but not yet inserted. */
> > +       HeapTuple       conflict_log_tuple;
> > +
> > +       /*
> > +        * Error-context string describing the conflict above, used to
> > annotate any
> > +        * error raised while inserting conflict_log_tuple into the 
> > conflict log
> > +        * table.  Allocated, like conflict_log_tuple, in ApplyContext.
> > +        */
> > +       char       *conflict_log_errcontext;
>
> Yeah, there is no need for them to be in shared memory, but do we have
> any other data structure where these fit naturally, or can we make
> them global variables?
>

Or we can have a file-local struct PendingConflictLogData similar to
FlushPosition. See the attached top-up patch. As the comment
("Allocated, like conflict_log_tuple, in ApplyContext") says, both
fields point to memory allocated in the process-local ApplyContext, so
it is not safe to keep them in shared memory.

>
> > 4) Is the condition remote_commit_ts > 0 done intentionally?
> > +       if (remote_commit_ts > 0)
> > +               values[attno++] = TimestampTzGetDatum(remote_commit_ts);
> > +       else
> > +               nulls[attno++] = true;
> >
> > As I had seen some negative values for certain timestamps. Shouldn't
> > the check be != 0?
> > SELECT extract(epoch FROM '1969-12-31 23:59:59+00'::timestamptz);
> >   extract
> > -----------
> >  -1.000000
> > (1 row)
>
> I think 0 can also be generated for timestamptz, but since we are
> initializing `timestamptz` with 0, checking for != 0 seems correct.
> I need to put more thought into this, though.
>

I think either way (>0 or !=0) is fine as both will serve the desired
purpose, but I think !=0 will be more robust because we are using 0 as
the sentinel value for remote_commit_ts.

-- 
With Regards,
Amit Kapila.
From 773671f06bf3c2e577d0ad99d40a23b84cae9258 Mon Sep 17 00:00:00 2001
From: Amit Kapila <[email protected]>
Date: Wed, 24 Jun 2026 16:08:18 +0530
Subject: [PATCH v1_amit] Move pending conflict log state out of shared memory

The prepared conflict tuple and its error-context string are purely
worker-private data ferried across the apply error boundary, so keep them in a
process-local static struct in conflict.c instead of the shared 
LogicalRepWorker.
---
 src/backend/replication/logical/conflict.c | 67 ++++++++++++++--------
 src/backend/replication/logical/launcher.c |  1 -
 src/include/replication/worker_internal.h  | 10 ----
 3 files changed, 43 insertions(+), 35 deletions(-)

diff --git a/src/backend/replication/logical/conflict.c 
b/src/backend/replication/logical/conflict.c
index 983cdf94cf0..e927c152028 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -101,6 +101,25 @@ static const char *const ConflictTypeNames[] = {
        [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
 };
 
+/*
+ * Worker-private state for a conflict that has been prepared for the conflict
+ * log table but not yet inserted.  It carries the prepared tuple, and a
+ * description of the conflict used for error context, from
+ * prepare_conflict_log_tuple() across the apply error boundary to
+ * ProcessPendingConflictLogTuple()/InsertConflictLogTuple().  Both pointers
+ * reference memory allocated in ApplyContext.
+ *
+ * This is purely process-local state, so it lives here rather than in the
+ * shared LogicalRepWorker struct.
+ */
+typedef struct PendingConflictLogData
+{
+       HeapTuple       tuple;                  /* prepared, not-yet-inserted 
conflict tuple */
+       char       *errcontext_str;     /* conflict description for error 
context */
+} PendingConflictLog;
+
+static PendingConflictLog pending_conflict_log = {0};
+
 static int     errcode_apply_conflict(ConflictType type);
 static void errdetail_apply_conflict(EState *estate,
                                                                         
ResultRelInfo *relinfo,
@@ -427,15 +446,15 @@ ReportApplyConflict(EState *estate, ResultRelInfo 
*relinfo, int elevel,
                                 * insertion path 
(ProcessPendingConflictLogTuple) does not retry
                                 * this same failing insert.
                                 */
-                               if (MyLogicalRepWorker->conflict_log_tuple != 
NULL)
+                               if (pending_conflict_log.tuple != NULL)
                                {
-                                       
heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
-                                       MyLogicalRepWorker->conflict_log_tuple 
= NULL;
+                                       
heap_freetuple(pending_conflict_log.tuple);
+                                       pending_conflict_log.tuple = NULL;
                                }
-                               if (MyLogicalRepWorker->conflict_log_errcontext 
!= NULL)
+                               if (pending_conflict_log.errcontext_str != NULL)
                                {
-                                       
pfree(MyLogicalRepWorker->conflict_log_errcontext);
-                                       
MyLogicalRepWorker->conflict_log_errcontext = NULL;
+                                       
pfree(pending_conflict_log.errcontext_str);
+                                       pending_conflict_log.errcontext_str = 
NULL;
                                }
                                PG_RE_THROW();
                        }
@@ -468,7 +487,7 @@ ProcessPendingConflictLogTuple(void)
        Relation        conflictlogrel;
 
        /* Nothing to do */
-       if (MyLogicalRepWorker->conflict_log_tuple == NULL)
+       if (pending_conflict_log.tuple == NULL)
                return;
 
        /*
@@ -499,12 +518,12 @@ ProcessPendingConflictLogTuple(void)
                 * ReportApplyConflict().  Nothing more to do; just discard the 
prepared
                 * tuple and its context string.
                 */
-               heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
-               MyLogicalRepWorker->conflict_log_tuple = NULL;
-               if (MyLogicalRepWorker->conflict_log_errcontext)
+               heap_freetuple(pending_conflict_log.tuple);
+               pending_conflict_log.tuple = NULL;
+               if (pending_conflict_log.errcontext_str)
                {
-                       pfree(MyLogicalRepWorker->conflict_log_errcontext);
-                       MyLogicalRepWorker->conflict_log_errcontext = NULL;
+                       pfree(pending_conflict_log.errcontext_str);
+                       pending_conflict_log.errcontext_str = NULL;
                }
        }
 
@@ -634,7 +653,7 @@ InsertConflictLogTuple(Relation conflictlogrel)
        ErrorContextCallback errcallback;
 
        /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
-       Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+       Assert(pending_conflict_log.tuple != NULL);
 
        /*
         * Set up an error context so that a failure to insert (e.g. the 
conflict
@@ -642,22 +661,22 @@ InsertConflictLogTuple(Relation conflictlogrel)
         * identifying the conflict we were trying to log.
         */
        errcallback.callback = conflict_log_insert_errcontext;
-       errcallback.arg = MyLogicalRepWorker->conflict_log_errcontext;
+       errcallback.arg = pending_conflict_log.errcontext_str;
        errcallback.previous = error_context_stack;
        error_context_stack = &errcallback;
 
-       heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+       heap_insert(conflictlogrel, pending_conflict_log.tuple,
                                GetCurrentCommandId(true), 
HEAP_INSERT_NO_LOGICAL, NULL);
 
        error_context_stack = errcallback.previous;
 
        /* Free the conflict log tuple and its context string. */
-       heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
-       MyLogicalRepWorker->conflict_log_tuple = NULL;
-       if (MyLogicalRepWorker->conflict_log_errcontext)
+       heap_freetuple(pending_conflict_log.tuple);
+       pending_conflict_log.tuple = NULL;
+       if (pending_conflict_log.errcontext_str)
        {
-               pfree(MyLogicalRepWorker->conflict_log_errcontext);
-               MyLogicalRepWorker->conflict_log_errcontext = NULL;
+               pfree(pending_conflict_log.errcontext_str);
+               pending_conflict_log.errcontext_str = NULL;
        }
 }
 
@@ -1394,7 +1413,7 @@ build_local_conflicts_json_array(EState *estate, Relation 
rel,
  *
  * This routine prepares a tuple detailing a conflict encountered during
  * logical replication. The prepared tuple will be stored in
- * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * pending_conflict_log.tuple which should be inserted into the
  * conflict log table by calling InsertConflictLogTuple.
  */
 static void
@@ -1411,7 +1430,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
        char       *remote_origin = NULL;
        MemoryContext   oldctx;
 
-       Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+       Assert(pending_conflict_log.tuple == NULL);
 
        /* Populate the values and nulls arrays. */
        attno = 0;
@@ -1475,7 +1494,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
        Assert(attno + 1 == NUM_CONFLICT_ATTRS);
 
        oldctx = MemoryContextSwitchTo(ApplyContext);
-       MyLogicalRepWorker->conflict_log_tuple =
+       pending_conflict_log.tuple =
                heap_form_tuple(RelationGetDescr(conflictlogrel), values, 
nulls);
 
        /*
@@ -1483,7 +1502,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
         * the tuple into the conflict log table fails, the resulting error 
carries
         * enough context to identify the conflict (see InsertConflictLogTuple).
         */
-       MyLogicalRepWorker->conflict_log_errcontext =
+       pending_conflict_log.errcontext_str =
                psprintf("while logging conflict \"%s\" detected on relation 
\"%s\"",
                                 ConflictTypeNames[conflict_type],
                                 RelationGetRelationName(rel));
diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index 05a30342f69..313e31ff2e3 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -487,7 +487,6 @@ retry:
        worker->oldest_nonremovable_xid = retain_dead_tuples
                ? MyReplicationSlot->data.xmin
                : InvalidTransactionId;
-       worker->conflict_log_tuple = NULL;
        worker->last_lsn = InvalidXLogRecPtr;
        TIMESTAMP_NOBEGIN(worker->last_send_time);
        TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/include/replication/worker_internal.h 
b/src/include/replication/worker_internal.h
index 79c90dddd89..00ad0d86a79 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -100,16 +100,6 @@ typedef struct LogicalRepWorker
         */
        TransactionId oldest_nonremovable_xid;
 
-       /* A conflict log tuple that is prepared but not yet inserted. */
-       HeapTuple       conflict_log_tuple;
-
-       /*
-        * Error-context string describing the conflict above, used to annotate 
any
-        * error raised while inserting conflict_log_tuple into the conflict log
-        * table.  Allocated, like conflict_log_tuple, in ApplyContext.
-        */
-       char       *conflict_log_errcontext;
-
        /* Stats. */
        XLogRecPtr      last_lsn;
        TimestampTz last_send_time;
-- 
2.54.0

Reply via email to