From 3d9c474bc60dded52df5673724f9e793b58b1bcd Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 10:11:30 +0100
Subject: [PATCH 3/3] Convert listenChannels to hash table

---
 src/backend/commands/async.c     | 230 ++++++++++++++++++++++---------
 src/tools/pgindent/typedefs.list |   1 +
 2 files changed, 168 insertions(+), 63 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index d5676d43a30..ebc06ba771d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -68,7 +68,7 @@
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
  *	  After commit we are called another time (AtCommit_Notify()). Here we
- *	  make any actual updates to the effective listen state (listenChannels).
+ *	  make any actual updates to the effective listen state (listenChannelsHash).
  *	  Then we signal any backends that may be interested in our messages
  *	  (including our own backend, if listening).  This is done by
  *	  SignalBackends(), which scans the list of listening backends and sends a
@@ -313,16 +313,25 @@ static SlruCtlData NotifyCtlData;
 #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
 
 /*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on).  It is a simple list of channel names,
- * allocated in TopMemoryContext.
+ * listenChannelsHash identifies the channels we are listening to (or will be
+ * after the current transaction commits).  Entries with active=true are
+ * committed listens; entries with active=false are pre-allocated for pending
+ * LISTENs that will become active at commit.
+ *
+ * This hash table is allocated in TopMemoryContext.
  */
-static List *listenChannels = NIL;	/* list of C strings */
+typedef struct ListenChannelEntry
+{
+	char		channel[NAMEDATALEN];	/* hash key - must be first */
+	bool		active;			/* true if committed listen */
+} ListenChannelEntry;
+
+static HTAB *listenChannelsHash = NULL;
 
 /*
  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
  * all actions requested in the current transaction.  As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * we don't actually change listenChannelsHash until we reach transaction commit.
  *
  * The list is kept in CurTransactionContext.  In subtransactions, each
  * subtransaction has its own list in its own CurTransactionContext, but
@@ -438,7 +447,9 @@ static inline int64 asyncQueuePageDiff(int64 p, int64 q);
 static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
+static void initListenChannelsHash(void);
+static void CleanupInactiveListenChannels(void);
+static void Exec_ListenPreCommit(const char *channel);
 static void Exec_ListenCommit(const char *channel);
 static void Exec_UnlistenCommit(const char *channel);
 static void Exec_UnlistenAllCommit(void);
@@ -504,6 +515,60 @@ initSignalArrays(void)
 	MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * initListenChannelsHash
+ *		Lazy initialization of the listen channels hash table.
+ */
+static void
+initListenChannelsHash(void)
+{
+	HASHCTL		hash_ctl;
+
+	if (listenChannelsHash != NULL)
+		return;
+
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = NAMEDATALEN;
+	hash_ctl.entrysize = sizeof(ListenChannelEntry);
+
+	listenChannelsHash = hash_create("Listen Channels", 64, &hash_ctl,
+									 HASH_ELEM | HASH_STRINGS);
+}
+
+/*
+ * CleanupInactiveListenChannels
+ *		Remove all entries with active=false from the hash table.
+ *		If the hash table becomes empty, destroy it.
+ */
+static void
+CleanupInactiveListenChannels(void)
+{
+	HASH_SEQ_STATUS status;
+	ListenChannelEntry *entry;
+
+	if (listenChannelsHash == NULL)
+		return;
+
+	hash_seq_init(&status, listenChannelsHash);
+
+	while ((entry = hash_seq_search(&status)) != NULL)
+	{
+		if (!entry->active)
+		{
+			if (hash_search(listenChannelsHash, entry->channel,
+							HASH_REMOVE, NULL) == NULL)
+				elog(ERROR, "hash table corrupted");
+		}
+	}
+
+	/* Destroy hash table if now empty */
+	if (hash_get_num_entries(listenChannelsHash) == 0)
+	{
+		hash_destroy(listenChannelsHash);
+		listenChannelsHash = NULL;
+	}
+}
+
 /*
  * Report space needed for our shared memory area
  */
@@ -709,7 +774,7 @@ Async_Notify(const char *channel, const char *payload)
  *		Common code for listen, unlisten, unlisten all commands.
  *
  *		Adds the request to the list of pending actions.
- *		Actual update of the listenChannels list happens during transaction
+ *		Actual update of the listenChannelsHash happens during transaction
  *		commit.
  */
 static void
@@ -809,30 +874,52 @@ Async_UnlistenAll(void)
  * SQL function: return a set of the channel names this backend is actively
  * listening to.
  *
- * Note: this coding relies on the fact that the listenChannels list cannot
+ * Note: this coding relies on the fact that the listenChannelsHash cannot
  * change within a transaction.
  */
 Datum
 pg_listening_channels(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
+	HASH_SEQ_STATUS *status;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
+		MemoryContext oldcontext;
+
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
+
+		if (listenChannelsHash != NULL)
+		{
+			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+			status = palloc(sizeof(HASH_SEQ_STATUS));
+			hash_seq_init(status, listenChannelsHash);
+			funcctx->user_fctx = status;
+			MemoryContextSwitchTo(oldcontext);
+		}
+		else
+			funcctx->user_fctx = NULL;
 	}
 
 	/* stuff done on every call of the function */
 	funcctx = SRF_PERCALL_SETUP();
+	status = funcctx->user_fctx;
 
-	if (funcctx->call_cntr < list_length(listenChannels))
+	if (status != NULL)
 	{
-		char	   *channel = (char *) list_nth(listenChannels,
-												funcctx->call_cntr);
+		ListenChannelEntry *entry = hash_seq_search(status);
 
-		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+		if (entry != NULL)
+		{
+			/*
+			 * All entries should be active; inactive ones are cleaned up at
+			 * commit/abort
+			 */
+			Assert(entry->active);
+			SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
+		}
 	}
 
 	SRF_RETURN_DONE(funcctx);
@@ -849,6 +936,7 @@ static void
 Async_UnlistenOnExit(int code, Datum arg)
 {
 	Exec_UnlistenAllCommit();
+	CleanupInactiveListenChannels();
 	asyncQueueUnregister();
 }
 
@@ -904,7 +992,7 @@ PreCommit_Notify(void)
 			switch (actrec->action)
 			{
 				case LISTEN_LISTEN:
-					Exec_ListenPreCommit();
+					Exec_ListenPreCommit(actrec->channel);
 					break;
 				case LISTEN_UNLISTEN:
 					/* there is no Exec_UnlistenPreCommit() */
@@ -1005,7 +1093,7 @@ PreCommit_Notify(void)
  *
  *		This is called at transaction commit, after committing to clog.
  *
- *		Update listenChannels and clear transaction-local state.
+ *		Update listenChannelsHash and clear transaction-local state.
  *
  *		If we issued any notifications in the transaction, send signals to
  *		listening backends (possibly including ourselves) to process them.
@@ -1049,8 +1137,11 @@ AtCommit_Notify(void)
 		}
 	}
 
+	/* Clean up inactive entries from the hash table */
+	CleanupInactiveListenChannels();
+
 	/* If no longer listening to anything, get out of listener array */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener && listenChannelsHash == NULL)
 		asyncQueueUnregister();
 
 	/*
@@ -1071,11 +1162,22 @@ AtCommit_Notify(void)
  * This function must make sure we are ready to catch any incoming messages.
  */
 static void
-Exec_ListenPreCommit(void)
+Exec_ListenPreCommit(const char *channel)
 {
 	QueuePosition head;
 	QueuePosition max;
 	ProcNumber	prevListener;
+	ListenChannelEntry *entry;
+	bool		found;
+
+	/*
+	 * Pre-allocate the hash table entry for this channel. This happens before
+	 * commit, so an OOM error here is safe (causes transaction abort).
+	 */
+	initListenChannelsHash();
+	entry = hash_search(listenChannelsHash, channel, HASH_ENTER, &found);
+	if (!found)
+		entry->active = false;
 
 	/*
 	 * Nothing to do if we are already listening to something, nor if we
@@ -1085,7 +1187,7 @@ Exec_ListenPreCommit(void)
 		return;
 
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
+		elog(DEBUG1, "Exec_ListenPreCommit(%s,%d)", channel, MyProcPid);
 
 	/*
 	 * Before registering, make sure we will unlisten before dying. (Note:
@@ -1163,54 +1265,48 @@ Exec_ListenPreCommit(void)
 /*
  * Exec_ListenCommit --- subroutine for AtCommit_Notify
  *
- * Add the channel to the list of channels we are listening on.
+ * Set the channel's hash entry to active.
  */
 static void
 Exec_ListenCommit(const char *channel)
 {
-	MemoryContext oldcontext;
+	ListenChannelEntry *entry;
 
-	/* Do nothing if we are already listening on this channel */
-	if (IsListeningOn(channel))
-		return;
+	if (Trace_notify)
+		elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid);
 
 	/*
-	 * Add the new channel name to listenChannels.
-	 *
-	 * XXX It is theoretically possible to get an out-of-memory failure here,
-	 * which would be bad because we already committed.  For the moment it
-	 * doesn't seem worth trying to guard against that, but maybe improve this
-	 * later.
+	 * Find the pre-allocated entry and mark it active. The entry must exist
+	 * because Exec_ListenPreCommit created it.
 	 */
-	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
-	listenChannels = lappend(listenChannels, pstrdup(channel));
-	MemoryContextSwitchTo(oldcontext);
+	Assert(listenChannelsHash != NULL);
+	entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
+	Assert(entry != NULL);
+
+	entry->active = true;
 }
 
 /*
  * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
  *
- * Remove the specified channel name from listenChannels.
+ * Mark the specified channel as inactive.  We unset active rather than
+ * removing the entry, to avoid needing to allocate memory if a subsequent
+ * LISTEN in the same transaction re-adds it.
  */
 static void
 Exec_UnlistenCommit(const char *channel)
 {
-	ListCell   *q;
+	ListenChannelEntry *entry;
 
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
 
-	foreach(q, listenChannels)
-	{
-		char	   *lchan = (char *) lfirst(q);
+	if (listenChannelsHash == NULL)
+		return;
 
-		if (strcmp(lchan, channel) == 0)
-		{
-			listenChannels = foreach_delete_current(listenChannels, q);
-			pfree(lchan);
-			break;
-		}
-	}
+	entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
+	if (entry != NULL)
+		entry->active = false;
 
 	/*
 	 * We do not complain about unlistening something not being listened;
@@ -1226,34 +1322,35 @@ Exec_UnlistenCommit(const char *channel)
 static void
 Exec_UnlistenAllCommit(void)
 {
+	HASH_SEQ_STATUS seq;
+	ListenChannelEntry *entry;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
-	list_free_deep(listenChannels);
-	listenChannels = NIL;
+	if (listenChannelsHash == NULL)
+		return;
+
+	hash_seq_init(&seq, listenChannelsHash);
+	while ((entry = hash_seq_search(&seq)) != NULL)
+		entry->active = false;
 }
 
 /*
  * Test whether we are actively listening on the given channel name.
  *
  * Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it.  In practice the list is likely to be
- * fairly short, though.
  */
 static bool
 IsListeningOn(const char *channel)
 {
-	ListCell   *p;
+	ListenChannelEntry *entry;
 
-	foreach(p, listenChannels)
-	{
-		char	   *lchan = (char *) lfirst(p);
+	if (listenChannelsHash == NULL)
+		return false;
 
-		if (strcmp(lchan, channel) == 0)
-			return true;
-	}
-	return false;
+	entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
+	return (entry != NULL && entry->active);
 }
 
 /*
@@ -1263,7 +1360,7 @@ IsListeningOn(const char *channel)
 static void
 asyncQueueUnregister(void)
 {
-	Assert(listenChannels == NIL);	/* else caller error */
+	Assert(listenChannelsHash == NULL); /* else caller error */
 
 	if (!amRegisteredListener)	/* nothing to do */
 		return;
@@ -1695,12 +1792,19 @@ SignalBackends(void)
 void
 AtAbort_Notify(void)
 {
+	/*
+	 * Clean up any pre-allocated hash entries that weren't committed. These
+	 * have active=false and would have been set to active=true in
+	 * Exec_ListenCommit if the transaction had committed.
+	 */
+	CleanupInactiveListenChannels();
+
 	/*
 	 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
-	 * we have registered as a listener but have not made any entry in
-	 * listenChannels.  In that case, deregister again.
+	 * we have registered as a listener but have not made any active entry in
+	 * listenChannelsHash.  In that case, deregister again.
 	 */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener && listenChannelsHash == NULL)
 		asyncQueueUnregister();
 
 	/* And clean up */
@@ -2080,7 +2184,7 @@ asyncQueueProcessPageEntries(QueuePosition *current,
 			 * over it on the first LISTEN in a session, and not get stuck on
 			 * it indefinitely.
 			 */
-			if (listenChannels == NIL)
+			if (listenChannelsHash == NULL)
 				continue;
 
 			if (TransactionIdDidCommit(qe->xid))
@@ -2332,7 +2436,7 @@ ProcessIncomingNotify(bool flush)
 	notifyInterruptPending = false;
 
 	/* Do nothing else if we aren't actively listening */
-	if (listenChannels == NIL)
+	if (listenChannelsHash == NULL)
 		return;
 
 	if (Trace_notify)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c751c25a04d..46ea8f2ff8e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1564,6 +1564,7 @@ ListDictionary
 ListParsedLex
 ListenAction
 ListenActionKind
+ListenChannelEntry
 ListenStmt
 LoInfo
 LoadStmt
-- 
2.50.1

