From fb423bb016482e8bf553f98485346540602f7bae Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 25 Aug 2023 17:07:13 +0800
Subject: [PATCH v1 1/3] Allocate RelationSyncCache of pgoutput under private
 memory context

The pgoutput module creates a map under cacheMemoryContext to remember the
relation schemas that have been sent.

However, this map is inherently local to an individual pgoutput instance.
Additionally, while the map is adequately cleaned up when shutting down the
output plugin, it is not properly cleared in error paths. Consequently,
subsequent retries may access the outdated map.

To improve it, the patch allocates the map under the memory context specific to
the output plugin and registers a memory context reset callback to clean it.

Note: Instead of completely moving the map variable into the output plugin
private data, we have to keep the static pointer variable for the map to be
accessed by the syscache callbacks. This is because syscache callbacks won't be
un-registered even after shutting down the output plugin, so we need a static
pointer to cache the map pointer so that callbacks can check it.
---
 src/backend/replication/pgoutput/pgoutput.c | 43 +++++++++++++++------
 1 file changed, 31 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b08ca55041..b3d515b59f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -219,7 +219,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
-static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+static void set_schema_sent_in_streamed_txn(PGOutputData *data,
+											RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
@@ -520,7 +521,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		}
 
 		/* Initialize relation schema cache. */
-		init_rel_sync_cache(CacheMemoryContext);
+		init_rel_sync_cache(data->cachectx);
 	}
 	else
 	{
@@ -736,7 +737,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	send_relation_and_attrs(relation, xid, ctx, relentry->columns);
 
 	if (in_streaming)
-		set_schema_sent_in_streamed_txn(relentry, topxid);
+		set_schema_sent_in_streamed_txn((PGOutputData *) ctx->output_plugin_private,
+										relentry, topxid);
 	else
 		relentry->schema_sent = true;
 }
@@ -1157,7 +1159,7 @@ init_tuple_slot(PGOutputData *data, Relation relation,
 		TupleDesc	outdesc = RelationGetDescr(ancestor);
 
 		/* Map must live as long as the session does. */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		oldctx = MemoryContextSwitchTo(data->cachectx);
 
 		entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
 
@@ -1689,11 +1691,11 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 static void
 pgoutput_shutdown(LogicalDecodingContext *ctx)
 {
-	if (RelationSyncCache)
-	{
-		hash_destroy(RelationSyncCache);
-		RelationSyncCache = NULL;
-	}
+	/*
+	 * Don't need to destroy the hash table as it will be cleaned along with
+	 * logical decoding memory context.
+	 */
+	RelationSyncCache = NULL;
 }
 
 /*
@@ -1858,6 +1860,15 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+/*
+ * Memory context reset callback for clearing the RelationSyncCache.
+ */
+static void
+reset_rel_sync_cache(void *arg)
+{
+	RelationSyncCache = NULL;
+}
+
 /*
  * Initialize the relation schema sync cache for a decoding session.
  *
@@ -1869,12 +1880,19 @@ static void
 init_rel_sync_cache(MemoryContext cachectx)
 {
 	HASHCTL		ctl;
+	MemoryContextCallback *mcallback;
 	static bool relation_callbacks_registered = false;
 
 	/* Nothing to do if hash table already exists */
 	if (RelationSyncCache != NULL)
 		return;
 
+	/* Ensure that the hash table is reset in error paths. */
+	mcallback = MemoryContextAllocZero(cachectx,
+									   sizeof(MemoryContextCallback));
+	mcallback->func = reset_rel_sync_cache;
+	MemoryContextRegisterResetCallback(cachectx, mcallback);
+
 	/* Make a new hash table for the cache */
 	ctl.keysize = sizeof(Oid);
 	ctl.entrysize = sizeof(RelationSyncEntry);
@@ -1930,11 +1948,12 @@ get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
  * of the relation.
  */
 static void
-set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+set_schema_sent_in_streamed_txn(PGOutputData *data, RelationSyncEntry *entry,
+								TransactionId xid)
 {
 	MemoryContext oldctx;
 
-	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+	oldctx = MemoryContextSwitchTo(data->cachectx);
 
 	entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
 
@@ -2005,7 +2024,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
 		{
-			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+			oldctx = MemoryContextSwitchTo(data->cachectx);
 			if (data->publications)
 			{
 				list_free_deep(data->publications);
-- 
2.30.0.windows.2

