> Your use looks good to me. So, maybe we can update the docs with the
> dangers if the users of API doesn't follow commit order then it may
> lead to data inconsistency should be sufficient. Additionally, we may
> want to give an example as to how to use this API for parallel apply.

That sounds reasonable. I’ve updated the patch and added more
information to the documentation covering the topics you mentioned.
I also added a Caution block so potential users won’t miss it. I hope
this patch meets your expectations.
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <do...@mixrank.com>
Date: Fri, 15 Aug 2025 21:37:18 +0300
Subject: [PATCH v5] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
 doc/src/sgml/func/func-admin.sgml        | 22 ++++++++++++++++++++--
 src/backend/catalog/system_functions.sql |  9 ++++++++-
 src/backend/replication/logical/origin.c |  4 +++-
 src/include/catalog/pg_proc.dat          |  2 +-
 4 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..4b86676 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equals to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to maintain commit order to prevent data inconsistency. While processes
+         may send operations out of order, they must commit transactions in the
+         correct sequence to ensure proper replication consistency. The recommended workflow
+         for each worker is: set up the replication origin session with the first process's PID,
+         apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+         with the LSN and commit timestamp before committing, then commit the
+         transaction only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
       </row>

       <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308..f60287d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';

+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM

 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;

-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;

 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e5..98d47e1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;

 	replorigin_check_prerequisites(true, false);

 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);

 	replorigin_session_origin = origin;

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da..dd2d938 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12223,7 +12223,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },

 { oid => '6007', descr => 'teardown configured replication progress tracking',

Reply via email to