From 052c9df805da4be6a0afcd28690e004a20e01221 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sun, 14 Sep 2025 08:38:22 +0530
Subject: [PATCH v2 1/2] Add configurable conflict log history table for
 Logical Replication

This patch adds a feature to provide a structured, queryable record of all
logical replication conflicts. The current approach of logging conflicts as
plain text in the server logs makes it difficult to query, analyze, and
use for external monitoring and automation.

This patch addresses these limitations by introducing a configurable
conflict_log_table option in the CREATE SUBSCRIPTION command. Key design
decisions include:

User-Defined Table: The conflict log is stored in a user-managed table
rather than a system catalog.

Structured Data: Conflict details, including the original and remote tuples,
are stored in JSON columns, providing a flexible format to accommodate different
table schemas.

Comprehensive Information: The log table captures essential attributes such as
local and remote transaction IDs, LSNs, commit timestamps, and conflict type,
providing a complete record for post-mortem analysis.

This feature will make logical replication conflicts easier to monitor and manage,
significantly improving the overall resilience and operability of replication setups.
---
 src/backend/commands/subscriptioncmds.c    | 127 ++++++++++++++-
 src/backend/replication/logical/conflict.c | 170 +++++++++++++++++++++
 src/backend/replication/logical/worker.c   |  10 +-
 src/backend/utils/cache/lsyscache.c        |  37 +++++
 src/include/catalog/pg_subscription.h      |   5 +
 src/include/replication/conflict.h         |   2 +
 src/include/replication/worker_internal.h  |   4 +
 src/include/utils/lsyscache.h              |   1 +
 8 files changed, 352 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 750d262fcca..dbaeed4b0b1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -47,10 +48,12 @@
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/regproc.h"
 #include "utils/syscache.h"
 
 /*
@@ -75,6 +78,7 @@
 #define SUBOPT_MAX_RETENTION_DURATION	0x00008000
 #define SUBOPT_LSN					0x00010000
 #define SUBOPT_ORIGIN				0x00020000
+#define SUBOPT_CONFLICT_TABLE		0x00030000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -103,6 +107,7 @@ typedef struct SubOpts
 	bool		retaindeadtuples;
 	int32		maxretention;
 	char	   *origin;
+	char	   *conflicttable;
 	XLogRecPtr	lsn;
 } SubOpts;
 
@@ -118,7 +123,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 static void CheckAlterSubOption(Subscription *sub, const char *option,
 								bool slot_needs_update, bool isTopLevel);
-
+static void CreateConflictHistoryTable(Oid namespaceId, char *conflictrel);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -174,6 +179,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->maxretention = 0;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_CONFLICT_TABLE))
+		opts->conflicttable = NULL;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -385,6 +392,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_CONFLICT_TABLE) &&
+				 strcmp(defel->defname, "conflict_log_table") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_TABLE))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_CONFLICT_TABLE;
+			opts->conflicttable = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -560,6 +576,65 @@ publicationListToArray(List *publist)
 	return PointerGetDatum(arr);
 }
 
+/*
+ * CreateConflictHistoryTable: Create conflict log history table.
+ *
+ * The subscription creator becomes the owner of this table and has all
+ * privileges on it.
+ */
+static void
+CreateConflictHistoryTable(Oid namespaceId, char *conflictrel)
+{
+	StringInfoData 	querybuf;
+
+	/*
+	 * Check if table with same name already present, if so report and error
+	 * as currently we do not support user created table as conflict history
+	 * table.
+	 */
+	if (OidIsValid(get_relname_relid(conflictrel, namespaceId)))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("table \"%s.%s\" already exists",
+						get_namespace_name(namespaceId), conflictrel)));
+
+	initStringInfo(&querybuf);
+
+	/*
+	 * Build and execute the CREATE TABLE query.
+	 */
+	appendStringInfo(&querybuf,
+					 "CREATE TABLE %s.%s ("
+					 "relid	Oid,"
+					 "local_xid xid,"
+					 "remote_xid xid,"
+					 "local_lsn pg_lsn,"
+					 "remote_commit_lsn pg_lsn,"
+					 "local_commit_ts TIMESTAMPTZ,"
+					 "remote_commit_ts TIMESTAMPTZ,"
+					 "table_schema	TEXT,"
+					 "table_name	TEXT,"
+					 "conflict_type TEXT,"
+					 "local_origin	TEXT,"
+					 "remote_origin	TEXT,"
+					 "key_tuple		JSON,"
+					 "local_tuple	JSON,"
+					 "remote_tuple	JSON)",
+					 quote_identifier(get_namespace_name(namespaceId)),
+					 quote_identifier(conflictrel));
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+
+	pfree(querybuf.data);
+}
+
 /*
  * Create new subscription.
  */
@@ -580,6 +655,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	bits32		supported_opts;
 	SubOpts		opts = {0};
 	AclResult	aclresult;
+	Oid			conflict_table_nspid;
+	char	   *conflict_table;
 
 	/*
 	 * Parse and check options.
@@ -593,7 +670,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 					  SUBOPT_RETAIN_DEAD_TUPLES |
-					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+					  SUBOPT_CONFLICT_TABLE);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -728,6 +806,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
 
+	/*
+	 * If a conflict log history table name is specified, parse the schema and
+	 * table name from the string. Store the namespace OID and the table name in
+	 * the pg_subscription catalog tuple.
+	 */
+	if (opts.conflicttable)
+	{
+		List   *names = stringToQualifiedNameList(opts.conflicttable, NULL);
+
+		conflict_table_nspid =
+				QualifiedNameGetCreationNamespace(names, &conflict_table);
+		values[Anum_pg_subscription_subconflictnspid - 1] =
+					ObjectIdGetDatum(conflict_table_nspid);
+		values[Anum_pg_subscription_subconflicttable - 1] =
+					CStringGetTextDatum(conflict_table);
+	}
+	else
+		nulls[Anum_pg_subscription_subconflicttable - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -739,6 +836,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
+	/* If conflict log history table name is given than create the table. */
+	if (opts.conflicttable)
+		CreateConflictHistoryTable(conflict_table_nspid, conflict_table);
+
 	/*
 	 * Connect to remote side to execute requested commands and fetch table
 	 * info.
@@ -1272,7 +1373,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 								  SUBOPT_RETAIN_DEAD_TUPLES |
 								  SUBOPT_MAX_RETENTION_DURATION |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN |
+								  SUBOPT_CONFLICT_TABLE);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1527,6 +1629,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					origin = opts.origin;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_TABLE))
+				{
+					Oid		nspid;
+					char   *relname = NULL;
+					List   *names =
+						stringToQualifiedNameList(opts.conflicttable, NULL);
+
+					nspid = QualifiedNameGetCreationNamespace(names, &relname);
+					values[Anum_pg_subscription_subconflictnspid - 1] =
+								ObjectIdGetDatum(nspid);
+					values[Anum_pg_subscription_subconflicttable - 1] =
+						CStringGetTextDatum(relname);
+
+					replaces[Anum_pg_subscription_subconflictnspid - 1] = true;
+					replaces[Anum_pg_subscription_subconflicttable - 1] = true;
+
+					CreateConflictHistoryTable(nspid, relname);
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 16695592265..b1658977aed 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,23 @@
 #include "postgres.h"
 
 #include "access/commit_ts.h"
+#include "access/heapam.h"
 #include "access/tableam.h"
+#include "access/table.h"
+#include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_namespace_d.h"
+#include "catalog/pg_type.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "pgstat.h"
 #include "replication/conflict.h"
 #include "replication/worker_internal.h"
 #include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
 
 static const char *const ConflictTypeNames[] = {
 	[CT_INSERT_EXISTS] = "insert_exists",
@@ -52,6 +62,16 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 									   Oid indexoid);
 static char *build_index_value_desc(EState *estate, Relation localrel,
 									TupleTableSlot *slot, Oid indexoid);
+static Datum TupleTableSlotToJsonDatum(TupleTableSlot *slot);
+
+static void InsertConflictLog(Relation rel,
+							  TransactionId local_xid,
+							  TimestampTz local_ts,
+							  ConflictType conflict_type,
+							  RepOriginId origin_id,
+							  TupleTableSlot *searchslot,
+							  TupleTableSlot *localslot,
+							  TupleTableSlot *remoteslot);
 
 /*
  * Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -112,6 +132,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 
 	/* Form errdetail message by combining conflicting tuples information. */
 	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+	{
 		errdetail_apply_conflict(estate, relinfo, type, searchslot,
 								 conflicttuple->slot, remoteslot,
 								 conflicttuple->indexoid,
@@ -120,6 +141,15 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 								 conflicttuple->ts,
 								 &err_detail);
 
+		/* Insert conflict details to log history table. */
+		InsertConflictLog(relinfo->ri_RelationDesc,
+						  conflicttuple->xmin,
+						  conflicttuple->ts, type,
+						  conflicttuple->origin,
+						  searchslot, conflicttuple->slot,
+						  remoteslot);
+	}
+
 	pgstat_report_subscription_conflict(MySubscription->oid, type);
 
 	ereport(elevel,
@@ -525,3 +555,143 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
 
 	return index_value;
 }
+
+/*
+ * Helper function to convert a TupleTableSlot to Jsonb
+ *
+ * This would be a new internal helper function for logical replication
+ * Needs to handle various data types and potentially TOASTed data
+ */
+static Datum
+TupleTableSlotToJsonDatum(TupleTableSlot *slot)
+{
+	HeapTuple	tuple = ExecCopySlotHeapTuple(slot);
+	Datum		datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+	Datum		json;
+
+	if (TupIsNull(slot))
+		return 0;
+
+	json = DirectFunctionCall1(row_to_json, datum);
+	heap_freetuple(tuple);
+
+	return json;
+}
+
+/*
+ * InsertConflictLog
+ *
+ * Insert details about a logical replication conflict to a conflict history
+ * table.
+ */
+static void
+InsertConflictLog(Relation rel, TransactionId local_xid, TimestampTz local_ts,
+				  ConflictType conflict_type, RepOriginId origin_id,
+				  TupleTableSlot *searchslot, TupleTableSlot *localslot,
+				  TupleTableSlot *remoteslot)
+{
+	Datum		values[MAX_CONFLICT_ATTR_NUM];
+	bool		nulls[MAX_CONFLICT_ATTR_NUM];
+	Oid			nspid;
+	Oid			relid;
+	Relation	conflictrel;
+	int			attno;
+	int			options = HEAP_INSERT_NO_LOGICAL;
+	char	   *relname;
+	char	   *origin = NULL;
+	char	   *remote_origin = NULL;
+	HeapTuple	tup;
+	XLogRecPtr	local_lsn = 0;
+
+	/* If conflict history is not enabled for the subscription just return. */
+	relname = get_subscription_conflictrel(MyLogicalRepWorker->subid, &nspid);
+	if (relname == NULL)
+		return;
+
+	/* TODO: proper error code */
+	relid = get_relname_relid(relname, nspid);
+	if (!OidIsValid(relid))
+		elog(ERROR, "conflict log history table does not exists");
+	conflictrel = table_open(relid, RowExclusiveLock);
+	if (conflictrel == NULL)
+		elog(ERROR, "could not open conflict log history table");
+
+
+	/* Initialize values and nulls arrays */
+	memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM);
+	memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM);
+
+	/* Populate the values and nulls arrays */
+	attno = 0;
+	values[attno] = ObjectIdGetDatum(RelationGetRelid(rel));
+	attno++;
+
+	values[attno] = TransactionIdGetDatum(local_xid);
+	attno++;
+
+	values[attno] = TransactionIdGetDatum(remote_xid);
+	attno++;
+
+	values[attno] = LSNGetDatum(local_lsn);
+	attno++;
+
+	values[attno] = LSNGetDatum(remote_final_lsn);
+	attno++;
+
+	values[attno] = TimestampTzGetDatum(local_ts);
+	attno++;
+
+	values[attno] = TimestampTzGetDatum(remote_commit_ts);
+	attno++;
+
+	values[attno] =
+			CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+	attno++;
+
+	values[attno] = CStringGetTextDatum(RelationGetRelationName(rel));
+	attno++;
+
+	values[attno] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+	attno++;
+
+	if (origin_id != InvalidRepOriginId)
+		replorigin_by_oid(origin_id, true, &origin);
+
+	if (origin != NULL)
+		values[attno] = CStringGetTextDatum(origin);
+	else
+		nulls[attno] = true;
+	attno++;
+
+	if (replorigin_session_origin != InvalidRepOriginId)
+		replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+	if (remote_origin != NULL)
+		values[attno] = CStringGetTextDatum(remote_origin);
+	else
+		nulls[attno] = true;
+	attno++;
+
+	if (searchslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(searchslot);
+	else
+		nulls[attno] = true;
+	attno++;
+
+	if (localslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(localslot);
+	else
+		nulls[attno] = true;
+	attno++;
+
+	if (remoteslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(remoteslot);
+	else
+		nulls[attno] = true;
+
+	tup = heap_form_tuple(RelationGetDescr(conflictrel), values, nulls);
+	heap_insert(conflictrel, tup, GetCurrentCommandId(true), options, NULL);
+	table_close(conflictrel, RowExclusiveLock);
+
+	pfree(relname);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 419e478b4c6..ffd40857329 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
 static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId	remote_xid = InvalidTransactionId;
+TimestampTz	remote_commit_ts = 0;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -1213,6 +1215,8 @@ apply_handle_begin(StringInfo s)
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
+	remote_commit_ts = begin_data.committime;
+	remote_xid = begin_data.xid;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
@@ -1724,6 +1728,10 @@ apply_handle_stream_start(StringInfo s)
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
+	remote_xid = stream_xid;
+	remote_final_lsn = InvalidXLogRecPtr;
+	remote_commit_ts = 0;
+
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index fa7cd7e06a7..abecdf9f6dc 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -3881,3 +3881,40 @@ get_subscription_name(Oid subid, bool missing_ok)
 
 	return subname;
 }
+
+/*
+ * get_subscription_conflictrel
+ *
+ * Get conflict relation name and namespace id from subscription.
+ */
+char *
+get_subscription_conflictrel(Oid subid, Oid *nspid)
+{
+	HeapTuple	tup;
+	Datum		datum;
+	bool		isnull;
+	char	   *relname;
+	Form_pg_subscription subform;
+
+	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+		return NULL;
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+	/* Get conflict table name */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_subconflicttable,
+							&isnull);
+	if (isnull)
+		return NULL;
+
+	*nspid = subform->subconflictnspid;
+	relname = pstrdup(TextDatumGetCString(datum));
+
+	ReleaseSysCache(tup);
+
+	return relname;
+}
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 55cb9b1eefa..ec31e2b1d56 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -80,6 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		subretaindeadtuples;	/* True if dead tuples useful for
 										 * conflict detection are retained */
+	Oid			subconflictnspid;	/* Namespace Oid in which the conflict history
+									 * table is created. */
 
 	int32		submaxretention;	/* The maximum duration (in milliseconds)
 									 * for which information useful for
@@ -105,6 +107,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	/* Only publish data originating from the specified origin */
 	text		suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
+
+	/* conflict log history table name if valid */
+	text		subconflicttable;
 #endif
 } FormData_pg_subscription;
 
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index c8fbf9e51b8..adc46e79286 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -62,6 +62,7 @@ typedef enum
 } ConflictType;
 
 #define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
+#define	MAX_CONFLICT_ATTR_NUM	15
 
 /*
  * Information for the existing local row that caused the conflict.
@@ -89,4 +90,5 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
+
 #endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index de003802612..84bd6383615 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -251,6 +251,10 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId	remote_xid;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 50fb149e9ac..dc6df5843a4 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -210,6 +210,7 @@ extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
 extern Oid	get_subscription_oid(const char *subname, bool missing_ok);
 extern char *get_subscription_name(Oid subid, bool missing_ok);
+extern char *get_subscription_conflictrel(Oid subid, Oid *nspid);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */
-- 
2.49.0

