> Your use looks good to me. So, maybe we can update the docs with the
> dangers if the users of API doesn't follow commit order then it may
> lead to data inconsistency should be sufficient. Additionally, we may
> want to give an example as to how to use this API for parallel apply.
That sounds reasonable. I’ve updated the patch and added more information to the documentation covering the topics you mentioned. I also added a Caution block so potential users won’t miss it. I hope this patch meets your expectations.
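To illustrate the commit-order requirement from the Caution block, here is a minimal sketch in C. It assumes the caller numbers transactions 0, 1, 2, ... in the order they committed on the origin. `CommitGate`, `gate_commit()`, `worker_main()` and `run_workers()` are hypothetical names and are not part of PostgreSQL or of this patch. Workers may apply changes concurrently, but each one waits for its turn before running `pg_replication_origin_xact_setup()` and `COMMIT`. The SQL a real worker would issue appears only in comments, and the sketch just records the order in which commits happen.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_WORKERS 64

/*
 * Hypothetical commit-order gate (not part of PostgreSQL). Transactions
 * are numbered in the order they committed on the origin; a worker may
 * only commit once every earlier transaction has committed.
 */
typedef struct CommitGate
{
	pthread_mutex_t lock;
	pthread_cond_t turn;
	long		next_seq;			/* sequence number allowed to commit next */
	long		log[MAX_WORKERS];	/* observed commit order, for checking */
	int			ncommitted;
} CommitGate;

static void
gate_init(CommitGate *g)
{
	pthread_mutex_init(&g->lock, NULL);
	pthread_cond_init(&g->turn, NULL);
	g->next_seq = 0;
	g->ncommitted = 0;
}

static void
gate_commit(CommitGate *g, long seq)
{
	pthread_mutex_lock(&g->lock);
	while (g->next_seq != seq)
		pthread_cond_wait(&g->turn, &g->lock);

	/*
	 * A real worker would run here, on its own connection:
	 *   SELECT pg_replication_origin_xact_setup('<origin LSN>', '<commit ts>');
	 *   COMMIT;
	 * This sketch only records the order in which commits happen.
	 */
	g->log[g->ncommitted++] = seq;
	g->next_seq++;

	pthread_cond_broadcast(&g->turn);
	pthread_mutex_unlock(&g->lock);
}

typedef struct WorkerArg
{
	CommitGate *gate;
	long		seq;
} WorkerArg;

static void *
worker_main(void *p)
{
	WorkerArg  *arg = p;

	/*
	 * A real worker would already have run
	 *   SELECT pg_replication_origin_session_setup('<origin>', <first PID>);
	 * (the first process passes 0 or omits pid), then BEGIN and apply its
	 * changes, possibly concurrently with and out of order from the others.
	 */
	gate_commit(arg->gate, arg->seq);
	return NULL;
}

/* Start n workers in reverse sequence order; return how many committed. */
static int
run_workers(CommitGate *g, int n)
{
	pthread_t	threads[MAX_WORKERS];
	WorkerArg	args[MAX_WORKERS];

	assert(n > 0 && n <= MAX_WORKERS);
	for (int i = 0; i < n; i++)
	{
		args[i].gate = g;
		args[i].seq = n - 1 - i;	/* later transactions start first */
		pthread_create(&threads[i], NULL, worker_main, &args[i]);
	}
	for (int i = 0; i < n; i++)
		pthread_join(threads[i], NULL);
	return g->ncommitted;
}
```

Even though the workers are started in reverse order, the recorded commit order is 0, 1, 2, .... This sketch has no error handling. If a worker fails before its turn, every later worker would wait forever, so a real implementation needs its own policy for that case.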
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <do...@mixrank.com>
Date: Fri, 15 Aug 2025 21:37:18 +0300
Subject: [PATCH v5] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter: pid. This
process ID indicates that multiple processes are sharing the same
replication origin to apply changes in parallel.

The replorigin_session_setup() function has an SQL-level interface:
pg_replication_origin_session_setup(). This commit adds an optional
parameter to it that passes the process ID through to the internal
replorigin_session_setup() function. This allows multiple processes to
share the same replication origin when using the SQL-level replication
origin functions.
---
 doc/src/sgml/func/func-admin.sgml        | 22 ++++++++++++++++++++--
 src/backend/catalog/system_functions.sql |  9 ++++++++-
 src/backend/replication/logical/origin.c |  4 +++-
 src/include/catalog/pg_proc.dat          |  2 +-
 4 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..4b86676 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equal to
+        <literal>0</literal>, and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to preserve commit order to prevent data inconsistency. Although processes
+         may apply changes concurrently and out of order, they must commit their
+         transactions in the order in which they were committed on the origin.
+         The recommended workflow for each worker is: set up the replication origin
+         session with the first process's PID, apply changes within a transaction, call
+         <function>pg_replication_origin_xact_setup</function> with the origin LSN and
+         commit timestamp before committing, and then commit only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
       </row>
 
       <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308..f60287d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e5..98d47e1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da..dd2d938 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12223,7 +12223,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',