From 258d75348952c28d8ff3950363471c48e8a8c362 Mon Sep 17 00:00:00 2001
From: "Boudreau, Trey" <trey@treysoft.com>
Date: Tue, 24 Dec 2024 12:15:36 -0600
Subject: [PATCH v1 3/3] WIP Allow LISTENing for all channels.

Extend the notion of LISTENing to include all channels via
'LISTEN *', as per:

https://www.postgresql.org/message-id/737106.1734732462%40sss.pgh.pa.us

This commit does not change the results of pg_listening_channels(),
but some future version of it should.
---
 doc/src/sgml/ref/listen.sgml        |  37 +++++--
 src/backend/commands/async.c        | 149 ++++++++++++++++++++++++----
 src/backend/parser/gram.y           |  10 +-
 src/backend/tcop/utility.c          |   5 +-
 src/include/commands/async.h        |   1 +
 src/include/nodes/parsenodes.h      |   3 +-
 src/test/regress/expected/async.out |   6 ++
 src/test/regress/sql/async.sql      |   7 ++
 8 files changed, 189 insertions(+), 29 deletions(-)

diff --git a/doc/src/sgml/ref/listen.sgml b/doc/src/sgml/ref/listen.sgml
index 6c1f09bd45..67bab0b8f1 100644
--- a/doc/src/sgml/ref/listen.sgml
+++ b/doc/src/sgml/ref/listen.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-LISTEN <replaceable class="parameter">channel</replaceable>
+LISTEN { <replaceable class="parameter">channel</replaceable> | * }
 </synopsis>
  </refsynopsisdiv>
 
@@ -36,6 +36,25 @@ LISTEN <replaceable class="parameter">channel</replaceable>
    this notification channel, nothing is done.
   </para>
 
+  <para>
+   A session can be unregistered for a given notification channel with the
+   <command>UNLISTEN</command> command.  A session's listen
+   registrations are automatically cleared when the session ends.
+  </para>
+
+  <para>
+   The special wildcard <literal>*</literal> cancels all listener
+   registrations for the current session and replaces them with a
+   virtual registration that matches all channels. Subsequent
+   <command>UNLISTEN <replaceable class="parameter">channel</replaceable>
+   </command> commands add to a list of exceptions and will not be
+   delivered. Any <command>LISTEN <replaceable class="parameter">channel</replaceable>
+   </command> commands after <command>LISTEN *</command> will remove
+   previously set <command>UNLISTEN</command> exceptions, if present.
+   Otherwise they will be ignored until the session sees the
+   <command>UNLISTEN *</command> command.
+  </para>
+
   <para>
    Whenever the command <command>NOTIFY <replaceable
    class="parameter">channel</replaceable></command> is invoked, either
@@ -45,12 +64,6 @@ LISTEN <replaceable class="parameter">channel</replaceable>
    application.
   </para>
 
-  <para>
-   A session can be unregistered for a given notification channel with the
-   <command>UNLISTEN</command> command.  A session's listen
-   registrations are automatically cleared when the session ends.
-  </para>
-
   <para>
    The method a client application must use to detect notification events depends on
    which <productname>PostgreSQL</productname> application programming interface it
@@ -77,6 +90,16 @@ LISTEN <replaceable class="parameter">channel</replaceable>
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><literal>*</literal></term>
+    <listitem>
+     <para>
+      All current listen registrations for this session are cleared and
+      replaced by a virtual registration that matches all channels.
+     </para>
+    </listitem>
+   </varlistentry>
   </variablelist>
  </refsect1>
 
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 54f47f024f..607ad81c5d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -336,11 +336,18 @@ typedef struct
 #include "lib/simplehash.h"
 
 /*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on).  It is a simplehash.h of channel names,
+ * listenChannels identifies the channels we are actually listening to.
+ * We represent it as a boolean flag indicating whether or not we want
+ * to see all notifications, and a simplehash.h of the exeptions,
  * allocated in TopMemoryContext.
  */
-static listen_hash * listenChannels = NULL; /* hash table of ListenHashEntry */
+typedef struct
+{
+	listen_hash *exceptions;
+	bool		wantAll;
+}			ListenChannels;
+
+static ListenChannels * listenChannels = NULL;
 
 /* Initialize the hash table with this many elements when we start listening. */
 #define LISTEN_HASH_START_SIZE 16
@@ -358,6 +365,7 @@ static listen_hash * listenChannels = NULL; /* hash table of ListenHashEntry */
 typedef enum
 {
 	LISTEN_LISTEN,
+	LISTEN_LISTEN_ALL,
 	LISTEN_UNLISTEN,
 	LISTEN_UNLISTEN_ALL,
 } ListenActionKind;
@@ -460,8 +468,11 @@ static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
 static void Exec_ListenPreCommit(void);
 static void Exec_ListenCommit(const char *channel);
+static void Exec_ListenAllCommit(void);
 static void Exec_UnlistenCommit(const char *channel);
 static void Exec_UnlistenAllCommit(void);
+static bool NoActiveListens(void);
+static void ResetListens(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
@@ -709,8 +720,8 @@ Async_Notify(const char *channel, const char *payload)
  *		Common code for listen, unlisten, unlisten all commands.
  *
  *		Adds the request to the list of pending actions.
- *		Actual update of the listenChannels list happens during transaction
- *		commit.
+ *		Actual update of the listenChannels data structure happens during
+ *		transaction commit.
  */
 static void
 queue_listen(ListenActionKind action, const char *channel)
@@ -769,6 +780,20 @@ Async_Listen(const char *channel)
 	queue_listen(LISTEN_LISTEN, channel);
 }
 
+/*
+ * Async_ListenAll
+ *
+ *		This is executed by the SQL LISTEN * command.
+ */
+void
+Async_ListenAll(void)
+{
+	if (Trace_notify)
+		elog(DEBUG1, "Async_ListenAll(%d)", MyProcPid);
+
+	queue_listen(LISTEN_LISTEN_ALL, "");
+}
+
 /*
  * Async_Unlisten
  *
@@ -826,7 +851,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 		if (listenChannels)
 		{
 			funcctx->user_fctx = MemoryContextAlloc(funcctx->multi_call_memory_ctx, sizeof(listen_iterator));
-			listen_start_iterate(listenChannels, (listen_iterator *) funcctx->user_fctx);
+			listen_start_iterate(listenChannels->exceptions, (listen_iterator *) funcctx->user_fctx);
 		}
 	}
 
@@ -836,7 +861,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 	if (listenChannels)
 	{
 		/* next item from the hash table iterator */
-		item = listen_iterate(listenChannels, (listen_iterator *) funcctx->user_fctx);
+		item = listen_iterate(listenChannels->exceptions, (listen_iterator *) funcctx->user_fctx);
 		/* iterate has skipped over all empty slots for us */
 		if (item)
 			SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(item->channel));
@@ -911,6 +936,7 @@ PreCommit_Notify(void)
 			switch (actrec->action)
 			{
 				case LISTEN_LISTEN:
+				case LISTEN_LISTEN_ALL:
 					Exec_ListenPreCommit();
 					break;
 				case LISTEN_UNLISTEN:
@@ -1024,6 +1050,9 @@ AtCommit_Notify(void)
 				case LISTEN_LISTEN:
 					Exec_ListenCommit(actrec->channel);
 					break;
+				case LISTEN_LISTEN_ALL:
+					Exec_ListenAllCommit();
+					break;
 				case LISTEN_UNLISTEN:
 					Exec_UnlistenCommit(actrec->channel);
 					break;
@@ -1035,7 +1064,7 @@ AtCommit_Notify(void)
 	}
 
 	/* If no longer listening to anything, get out of listener array */
-	if (amRegisteredListener && (listenChannels->members == 0))
+	if (amRegisteredListener && NoActiveListens())
 		asyncQueueUnregister();
 
 	/*
@@ -1146,7 +1175,8 @@ Exec_ListenPreCommit(void)
 	LWLockRelease(NotifyQueueLock);
 
 	/* Create the structures to manage listens */
-	listenChannels = listen_create(TopMemoryContext, LISTEN_HASH_START_SIZE, NULL);
+	listenChannels = MemoryContextAllocZero(TopMemoryContext, sizeof(ListenChannels));
+	listenChannels->exceptions = listen_create(TopMemoryContext, LISTEN_HASH_START_SIZE, NULL);
 
 	/* Now we are listed in the global array, so remember we're listening */
 	amRegisteredListener = true;
@@ -1175,6 +1205,9 @@ Exec_ListenCommit(const char *channel)
 
 	Assert(listenChannels != NULL);
 
+	if (Trace_notify)
+		elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid);
+
 	/*
 	 * Add the new channel name to listenChannels.
 	 *
@@ -1182,10 +1215,33 @@ Exec_ListenCommit(const char *channel)
 	 * which would be bad because we already committed.  For the moment it
 	 * doesn't seem worth trying to guard against that, but maybe improve this
 	 * later.
-	 *
-	 * Does nothing if we are already listening on this channel.
 	 */
-	listen_insert(listenChannels, channel, &found);
+	if (listenChannels->wantAll)
+	{
+		/* remove any existing exception */
+		listen_delete(listenChannels->exceptions, channel);
+	}
+	else
+	{
+		/* add an explicit exception */
+		(void) listen_insert(listenChannels->exceptions, channel, &found);
+	}
+}
+
+/*
+ * Exec_ListenAllCommit --- subroutine for AtCommit_Notify
+ *
+ * Listen to ALL of the channels.
+ */
+static void
+Exec_ListenAllCommit(void)
+{
+	if (Trace_notify)
+		elog(DEBUG1, "Exec_ListenAlllistenCommit(%d)", MyProcPid);
+
+	/* By definition, 'LISTEN *' resets the world. */
+	ResetListens();
+	listenChannels->wantAll = true;
 }
 
 /*
@@ -1196,6 +1252,8 @@ Exec_ListenCommit(const char *channel)
 static void
 Exec_UnlistenCommit(const char *channel)
 {
+	bool		found;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
 
@@ -1203,7 +1261,16 @@ Exec_UnlistenCommit(const char *channel)
 	if (!listenChannels)
 		return;
 
-	listen_delete(listenChannels, channel);
+	if (!listenChannels->wantAll)
+	{
+		/* remove any existing exception */
+		listen_delete(listenChannels->exceptions, channel);
+	}
+	else
+	{
+		/* add an explicit exception */
+		(void) listen_insert(listenChannels->exceptions, channel, &found);
+	}
 
 	/*
 	 * We do not complain about unlistening something not being listened;
@@ -1226,7 +1293,23 @@ Exec_UnlistenAllCommit(void)
 	if (!listenChannels)
 		return;
 
-	listen_reset(listenChannels);
+	ResetListens();
+	listenChannels->wantAll = false;
+}
+
+/*
+ * Test if we have any active listens.
+ */
+static bool
+NoActiveListens(void)
+{
+	/* Determine if we have any active listens, then invert that. */
+	return !(
+	/* Missing data structure? No listens possible. */
+			 (listenChannels != NULL) &&
+	/* Want everything? No, want anything? */
+			 ((listenChannels->wantAll) || (listenChannels->exceptions->members != 0))
+		);
 }
 
 /*
@@ -1237,9 +1320,36 @@ Exec_UnlistenAllCommit(void)
 static bool
 IsListeningOn(const char *channel)
 {
+	bool		found;
+
 	if (listenChannels == NULL)
 		return false;
-	return listen_lookup(listenChannels, channel) != NULL;
+
+	/* determine if we have an entry for it */
+	found = listen_lookup(listenChannels->exceptions, channel) != NULL;
+	/*---------------------
+	 * wantAll found result
+	 *    f      f     f
+	 *    f      t     t
+	 *    t      f     t
+	 *    t      t     f
+	 *---------------------
+	 */
+	return listenChannels->wantAll ^ found;
+}
+
+/* Reset the listen exceptions hash.
+ *
+ * Because we allocate the channel key separate from the table we
+ * must free them all before resetting the table.
+ */
+static void
+ResetListens(void)
+{
+	if ((listenChannels == NULL) || (listenChannels->exceptions == NULL))
+		return;
+
+	listen_reset(listenChannels->exceptions);
 }
 
 /*
@@ -1249,13 +1359,14 @@ IsListeningOn(const char *channel)
 static void
 asyncQueueUnregister(void)
 {
-	Assert((listenChannels == NULL) || (listenChannels->members == 0)); /* else caller error */
+	Assert(NoActiveListens());	/* else caller error */
 
 	if (!amRegisteredListener)	/* nothing to do */
 		return;
 
 	/* Release our listen_hash data structures */
-	listen_destroy(listenChannels);
+	listen_destroy(listenChannels->exceptions);
+	pfree(listenChannels);
 	listenChannels = NULL;
 
 	/*
@@ -1698,7 +1809,7 @@ AtAbort_Notify(void)
 	 * we have registered as a listener but have not made any entry in
 	 * listenChannels.  In that case, deregister again.
 	 */
-	if (amRegisteredListener && (listenChannels->members == 0))
+	if (amRegisteredListener && NoActiveListens())
 		asyncQueueUnregister();
 
 	/* And clean up */
@@ -2209,7 +2320,7 @@ ProcessIncomingNotify(bool flush)
 	notifyInterruptPending = false;
 
 	/* Do nothing else if we aren't actively listening */
-	if (listenChannels == NULL)
+	if (NoActiveListens())
 		return;
 
 	if (Trace_notify)
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 67eb96396a..c190b29efc 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10992,13 +10992,21 @@ notify_payload:
 			| /*EMPTY*/							{ $$ = NULL; }
 		;
 
-ListenStmt: LISTEN ColId
+ListenStmt:
+			LISTEN ColId
 				{
 					ListenStmt *n = makeNode(ListenStmt);
 
 					n->conditionname = $2;
 					$$ = (Node *) n;
 				}
+			| LISTEN '*'
+				{
+					ListenStmt *n = makeNode(ListenStmt);
+
+					n->conditionname = NULL;
+					$$ = (Node *) n;
+				}
 		;
 
 UnlistenStmt:
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index c2ed8214ef..f630251265 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -824,7 +824,10 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 							 errmsg("cannot execute %s within a background process",
 									"LISTEN")));
 
-				Async_Listen(stmt->conditionname);
+				if (stmt->conditionname)
+					Async_Listen(stmt->conditionname);
+				else
+					Async_ListenAll();
 			}
 			break;
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index 78daa25fa0..ac6cb6348a 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -29,6 +29,7 @@ extern void NotifyMyFrontEnd(const char *channel,
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
+extern void Async_ListenAll(void);
 extern void Async_Unlisten(const char *channel);
 extern void Async_UnlistenAll(void);
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 0f9462493e..58696297f1 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3661,7 +3661,8 @@ typedef struct NotifyStmt
 typedef struct ListenStmt
 {
 	NodeTag		type;
-	char	   *conditionname;	/* condition name to listen on */
+	char	   *conditionname;	/* condition name to listen on, or NULL for
+								 * all */
 } ListenStmt;
 
 /* ----------------------
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index e01c953ae9..0b91857343 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -41,6 +41,12 @@ UNLISTEN notify_async2;
 UNLISTEN notify_async2; -- no-op
 --Should work. Ignore unlisten all with no channels
 UNLISTEN *;
+--Should work. Allow listening on everything
+LISTEN *;
+LISTEN *;
+UNLISTEN notify_async3;
+LISTEN notify_async3;
+UNLISTEN *;
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
 SELECT pg_notification_queue_usage();
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 382c80ac2a..4e90750f52 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -22,6 +22,13 @@ UNLISTEN notify_async2; -- no-op
 --Should work. Ignore unlisten all with no channels
 UNLISTEN *;
 
+--Should work. Allow listening on everything
+LISTEN *;
+LISTEN *;
+UNLISTEN notify_async3;
+LISTEN notify_async3;
+UNLISTEN *;
+
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
 SELECT pg_notification_queue_usage();
-- 
2.43.0

