From dab41bb18d776d0e514d30441935c9ad5472293d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 14 Aug 2025 19:16:10 +0900
Subject: [PATCH v2] pgoutput: allow cleaning up Relsync cache entries

---
 src/backend/replication/pgoutput/pgoutput.c | 224 ++++++++++++++------
 1 file changed, 155 insertions(+), 69 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..d78de94f914 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -219,10 +219,15 @@ typedef struct PGOutputTxnData
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
+/* How many relsync cache entries have been invalidated? */
+static int RelationSyncCacheInvalidateCounter = 0;
+
 static void init_rel_sync_cache(MemoryContext cachectx);
-static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
+static void cleanup_streamed_txn(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 											 Relation relation);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void maybe_cleanup_rel_sync_cache(void);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx,
 									RelationSyncEntry *relentry);
@@ -628,6 +633,9 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
+
+	/* Cleanup RelSyncCache if needed */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -659,6 +667,9 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
+
+	/* Cleanup RelSyncCache if needed */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -1764,6 +1775,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 	{
 		hash_destroy(RelationSyncCache);
 		RelationSyncCache = NULL;
+		RelationSyncCacheInvalidateCounter = 0;
 	}
 }
 
@@ -1892,7 +1904,10 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 
 	OutputPluginWrite(ctx, true);
 
-	cleanup_rel_sync_cache(toptxn->xid, false);
+	cleanup_streamed_txn(toptxn->xid, false);
+
+	/* Cleanup RelSyncCache if needed */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -1919,7 +1934,10 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
 
-	cleanup_rel_sync_cache(txn->xid, true);
+	cleanup_streamed_txn(txn->xid, true);
+
+	/* Cleanup RelSyncCache if needed */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -1938,6 +1956,9 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
+
+	/* Cleanup RelSyncCache if needed */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -2060,6 +2081,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->columns = NULL;
 		entry->attrmap = NULL;
 	}
+	else if (!entry->replicate_valid && RelationSyncCacheInvalidateCounter > 0)
+	{
+		/* An invalidated entry is about to be rebuilt; stop counting it */
+		RelationSyncCacheInvalidateCounter--;
+	}
 
 	/* Validate the entry */
 	if (!entry->replicate_valid)
@@ -2091,71 +2117,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			publications_valid = true;
 		}
 
-		/*
-		 * Reset schema_sent status as the relation definition may have
-		 * changed.  Also reset pubactions to empty in case rel was dropped
-		 * from a publication.  Also free any objects that depended on the
-		 * earlier definition.
-		 */
-		entry->schema_sent = false;
-		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
-		list_free(entry->streamed_txns);
-		entry->streamed_txns = NIL;
-		bms_free(entry->columns);
-		entry->columns = NULL;
-		entry->pubactions.pubinsert = false;
-		entry->pubactions.pubupdate = false;
-		entry->pubactions.pubdelete = false;
-		entry->pubactions.pubtruncate = false;
-
-		/*
-		 * Tuple slots cleanups. (Will be rebuilt later if needed).
-		 */
-		if (entry->old_slot)
-		{
-			TupleDesc	desc = entry->old_slot->tts_tupleDescriptor;
-
-			Assert(desc->tdrefcount == -1);
-
-			ExecDropSingleTupleTableSlot(entry->old_slot);
-
-			/*
-			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
-			 * do it now to avoid any leaks.
-			 */
-			FreeTupleDesc(desc);
-		}
-		if (entry->new_slot)
-		{
-			TupleDesc	desc = entry->new_slot->tts_tupleDescriptor;
-
-			Assert(desc->tdrefcount == -1);
-
-			ExecDropSingleTupleTableSlot(entry->new_slot);
-
-			/*
-			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
-			 * do it now to avoid any leaks.
-			 */
-			FreeTupleDesc(desc);
-		}
-
-		entry->old_slot = NULL;
-		entry->new_slot = NULL;
-
-		if (entry->attrmap)
-			free_attrmap(entry->attrmap);
-		entry->attrmap = NULL;
-
-		/*
-		 * Row filter cache cleanups.
-		 */
-		if (entry->entry_cxt)
-			MemoryContextDelete(entry->entry_cxt);
-
-		entry->entry_cxt = NULL;
-		entry->estate = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		/* Cleanup existing data */
+		cleanup_rel_sync_entry(entry);
 
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2274,124 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	return entry;
 }
 
+/*
+ * Free the resources held by a relsync entry and reset it for rebuilding.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+	/*
+	 * Reset schema_sent, as the relation definition may have changed, and
+	 * empty pubactions, in case rel was dropped from a publication.  Also
+	 * free objects built from the earlier definition.  replicate_valid is
+	 * not touched here; the caller either rebuilds or removes the entry.
+	 */
+	entry->schema_sent = false;
+	entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+	list_free(entry->streamed_txns);
+	entry->streamed_txns = NIL;
+	bms_free(entry->columns);
+	entry->columns = NULL;
+	entry->pubactions.pubinsert = false;
+	entry->pubactions.pubupdate = false;
+	entry->pubactions.pubdelete = false;
+	entry->pubactions.pubtruncate = false;
+
+	/*
+	 * Drop the cached tuple slots.  (They will be rebuilt later if needed.)
+	 */
+	if (entry->old_slot)
+	{
+		TupleDesc	desc = entry->old_slot->tts_tupleDescriptor;
+
+		Assert(desc->tdrefcount == -1);
+
+		ExecDropSingleTupleTableSlot(entry->old_slot);
+
+		/*
+		 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
+		 * free it explicitly here to avoid a leak.
+		 */
+		FreeTupleDesc(desc);
+	}
+	if (entry->new_slot)
+	{
+		TupleDesc	desc = entry->new_slot->tts_tupleDescriptor;
+
+		Assert(desc->tdrefcount == -1);
+
+		ExecDropSingleTupleTableSlot(entry->new_slot);
+
+		/*
+		 * As for old_slot: the TupleDesc is not freed along with the slot,
+		 * so free it explicitly here to avoid a leak.
+		 */
+		FreeTupleDesc(desc);
+	}
+
+	entry->old_slot = NULL;
+	entry->new_slot = NULL;
+
+	if (entry->attrmap)
+		free_attrmap(entry->attrmap);
+	entry->attrmap = NULL;
+
+	/*
+	 * Row filter cache cleanups: delete the context and reset related fields.
+	 */
+	if (entry->entry_cxt)
+		MemoryContextDelete(entry->entry_cxt);
+
+	entry->entry_cxt = NULL;
+	entry->estate = NULL;
+	memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+/*
+ * Remove invalidated entries from the relsync cache.
+ *
+ * Finding them requires scanning every entry, which can be expensive, so the
+ * scan is only done once RelationSyncCacheInvalidateCounter has exceeded
+ * NINVALIDATION_THRESHOLD.  The counter is reset after each scan.
+ */
+static void
+maybe_cleanup_rel_sync_cache(void)
+{
+#define NINVALIDATION_THRESHOLD 100
+
+	Assert(RelationSyncCache);
+
+	/*
+	 * Only scan the cache once enough invalidations have accumulated.
+	 */
+	if (RelationSyncCacheInvalidateCounter > NINVALIDATION_THRESHOLD)
+	{
+		RelationSyncEntry *entry;
+		HASH_SEQ_STATUS status;
+
+		hash_seq_init(&status, RelationSyncCache);
+		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+		{
+			/* Keep entries that are still valid */
+			if (entry->replicate_valid)
+				continue;
+
+			/* Free whatever the entry still holds before dropping it */
+			cleanup_rel_sync_entry(entry);
+
+			/* Remove the entry from the cache */
+			if (hash_search(RelationSyncCache,
+							&entry->relid,
+							HASH_REMOVE,
+							NULL) == NULL)
+				elog(ERROR, "hash table corrupted");
+		}
+
+		/* Every invalid entry is gone now, so start counting afresh */
+		RelationSyncCacheInvalidateCounter = 0;
+	}
+}
+
 /*
  * Cleanup list of streamed transactions and update the schema_sent flag.
  *
@@ -2323,7 +2404,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
  * cache - so tweak the schema_sent flag accordingly.
  */
 static void
-cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+cleanup_streamed_txn(TransactionId xid, bool is_commit)
 {
 	HASH_SEQ_STATUS hash_seq;
 	RelationSyncEntry *entry;
@@ -2387,7 +2468,10 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
 												  HASH_FIND, NULL);
 		if (entry != NULL)
+		{
 			entry->replicate_valid = false;
+			RelationSyncCacheInvalidateCounter++;
+		}
 	}
 	else
 	{
@@ -2398,6 +2482,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 		{
 			entry->replicate_valid = false;
+			RelationSyncCacheInvalidateCounter++;
 		}
 	}
 }
@@ -2429,6 +2514,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 	{
 		entry->replicate_valid = false;
+		RelationSyncCacheInvalidateCounter++;
 	}
 }
 
-- 
2.47.1

