Hi all,

Commit abc0910e2 added some information, such as the command and the
timestamp, to the error context message. This patch adds further
information to it: the replication origin name and the commit LSN.

This will help users find the origin name and LSN to pass to
pg_replication_origin_advance().

The errcontext message would become as follows:

*Before
ERROR:  duplicate key value violates unique constraint "test_pkey"
DETAIL:  Key (c)=(1) already exists.
CONTEXT:  processing remote data during "INSERT" for replication
target relation "public.test" in transaction 726 at 2022-02-28
20:59:56.005909+09

* After
ERROR:  duplicate key value violates unique constraint "test_pkey"
DETAIL:  Key (c)=(1) already exists.
CONTEXT:  processing remote data during "INSERT" for replication
target relation "public.test" in transaction 726 committed at LSN
0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+09 from replication
origin "pg_16395"

I'm a bit concerned that the message may be too long.

I've attached two patches: the first one changes
apply_error_callback() so that it uses complete sentences in if-else
blocks, which makes the messages translatable; the second patch adds the
origin name and commit LSN to the errcontext message.

Regards,

--
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
From dc6d97c71394c7c216920b9aa1d55bf33c5ac472 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH 2/2] Add the origin name and remote commit-LSN to logical
 replication worker errcontext.

This commit adds both the commit LSN and the replication origin name to
the existing error context message.

This will help users specify the origin name and commit LSN to the
pg_replication_origin_advance() SQL function to skip a particular transaction.
---
 doc/src/sgml/logical-replication.sgml    | 19 +++++--
 src/backend/replication/logical/worker.c | 71 ++++++++++++++++++------
 2 files changed, 67 insertions(+), 23 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 96b4886e08..a96cc21a1c 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -354,12 +354,21 @@
   <para>
    The resolution can be done either by changing data or permissions on the subscriber so
    that it does not conflict with the incoming change or by skipping the
-   transaction that conflicts with the existing data.  The transaction can be
-   skipped by calling the <link linkend="pg-replication-origin-advance">
+   transaction that conflicts with the existing data.  When a conflict produces
+   an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR:  duplicate key value violates unique constraint "test_pkey"
+DETAIL:  Key (c)=(1) already exists.
+CONTEXT:  processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+00 from replication origin "pg_16395"
+</screen>
+   The commit LSN of the transaction that contains the change violating the constraint and
+   the replication origin name can be found in the server log (LSN 0/14BFA88 and
+   replication origin <literal>pg_16395</literal> in the above case).  The transaction
+   can be skipped by calling the <link linkend="pg-replication-origin-advance">
    <function>pg_replication_origin_advance()</function></link> function with
-   a <parameter>node_name</parameter> corresponding to the subscription name,
-   and a position.  The current position of origins can be seen in the
-   <link linkend="view-pg-replication-origin-status">
+   the <parameter>node_name</parameter> set to that origin name and the LSN immediately
+   following the commit LSN (i.e., 0/14BFA89).  The current position of origins can be
+   seen in the <link linkend="view-pg-replication-origin-status">
    <structname>pg_replication_origin_status</structname></link> system view.
   </para>
  </sect1>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ac49e73b45..3fe5f50806 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
+	XLogRecPtr	commit_lsn;
+	char	   *origin_name;
 	TimestampTz ts;				/* commit, rollback, or prepare timestamp */
 } ApplyErrorCallbackArg;
 
@@ -235,6 +237,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 	.rel = NULL,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
+	.commit_lsn = InvalidXLogRecPtr,
+	.origin_name = NULL,
 	.ts = 0,
 };
 
@@ -334,7 +338,8 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn,
+												TimestampTz ts);
 static inline void reset_apply_error_context_info(void);
 
 /*
@@ -787,7 +792,8 @@ apply_handle_begin(StringInfo s)
 	LogicalRepBeginData begin_data;
 
 	logicalrep_read_begin(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn,
+								 begin_data.committime);
 
 	remote_final_lsn = begin_data.final_lsn;
 
@@ -839,7 +845,8 @@ apply_handle_begin_prepare(StringInfo s)
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
 
 	logicalrep_read_begin_prepare(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn,
+								 begin_data.prepare_time);
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
@@ -938,7 +945,8 @@ apply_handle_commit_prepared(StringInfo s)
 	char		gid[GIDSIZE];
 
 	logicalrep_read_commit_prepared(s, &prepare_data);
-	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn,
+								 prepare_data.commit_time);
 
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +987,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	char		gid[GIDSIZE];
 
 	logicalrep_read_rollback_prepared(s, &rollback_data);
-	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn,
+								 rollback_data.rollback_time);
 
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1053,8 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
 
 	logicalrep_read_stream_prepare(s, &prepare_data);
-	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn,
+								 prepare_data.prepare_time);
 
 	elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
@@ -1126,7 +1136,7 @@ apply_handle_stream_start(StringInfo s)
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
-	set_apply_error_context_xact(stream_xid, 0);
+	set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr, 0);
 
 	/*
 	 * Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1215,7 +1225,7 @@ apply_handle_stream_abort(StringInfo s)
 	 */
 	if (xid == subxid)
 	{
-		set_apply_error_context_xact(xid, 0);
+		set_apply_error_context_xact(xid, InvalidXLogRecPtr, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
 	}
 	else
@@ -1241,7 +1251,7 @@ apply_handle_stream_abort(StringInfo s)
 		bool		found = false;
 		char		path[MAXPGPATH];
 
-		set_apply_error_context_xact(subxid, 0);
+		set_apply_error_context_xact(subxid, InvalidXLogRecPtr, 0);
 
 		subidx = -1;
 		begin_replication_step();
@@ -1426,7 +1436,7 @@ apply_handle_stream_commit(StringInfo s)
 				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
 
 	xid = logicalrep_read_stream_commit(s, &commit_data);
-	set_apply_error_context_xact(xid, commit_data.committime);
+	set_apply_error_context_xact(xid, commit_data.commit_lsn, commit_data.committime);
 
 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
@@ -3507,6 +3517,17 @@ ApplyWorkerMain(Datum main_arg)
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
 		pfree(syncslotname);
+
+		/*
+		 * Allocate the origin name in long-lived context for error context
+		 * message
+		 */
+		ReplicationOriginNameForTablesync(MySubscription->oid,
+										  MyLogicalRepWorker->relid,
+										  originname,
+										  sizeof(originname));
+		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+																   originname);
 	}
 	else
 	{
@@ -3550,6 +3571,13 @@ ApplyWorkerMain(Datum main_arg)
 		 * does some initializations on the upstream so let's still call it.
 		 */
 		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+		/*
+		 * Allocate the origin name in long-lived context for error context
+		 * message
+		 */
+		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+																   originname);
 	}
 
 	/*
@@ -3673,33 +3701,40 @@ apply_error_callback(void *arg)
 			errcontext("processing remote data during \"%s\"",
 					   logicalrep_message_type(errarg->command));
 		else
-			errcontext("processing remote data during \"%s\" in transaction %u at %s",
+			errcontext("processing remote data during \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
 					   logicalrep_message_type(errarg->command),
 					   errarg->remote_xid,
-					   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+					   LSN_FORMAT_ARGS(errarg->commit_lsn),
+					   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+					   errarg->origin_name);
 	}
 	else if (errarg->remote_attnum < 0)
-		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %s",
+		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
 				   logicalrep_message_type(errarg->command),
 				   errarg->rel->remoterel.nspname,
 				   errarg->rel->remoterel.relname,
 				   errarg->remote_xid,
-				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+				   LSN_FORMAT_ARGS(errarg->commit_lsn),
+				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+				   errarg->origin_name);
 	else
-		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %s",
+		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
 				   logicalrep_message_type(errarg->command),
 				   errarg->rel->remoterel.nspname,
 				   errarg->rel->remoterel.relname,
 				   errarg->rel->remoterel.attnames[errarg->remote_attnum],
 				   errarg->remote_xid,
-				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+				   LSN_FORMAT_ARGS(errarg->commit_lsn),
+				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+				   errarg->origin_name);
 }
 
 /* Set transaction information of apply error callback */
 static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn, TimestampTz ts)
 {
 	apply_error_callback_arg.remote_xid = xid;
+	apply_error_callback_arg.commit_lsn = lsn;
 	apply_error_callback_arg.ts = ts;
 }
 
@@ -3710,5 +3745,5 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.command = 0;
 	apply_error_callback_arg.rel = NULL;
 	apply_error_callback_arg.remote_attnum = -1;
-	set_apply_error_context_xact(InvalidTransactionId, 0);
+	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr, 0);
 }
-- 
2.24.3 (Apple Git-128)

From fd5b78993d0e73144ceedad6dce29cf641eb06ed Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Feb 2022 17:53:28 +0900
Subject: [PATCH 1/2] Use complete sentences in logical replication worker
 errcontext.

Previously, the logical replication worker's errcontext message was built
incrementally, which is not translation-friendly.  Instead, use complete
sentences in if-else branches.
---
 src/backend/replication/logical/worker.c | 49 ++++++++++++------------
 1 file changed, 24 insertions(+), 25 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5d9acc6173..ac49e73b45 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3662,38 +3662,37 @@ IsLogicalWorker(void)
 static void
 apply_error_callback(void *arg)
 {
-	StringInfoData buf;
 	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
 
 	if (apply_error_callback_arg.command == 0)
 		return;
 
-	initStringInfo(&buf);
-	appendStringInfo(&buf, _("processing remote data during \"%s\""),
-					 logicalrep_message_type(errarg->command));
-
-	/* append relation information */
-	if (errarg->rel)
-	{
-		appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
-						 errarg->rel->remoterel.nspname,
-						 errarg->rel->remoterel.relname);
-		if (errarg->remote_attnum >= 0)
-			appendStringInfo(&buf, _(" column \"%s\""),
-							 errarg->rel->remoterel.attnames[errarg->remote_attnum]);
-	}
-
-	/* append transaction information */
-	if (TransactionIdIsNormal(errarg->remote_xid))
+	if (errarg->rel == NULL)
 	{
-		appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
-		if (errarg->ts != 0)
-			appendStringInfo(&buf, _(" at %s"),
-							 timestamptz_to_str(errarg->ts));
+		if (!TransactionIdIsValid(errarg->remote_xid))
+			errcontext("processing remote data during \"%s\"",
+					   logicalrep_message_type(errarg->command));
+		else
+			errcontext("processing remote data during \"%s\" in transaction %u at %s",
+					   logicalrep_message_type(errarg->command),
+					   errarg->remote_xid,
+					   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
 	}
-
-	errcontext("%s", buf.data);
-	pfree(buf.data);
+	else if (errarg->remote_attnum < 0)
+		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %s",
+				   logicalrep_message_type(errarg->command),
+				   errarg->rel->remoterel.nspname,
+				   errarg->rel->remoterel.relname,
+				   errarg->remote_xid,
+				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+	else
+		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %s",
+				   logicalrep_message_type(errarg->command),
+				   errarg->rel->remoterel.nspname,
+				   errarg->rel->remoterel.relname,
+				   errarg->rel->remoterel.attnames[errarg->remote_attnum],
+				   errarg->remote_xid,
+				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
 }
 
 /* Set transaction information of apply error callback */
-- 
2.24.3 (Apple Git-128)

Reply via email to