Hi, We've added some information such as the command and the timestamp to the error context message by commit abc0910e2. This patch adds further information to it: replication origin name and commit-LSN.
This will be helpful for users to set the origin name and LSN to pg_replication_origin_advance(). The errcontext message would become as follows: * Before ERROR: duplicate key value violates unique constraint "test_pkey" DETAIL: Key (c)=(1) already exists. CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 726 at 2022-02-28 20:59:56.005909+09 * After ERROR: duplicate key value violates unique constraint "test_pkey" DETAIL: Key (c)=(1) already exists. CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 726 committed at LSN 0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+09 from replication origin "pg_16395" I'm a bit concerned that the message may be too long. I've attached two patches: the first one changes apply_error_callback() so that it uses complete sentences with if-else blocks so that translation works, the second patch adds the origin name and commit-LSN to the errcontext message. Regards, -- Masahiko Sawada EDB: https://www.enterprisedb.com/
From dc6d97c71394c7c216920b9aa1d55bf33c5ac472 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada <sawada.mshk@gmail.com> Date: Thu, 24 Feb 2022 16:56:58 +0900 Subject: [PATCH 2/2] Add the origin name and remote commit-LSN to logical replication worker errcontext. This commit adds both the commit-LSN and replication origin name to the existing error context message. This will help users in specifying the origin name and commit-LSN to the pg_replication_origin_advance() SQL function to skip the particular transaction. --- doc/src/sgml/logical-replication.sgml | 19 +++++-- src/backend/replication/logical/worker.c | 71 ++++++++++++++++++------ 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 96b4886e08..a96cc21a1c 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -354,12 +354,21 @@ <para> The resolution can be done either by changing data or permissions on the subscriber so that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. The transaction can be - skipped by calling the <link linkend="pg-replication-origin-advance"> + transaction that conflicts with the existing data. When a conflict produces + an error, it is shown in the subscriber's server logs as follows: +<screen> +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (c)=(1) already exists. +CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14C0378 and timestamp 2022-02-28 20:58:27.964238+00 from replication origin "pg_16395" +</screen> + The LSN of the transaction that contains the change violating the constraint and + the replication origin name can be found from those outputs (LSN 0/14C0378 and + replication origin <literal>pg_16395</literal> in the above case). 
The transaction + can be skipped by calling the <link linkend="pg-replication-origin-advance"> <function>pg_replication_origin_advance()</function></link> function with - a <parameter>node_name</parameter> corresponding to the subscription name, - and a position. The current position of origins can be seen in the - <link linkend="view-pg-replication-origin-status"> + the <parameter>node_name</parameter> and the next LSN of the commit LSN + (i.e., 0/14C0379) from those outputs. The current position of origins can be + seen in the <link linkend="view-pg-replication-origin-status"> <structname>pg_replication_origin_status</structname></link> system view. </para> </sect1> diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ac49e73b45..3fe5f50806 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg /* Remote node information */ int remote_attnum; /* -1 if invalid */ TransactionId remote_xid; + XLogRecPtr commit_lsn; + char *origin_name; TimestampTz ts; /* commit, rollback, or prepare timestamp */ } ApplyErrorCallbackArg; @@ -235,6 +237,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .rel = NULL, .remote_attnum = -1, .remote_xid = InvalidTransactionId, + .commit_lsn = InvalidXLogRecPtr, + .origin_name = NULL, .ts = 0, }; @@ -334,7 +338,8 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); /* Functions for apply error callback */ static void apply_error_callback(void *arg); -static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts); +static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn, + TimestampTz ts); static inline void reset_apply_error_context_info(void); /* @@ -787,7 +792,8 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, 
begin_data.committime); + set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn, + begin_data.committime); remote_final_lsn = begin_data.final_lsn; @@ -839,7 +845,8 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time); + set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn, + begin_data.prepare_time); remote_final_lsn = begin_data.prepare_lsn; @@ -938,7 +945,8 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time); + set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn, + prepare_data.commit_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -979,7 +987,8 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); - set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time); + set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn, + rollback_data.rollback_time); /* Compute GID for two_phase transactions. 
*/ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1044,7 +1053,8 @@ apply_handle_stream_prepare(StringInfo s) errmsg_internal("tablesync worker received a STREAM PREPARE message"))); logicalrep_read_stream_prepare(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time); + set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn, + prepare_data.prepare_time); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); @@ -1126,7 +1136,7 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); - set_apply_error_context_xact(stream_xid, 0); + set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr, 0); /* * Initialize the worker's stream_fileset if we haven't yet. This will be @@ -1215,7 +1225,7 @@ apply_handle_stream_abort(StringInfo s) */ if (xid == subxid) { - set_apply_error_context_xact(xid, 0); + set_apply_error_context_xact(xid, InvalidXLogRecPtr, 0); stream_cleanup_files(MyLogicalRepWorker->subid, xid); } else @@ -1241,7 +1251,7 @@ apply_handle_stream_abort(StringInfo s) bool found = false; char path[MAXPGPATH]; - set_apply_error_context_xact(subxid, 0); + set_apply_error_context_xact(subxid, InvalidXLogRecPtr, 0); subidx = -1; begin_replication_step(); @@ -1426,7 +1436,7 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); - set_apply_error_context_xact(xid, commit_data.committime); + set_apply_error_context_xact(xid, commit_data.commit_lsn, commit_data.committime); elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -3507,6 +3517,17 @@ ApplyWorkerMain(Datum main_arg) myslotname = MemoryContextStrdup(ApplyContext, syncslotname); pfree(syncslotname); + + /* + * Allocate the origin name in long-lived context for error context + * message + 
*/ + ReplicationOriginNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); } else { @@ -3550,6 +3571,13 @@ ApplyWorkerMain(Datum main_arg) * does some initializations on the upstream so let's still call it. */ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + + /* + * Allocate the origin name in long-lived context for error context + * message + */ + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); } /* @@ -3673,33 +3701,40 @@ apply_error_callback(void *arg) errcontext("processing remote data during \"%s\"", logicalrep_message_type(errarg->command)); else - errcontext("processing remote data during \"%s\" in transaction %u at %s", + errcontext("processing remote data during \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"", logicalrep_message_type(errarg->command), errarg->remote_xid, - (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); + LSN_FORMAT_ARGS(errarg->commit_lsn), + (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)", + errarg->origin_name); } else if (errarg->remote_attnum < 0) - errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %s", + errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"", logicalrep_message_type(errarg->command), errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname, errarg->remote_xid, - (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); + LSN_FORMAT_ARGS(errarg->commit_lsn), + (errarg->ts != 0) ? 
timestamptz_to_str(errarg->ts) : "(not-set)", + errarg->origin_name); else - errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %s", + errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"", logicalrep_message_type(errarg->command), errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname, errarg->rel->remoterel.attnames[errarg->remote_attnum], errarg->remote_xid, - (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); + LSN_FORMAT_ARGS(errarg->commit_lsn), + (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)", + errarg->origin_name); } /* Set transaction information of apply error callback */ static inline void -set_apply_error_context_xact(TransactionId xid, TimestampTz ts) +set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn, TimestampTz ts) { apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.commit_lsn = lsn; apply_error_callback_arg.ts = ts; } @@ -3710,5 +3745,5 @@ reset_apply_error_context_info(void) apply_error_callback_arg.command = 0; apply_error_callback_arg.rel = NULL; apply_error_callback_arg.remote_attnum = -1; - set_apply_error_context_xact(InvalidTransactionId, 0); + set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr, 0); } -- 2.24.3 (Apple Git-128)
From fd5b78993d0e73144ceedad6dce29cf641eb06ed Mon Sep 17 00:00:00 2001 From: Masahiko Sawada <sawada.mshk@gmail.com> Date: Mon, 28 Feb 2022 17:53:28 +0900 Subject: [PATCH 1/2] Use complete sentences in logical replication worker errcontext. Previously, the message for logical replication worker errcontext is incrementally built, which was not translation friendly. Instead, we use complete sentences with if-else branches. --- src/backend/replication/logical/worker.c | 49 ++++++++++++------------ 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5d9acc6173..ac49e73b45 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3662,38 +3662,37 @@ IsLogicalWorker(void) static void apply_error_callback(void *arg) { - StringInfoData buf; ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; if (apply_error_callback_arg.command == 0) return; - initStringInfo(&buf); - appendStringInfo(&buf, _("processing remote data during \"%s\""), - logicalrep_message_type(errarg->command)); - - /* append relation information */ - if (errarg->rel) - { - appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""), - errarg->rel->remoterel.nspname, - errarg->rel->remoterel.relname); - if (errarg->remote_attnum >= 0) - appendStringInfo(&buf, _(" column \"%s\""), - errarg->rel->remoterel.attnames[errarg->remote_attnum]); - } - - /* append transaction information */ - if (TransactionIdIsNormal(errarg->remote_xid)) + if (errarg->rel == NULL) { - appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid); - if (errarg->ts != 0) - appendStringInfo(&buf, _(" at %s"), - timestamptz_to_str(errarg->ts)); + if (!TransactionIdIsValid(errarg->remote_xid)) + errcontext("processing remote data during \"%s\"", + logicalrep_message_type(errarg->command)); + else + errcontext("processing remote data during \"%s\" in transaction %u at 
%s", + logicalrep_message_type(errarg->command), + errarg->remote_xid, + (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); } - - errcontext("%s", buf.data); - pfree(buf.data); + else if (errarg->remote_attnum < 0) + errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %s", + logicalrep_message_type(errarg->command), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname, + errarg->remote_xid, + (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); + else + errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %s", + logicalrep_message_type(errarg->command), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname, + errarg->rel->remoterel.attnames[errarg->remote_attnum], + errarg->remote_xid, + (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"); } /* Set transaction information of apply error callback */ -- 2.24.3 (Apple Git-128)