From 4898cb800b699c439c284ceff611311cf4e43c9d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 14 Aug 2025 19:16:10 +0900
Subject: [PATCH v3] pgoutput: allow cleaning up RelationSyncCache entries

A list, invalidated_caches, is introduced to track RelationSyncCache entries
that have been invalidated. Once the number of invalidated entries exceeds a
threshold, they are swept from the cache. The sweep happens after a replicated
COMMIT/ABORT/PREPARE message.

The existing cleanup_rel_sync_cache() is renamed to cleanup_streamed_txn() to
better reflect its purpose. A recovery test script is also added; it
reproduces the cache bloat by repeatedly creating and dropping published
tables.
---
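
For reviewers, here is the deferred-sweep idea in miniature: a self-contained,
compilable sketch in plain C.  It uses a hand-rolled intrusive list, a fixed
array, and made-up names (Entry, invalidate, maybe_sweep, THRESHOLD) instead
of the in-tree dclist/dynahash APIs, so it only illustrates the approach, not
the actual patch code.

#include <stdbool.h>
#include <stdio.h>

#define NENTRIES	256
#define THRESHOLD	100

typedef struct Entry
{
	int			relid;			/* cache key */
	bool		valid;			/* is the cached content still usable? */
	bool		already_pushed; /* already on the invalidated list? */
	struct Entry *next;			/* intrusive list link */
} Entry;

static Entry cache[NENTRIES];
static Entry *invalidated_head = NULL;
static int	ninvalidated = 0;

/* Invalidation callback: mark the entry and remember it for a later sweep. */
static void
invalidate(Entry *e)
{
	e->valid = false;
	if (!e->already_pushed)
	{
		e->next = invalidated_head;
		invalidated_head = e;
		e->already_pushed = true;
		ninvalidated++;
	}
}

/* Called at transaction boundaries: sweep only once enough has piled up. */
static void
maybe_sweep(void)
{
	if (ninvalidated <= THRESHOLD)
		return;

	while (invalidated_head != NULL)
	{
		Entry	   *e = invalidated_head;

		invalidated_head = e->next;
		e->already_pushed = false;
		ninvalidated--;

		/* Skip entries that were rebuilt and reused after invalidation. */
		if (e->valid)
			continue;

		/* The real code frees slots, memory contexts, etc. here. */
		printf("dropping cached entry for relid %d\n", e->relid);
	}
}

int
main(void)
{
	for (int i = 0; i < NENTRIES; i++)
	{
		cache[i].relid = i;
		cache[i].valid = true;
		invalidate(&cache[i]);
	}

	cache[7].valid = true;		/* pretend this one was rebuilt in between */
	maybe_sweep();				/* drops everything except entry 7 */
	return 0;
}

The points mirrored by the patch are the already_pushed flag, which keeps an
entry from being listed twice, and the sweep skipping entries that became
valid again before it ran.
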
 src/backend/replication/pgoutput/pgoutput.c | 244 ++++++++++++++------
 src/test/recovery/t/100_cachectx_oom.pl     | 188 +++++++++++++++
 2 files changed, 363 insertions(+), 69 deletions(-)
 create mode 100644 src/test/recovery/t/100_cachectx_oom.pl

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..b5ea100b599 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -185,6 +185,16 @@ typedef struct RelationSyncEntry
 	 * row filter expressions, column list, etc.
 	 */
 	MemoryContext entry_cxt;
+
+	/*
+	 * A node in the list of invalidated entries.
+	 */
+	dlist_node	invalidated_node;
+
+	/*
+	 * Has this entry already been pushed to the invalidated list?
+	 */
+	bool		already_pushed;
 } RelationSyncEntry;
 
 /*
@@ -219,10 +229,15 @@ typedef struct PGOutputTxnData
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
+/* RelationSyncCache entries that have been invalidated at least once */
+static dclist_head invalidated_caches = DCLIST_STATIC_INIT(invalidated_caches);
+
 static void init_rel_sync_cache(MemoryContext cachectx);
-static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
+static void cleanup_streamed_txn(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 											 Relation relation);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void maybe_cleanup_rel_sync_cache(void);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx,
 									RelationSyncEntry *relentry);
@@ -628,6 +643,9 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
+
+	/* Sweep invalidated RelSyncCache entries if enough have accumulated */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -659,6 +677,9 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
+
+	/* Sweep invalidated RelSyncCache entries if enough have accumulated */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -1764,6 +1785,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 	{
 		hash_destroy(RelationSyncCache);
 		RelationSyncCache = NULL;
+		dclist_init(&invalidated_caches);
 	}
 }
 
@@ -1892,7 +1914,10 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 
 	OutputPluginWrite(ctx, true);
 
-	cleanup_rel_sync_cache(toptxn->xid, false);
+	cleanup_streamed_txn(toptxn->xid, false);
+
+	/* Sweep invalidated RelSyncCache entries if enough have accumulated */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -1919,7 +1944,10 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
 
-	cleanup_rel_sync_cache(txn->xid, true);
+	cleanup_streamed_txn(txn->xid, true);
+
+	/* Sweep invalidated RelSyncCache entries if enough have accumulated */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -1938,6 +1966,9 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
+
+	/* Sweep invalidated RelSyncCache entries if enough have accumulated */
+	maybe_cleanup_rel_sync_cache();
 }
 
 /*
@@ -1968,6 +1999,8 @@ init_rel_sync_cache(MemoryContext cachectx)
 
 	Assert(RelationSyncCache != NULL);
 
+	dclist_init(&invalidated_caches);
+
 	/* No more to do if we already registered callbacks */
 	if (relation_callbacks_registered)
 		return;
@@ -2059,6 +2092,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->publish_as_relid = InvalidOid;
 		entry->columns = NULL;
 		entry->attrmap = NULL;
+		entry->already_pushed = false;
 	}
 
 	/* Validate the entry */
@@ -2091,71 +2125,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			publications_valid = true;
 		}
 
-		/*
-		 * Reset schema_sent status as the relation definition may have
-		 * changed.  Also reset pubactions to empty in case rel was dropped
-		 * from a publication.  Also free any objects that depended on the
-		 * earlier definition.
-		 */
-		entry->schema_sent = false;
-		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
-		list_free(entry->streamed_txns);
-		entry->streamed_txns = NIL;
-		bms_free(entry->columns);
-		entry->columns = NULL;
-		entry->pubactions.pubinsert = false;
-		entry->pubactions.pubupdate = false;
-		entry->pubactions.pubdelete = false;
-		entry->pubactions.pubtruncate = false;
-
-		/*
-		 * Tuple slots cleanups. (Will be rebuilt later if needed).
-		 */
-		if (entry->old_slot)
-		{
-			TupleDesc	desc = entry->old_slot->tts_tupleDescriptor;
-
-			Assert(desc->tdrefcount == -1);
-
-			ExecDropSingleTupleTableSlot(entry->old_slot);
-
-			/*
-			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
-			 * do it now to avoid any leaks.
-			 */
-			FreeTupleDesc(desc);
-		}
-		if (entry->new_slot)
-		{
-			TupleDesc	desc = entry->new_slot->tts_tupleDescriptor;
-
-			Assert(desc->tdrefcount == -1);
-
-			ExecDropSingleTupleTableSlot(entry->new_slot);
-
-			/*
-			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
-			 * do it now to avoid any leaks.
-			 */
-			FreeTupleDesc(desc);
-		}
-
-		entry->old_slot = NULL;
-		entry->new_slot = NULL;
-
-		if (entry->attrmap)
-			free_attrmap(entry->attrmap);
-		entry->attrmap = NULL;
-
-		/*
-		 * Row filter cache cleanups.
-		 */
-		if (entry->entry_cxt)
-			MemoryContextDelete(entry->entry_cxt);
-
-		entry->entry_cxt = NULL;
-		entry->estate = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		/* Cleanup existing data */
+		cleanup_rel_sync_entry(entry);
 
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2282,118 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	return entry;
 }
 
+/*
+ * Reset the given entry and free what it references so it can be reused.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+	entry->schema_sent = false;
+	entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+	list_free(entry->streamed_txns);
+	entry->streamed_txns = NIL;
+	bms_free(entry->columns);
+	entry->columns = NULL;
+	entry->pubactions.pubinsert = false;
+	entry->pubactions.pubupdate = false;
+	entry->pubactions.pubdelete = false;
+	entry->pubactions.pubtruncate = false;
+
+	/*
+	 * Tuple slots cleanups. (Will be rebuilt later if needed).
+	 */
+	if (entry->old_slot)
+	{
+		TupleDesc	desc = entry->old_slot->tts_tupleDescriptor;
+
+		Assert(desc->tdrefcount == -1);
+
+		ExecDropSingleTupleTableSlot(entry->old_slot);
+
+		/*
+		 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+		 * it now to avoid any leaks.
+		 */
+		FreeTupleDesc(desc);
+	}
+	if (entry->new_slot)
+	{
+		TupleDesc	desc = entry->new_slot->tts_tupleDescriptor;
+
+		Assert(desc->tdrefcount == -1);
+
+		ExecDropSingleTupleTableSlot(entry->new_slot);
+
+		/*
+		 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+		 * it now to avoid any leaks.
+		 */
+		FreeTupleDesc(desc);
+	}
+
+	entry->old_slot = NULL;
+	entry->new_slot = NULL;
+
+	if (entry->attrmap)
+		free_attrmap(entry->attrmap);
+	entry->attrmap = NULL;
+
+	/*
+	 * Row filter cache cleanups.
+	 */
+	if (entry->entry_cxt)
+		MemoryContextDelete(entry->entry_cxt);
+
+	entry->entry_cxt = NULL;
+	entry->estate = NULL;
+	memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+/*
+ * Remove invalidated entries from RelationSyncCache once enough accumulate.
+ */
+static void
+maybe_cleanup_rel_sync_cache(void)
+{
+#define NINVALIDATION_THRESHOLD 100
+
+	Assert(RelationSyncCache);
+
+	/*
+	 * Only sweep the cache once the list has accumulated enough entries.
+	 */
+	if (dclist_count(&invalidated_caches) > NINVALIDATION_THRESHOLD)
+	{
+		while (dclist_count(&invalidated_caches))
+		{
+			dlist_node *node;
+			RelationSyncEntry *entry;
+
+			node = dclist_pop_head_node(&invalidated_caches);
+			entry = dclist_container(RelationSyncEntry, invalidated_node,
+									 node);
+
+			/* Entry was rebuilt and reused; keep it and allow a later push */
+			if (entry->replicate_valid)
+			{
+				entry->already_pushed = false;
+				continue;
+			}
+
+			/* Free everything the entry references before dropping it */
+			cleanup_rel_sync_entry(entry);
+
+			/* Now remove the entry itself from the cache */
+			if (hash_search(RelationSyncCache,
+							&entry->relid,
+							HASH_REMOVE, NULL) == NULL)
+				elog(ERROR, "hash table corrupted");
+		}
+
+		Assert(dclist_is_empty(&invalidated_caches));
+	}
+}
+
 /*
  * Cleanup list of streamed transactions and update the schema_sent flag.
  *
@@ -2323,7 +2406,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
  * cache - so tweak the schema_sent flag accordingly.
  */
 static void
-cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+cleanup_streamed_txn(TransactionId xid, bool is_commit)
 {
 	HASH_SEQ_STATUS hash_seq;
 	RelationSyncEntry *entry;
@@ -2387,7 +2470,16 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
 												  HASH_FIND, NULL);
 		if (entry != NULL)
+		{
+			/* Add the entry to the invalidated list if it is not there yet */
+			if (!entry->already_pushed)
+			{
+				dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+				entry->already_pushed = true;
+			}
+
 			entry->replicate_valid = false;
+		}
 	}
 	else
 	{
@@ -2397,6 +2489,13 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		hash_seq_init(&status, RelationSyncCache);
 		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 		{
+			/* Add the entry to the invalidated list if it is not there yet */
+			if (!entry->already_pushed)
+			{
+				dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+				entry->already_pushed = true;
+			}
+
 			entry->replicate_valid = false;
 		}
 	}
@@ -2428,6 +2527,13 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 	{
+		/* Add the entry to the invalidated list if it is not there yet */
+		if (!entry->already_pushed)
+		{
+			dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+			entry->already_pushed = true;
+		}
+
 		entry->replicate_valid = false;
 	}
 }
diff --git a/src/test/recovery/t/100_cachectx_oom.pl b/src/test/recovery/t/100_cachectx_oom.pl
new file mode 100644
index 00000000000..cb6dd10df76
--- /dev/null
+++ b/src/test/recovery/t/100_cachectx_oom.pl
@@ -0,0 +1,188 @@
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+my $node_primary;
+my $node_subscriber;
+my $pid;
+my $kid;
+my $publisher_connstr;
+my $count = 1000;
+my $process = 3;
+
+$node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->start;
+
+$node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'wal_receiver_timeout = 0');
+$node_subscriber->start;
+$node_primary->safe_psql('postgres',
+	"CREATE PUBLICATION my_pub FOR ALL TABLES;");
+
+$publisher_connstr = $node_primary->connstr . ' dbname=postgres';
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION my_sub CONNECTION '$publisher_connstr' PUBLICATION my_pub;"
+);
+
+usleep(1_000_000); # 1s, give the subscription time to start
+
+$node_primary->safe_psql('postgres', qq(
+	CREATE TABLE my_table(
+		col01 int8,
+		col02 int8,
+		col03 int8,
+		col04 int8,
+		col05 int8,
+		col06 int8,
+		col07 int8,
+		col08 int8,
+		col09 int8,
+		col10 int8,
+		col11 char(256),
+		col12 char(256),
+		col13 char(256),
+		col14 char(256),
+		col15 char(256),
+		col16 char(256),
+		col17 char(256),
+		col18 char(256),
+		col19 char(256),
+		col20 char(256),
+		col21 jsonb,
+		col22 jsonb,
+		col23 jsonb,
+		col24 jsonb,
+		col25 jsonb,
+		col26 jsonb,
+		col27 jsonb,
+		col28 jsonb,
+		col29 jsonb,
+		col30 jsonb,
+		col31 boolean,
+		col32 boolean,
+		col33 boolean,
+		col34 boolean,
+		col35 boolean,
+		col36 boolean,
+		col37 boolean,
+		col38 boolean,
+		col39 boolean,
+		col40 boolean,
+		col41 timestamp,
+		col42 timestamp,
+		col43 timestamp,
+		col44 timestamp,
+		col45 timestamp,
+		col46 timestamp,
+		col47 timestamp,
+		col48 timestamp,
+		col49 timestamp,
+		col50 timestamp);));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE TABLE my_table(
+		col01 int8,
+		col02 int8,
+		col03 int8,
+		col04 int8,
+		col05 int8,
+		col06 int8,
+		col07 int8,
+		col08 int8,
+		col09 int8,
+		col10 int8,
+		col11 char(256),
+		col12 char(256),
+		col13 char(256),
+		col14 char(256),
+		col15 char(256),
+		col16 char(256),
+		col17 char(256),
+		col18 char(256),
+		col19 char(256),
+		col20 char(256),
+		col21 jsonb,
+		col22 jsonb,
+		col23 jsonb,
+		col24 jsonb,
+		col25 jsonb,
+		col26 jsonb,
+		col27 jsonb,
+		col28 jsonb,
+		col29 jsonb,
+		col30 jsonb,
+		col31 boolean,
+		col32 boolean,
+		col33 boolean,
+		col34 boolean,
+		col35 boolean,
+		col36 boolean,
+		col37 boolean,
+		col38 boolean,
+		col39 boolean,
+		col40 boolean,
+		col41 timestamp,
+		col42 timestamp,
+		col43 timestamp,
+		col44 timestamp,
+		col45 timestamp,
+		col46 timestamp,
+		col47 timestamp,
+		col48 timestamp,
+		col49 timestamp,
+		col50 timestamp);));
+
+# Create a large number of tables on the subscriber
+for (my $i = 0; $i < $process; $i += 1) {
+	$pid = fork();
+	if ($pid == 0)
+	{
+		for (my $j = 0; $j < $count; $j += 1) {
+			$node_subscriber->safe_psql('postgres', qq(
+				CREATE TABLE my_table_$i\_$j AS TABLE my_table WITH NO DATA;));
+			if ( $j % 100 == 0 ) {
+				print "\nsubscriber:$j";
+			}
+		}
+		exec "echo '\nchild:$i bye'";
+	}
+}
+while (($kid = wait()) != -1 ) {
+	print "\nReaped child $kid\n";
+}
+
+# On the primary, churn tables so RelSyncCache entries get built and invalidated
+for (my $i = 0; $i < $process; $i += 1) {
+	$pid = fork();
+	if ($pid == 0)
+	{
+		for (my $j = 0; $j < $count; $j += 1) {
+			$node_primary->safe_psql('postgres', qq(
+				CREATE TABLE my_table_$i\_$j AS TABLE my_table WITH NO DATA;
+				INSERT INTO my_table_$i\_$j (col01) VALUES($j);
+				INSERT INTO my_table_$i\_$j (col01) VALUES($j);
+				DROP TABLE my_table_$i\_$j;
+				));
+			if ( $j % 100 == 0 ) {
+				print "\nnode_primary:$j";
+			}
+		}
+		exec "echo '\nchild:$i bye'";
+	}
+}
+while (($kid = wait()) != -1 ) {
+	print "\nReaped child $kid\n";
+}
+
+usleep(3600_000_000); # 3600s, keep the nodes running for manual observation
+# usleep(4_000_000);
+
+is(1, 1, "test end");
+$node_primary->stop;
+done_testing();
-- 
2.47.1

