From 496cc60401705a4512915db7ebf3358f7004014e Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Wed, 24 Dec 2025 09:17:27 +0800
Subject: [PATCH v2] Refactor replication origin state reset helpers

Factor out common logic for clearing per-transaction and per-session
replication origin state into dedicated helper functions.

This removes duplicated assignments of replorigin_session_origin,
replorigin_session_origin_lsn, and replorigin_session_origin_timestamp
across multiple call sites, and makes the intended scope of each reset
(clear per-transaction state vs. clear per-session state) explicit.

No functional change intended.

Author: Chao Li <lic@highgo.com>
---
 .../replication/logical/applyparallelworker.c |  1 -
 src/backend/replication/logical/origin.c      | 20 +++++++++++-------
 src/backend/replication/logical/tablesync.c   |  5 -----
 src/backend/replication/logical/worker.c      | 15 ++++++-------
 src/include/replication/origin.h              | 21 +++++++++++++++++++
 5 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index a4aafcf5b6e..b05279e0809 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -962,7 +962,6 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * origin which was already acquired by its leader process.
 	 */
 	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
 	CommitTransactionCommand();
 
 	/*
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 2380f369578..45d7bc5abc8 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1213,6 +1213,9 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 
 	/* probably this one is pointless */
 	ConditionVariableBroadcast(&session_replication_state->origin_cv);
+
+	/* set local state too */
+	replorigin_session_origin = node;
 }
 
 /*
@@ -1233,6 +1236,9 @@ replorigin_session_reset(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
+	/*
+	 * Clear sessioin state in shared memory
+	 */
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
 	session_replication_state->acquired_by = 0;
@@ -1242,6 +1248,11 @@ replorigin_session_reset(void)
 	LWLockRelease(ReplicationOriginLock);
 
 	ConditionVariableBroadcast(cv);
+
+	/*
+	 * Clear local session state
+	 */
+	replorigin_session_clear_state();
 }
 
 /*
@@ -1395,8 +1406,6 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 	pid = PG_GETARG_INT32(1);
 	replorigin_session_setup(origin, pid);
 
-	replorigin_session_origin = origin;
-
 	pfree(name);
 
 	PG_RETURN_VOID();
@@ -1412,10 +1421,6 @@ pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
 
 	replorigin_session_reset();
 
-	replorigin_session_origin = InvalidRepOriginId;
-	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-	replorigin_session_origin_timestamp = 0;
-
 	PG_RETURN_VOID();
 }
 
@@ -1482,8 +1487,7 @@ pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
 {
 	replorigin_check_prerequisites(true, false);
 
-	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-	replorigin_session_origin_timestamp = 0;
+	replorigin_xact_clear_state();
 
 	PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2522e372036..6ac467a9e19 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -323,9 +323,6 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
 		 * This is needed to allow the origin to be dropped.
 		 */
 		replorigin_session_reset();
-		replorigin_session_origin = InvalidRepOriginId;
-		replorigin_session_origin_lsn = InvalidXLogRecPtr;
-		replorigin_session_origin_timestamp = 0;
 
 		/*
 		 * Drop the tablesync's origin tracking if exists.
@@ -1320,7 +1317,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 */
 		originid = replorigin_by_name(originname, false);
 		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
 		*origin_startpos = replorigin_session_get_progress(false);
 
 		CommitTransactionCommand();
@@ -1407,7 +1403,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 718408bb599..651045debee 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -627,7 +627,7 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
-static void replorigin_reset(int code, Datum arg);
+static void on_exit_clear_state(int code, Datum arg);
 
 /*
  * Form the origin name for the subscription.
@@ -5594,7 +5594,7 @@ start_apply(XLogRecPtr origin_startpos)
 		 * transaction loss as that transaction won't be sent again by the
 		 * server.
 		 */
-		replorigin_reset(0, (Datum) 0);
+		replorigin_session_clear_state();
 
 		if (MySubscription->disableonerr)
 			DisableSubscriptionAndExit();
@@ -5652,7 +5652,6 @@ run_apply_worker(void)
 	if (!OidIsValid(originid))
 		originid = replorigin_create(originname);
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
 	origin_startpos = replorigin_session_get_progress(false);
 	CommitTransactionCommand();
 
@@ -5865,18 +5864,16 @@ InitializeLogRepWorker(void)
 	 * replication workers that set up origins and apply remote transactions
 	 * are protected.
 	 */
-	before_shmem_exit(replorigin_reset, (Datum) 0);
+	before_shmem_exit(on_exit_clear_state, (Datum) 0);
 }
 
 /*
- * Reset the origin state.
+ * Callback on exit to reset the origin state.
  */
 static void
-replorigin_reset(int code, Datum arg)
+on_exit_clear_state(int code, Datum arg)
 {
-	replorigin_session_origin = InvalidRepOriginId;
-	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-	replorigin_session_origin_timestamp = 0;
+	replorigin_session_clear_state();
 }
 
 /*
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index 2a73f6aa492..288f9ff658f 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -44,6 +44,27 @@ extern PGDLLIMPORT RepOriginId replorigin_session_origin;
 extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
 extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
 
+/*
+ *	Clear per-transaction replication origin state.
+ */
+static inline void
+replorigin_xact_clear_state(void)
+{
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
+/*
+ * Clear per-session replication origin state.
+ */
+static inline void
+replorigin_session_clear_state(void)
+{
+	replorigin_xact_clear_state();
+	replorigin_session_origin = InvalidRepOriginId;
+}
+
+
 /* GUCs */
 extern PGDLLIMPORT int max_active_replication_origins;
 
-- 
2.39.5 (Apple Git-154)

