From c291e5f8466b4b6806e01d1927332c44d84b3076 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v3 2/3] Introduce a new invalidation message to invalidate
 caches in output plugins

---
 src/backend/utils/cache/inval.c | 87 +++++++++++++++++++++++++++++++++
 src/include/storage/sinval.h    | 11 +++++
 src/include/utils/inval.h       | 10 ++++
 3 files changed, 108 insertions(+)

diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..d793bc9281 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -832,6 +842,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1695,6 +1711,42 @@ CacheInvalidateRelmap(Oid databaseId)
 }
 
 
+/*
+ * RelationCacheInvalidate
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WALs would do nothing even when it
+ * receives the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	SharedInvalidationMessage msg;
+
+	msg.rs.id = SHAREDINVALRELSYNC_ID;
+	msg.rs.dbId = MyDatabaseId;
+	msg.rs.relid = relid;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	SendSharedInvalidMessages(&msg, 1);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheRegisterSyscacheCallback
  *		Register the specified function to be called for all future
@@ -1763,6 +1815,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be call from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1861,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallSyscacheCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific entry for specific output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole relcache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..5922306c11 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -59,6 +60,10 @@ extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheRegisterSyscacheCallback(int cacheid,
 										  SyscacheCallbackFunction func,
 										  Datum arg);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

