From c8ae6295084aa2e95333ac7449b0984e6d819e44 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Sat, 15 Nov 2025 22:18:50 +0100
Subject: [PATCH 2/2] Optimize LISTEN/NOTIFY with shared channel map and direct
 advancement
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This patch reworks the LISTEN/NOTIFY signaling path to avoid the
long-standing inefficiency where every commit wakes all listening
backends in the same database, even those that are listening on
completely different channels.

Problem
-------

At present, SignalBackends has no central knowledge of which backend
listens on which channel. When a backend commits a transaction that
issued NOTIFY, it simply iterates over all registered listeners in the
same database and sends each one a PROCSIG_NOTIFY_INTERRUPT signal.

That behavior is fine when all listeners are on the same channel, but
when many backends are listening on different channels, each NOTIFY
triggers a storm of unnecessary wakeups and context switches. As the
number of idle listeners grows, this often becomes the bottleneck and
throughput drops sharply.

Overview of the solution
------------------------

This patch introduces a lazily-created dynamic shared hash (dshash)
backed by dynamic shared memory (DSA) that maps (dboid, channel) to
arrays of ListenerEntry structures (containing ProcNumber and active
flag) representing the backends listening on each channel. This allows
the sender to target only those backends actually listening on the
channels for which it has queued notifications.

Critical section safety
-----------------------

To avoid ERROR→PANIC after transaction commit, all risky operations
(DSA allocations, dshash operations, memory allocations) are moved to
PreCommit_Notify where failures can still safely abort.

Each listener entry includes an 'active' flag. During PreCommit, LISTEN
operations insert entries with active=false to pre-allocate all required
space. After commit to clog, AtCommit_Notify simply flips active flags
to true—a boolean assignment that has acceptably low failure risk.

Similarly, signal arrays (notifySignalPids/notifySignalProcs) use a
hybrid allocation strategy: allocated once in TopMemoryContext during
the first NOTIFY (in PreCommit where OOM can still abort), then reused
permanently across all transactions.

At commit time:

* Exec_ListenPreCommit performs all risky allocations with active=false
* Exec_ListenCommit flips active flags to true (post-commit)
* AtCommit_Notify updates the shared channelHash to reflect any LISTEN
  or UNLISTEN actions performed in the transaction.
* SignalBackends consults this hash to find the backends that are
  listening on the channels being notified in the current database, and
  signals only those with active=true.

Each backend's entry in AsyncQueueControl now includes a wakeupPending
flag to prevent duplicate signals while a previous wakeup is still being
processed.

Direct advancement
------------------

A further optimization avoids signaling idle backends that are not
listening on any of the channels notified within the transaction.

While queuing notifications, PreCommit_Notify records the queue head
position both before and after writing its notifications. Because all
writers are serialized by the existing cluster-wide heavyweight lock on
"database 0", no backend (from any database) can insert entries between
those two points. This guarantees that the region [oldHead, newHead)
contains only the entries written by our commit.

SignalBackends uses this fact to directly advance any backend still
positioned at oldHead up to newHead, avoiding a needless wakeup for
listeners that would otherwise not find any notifies of interest.

To handle advancing backends correctly, each backend's entry tracks both
whether it is currently advancing (isAdvancing) and the target position
it is advancing to (advancingPos). This allows SignalBackends to signal
advancing backends only when their target position would leave them
behind the new queue head, while safely direct-advancing idle backends
that would not be interested in the newly written notifications.
Idle backends that are stationary at a position before the old queue
head are signaled, since they might be interested in the notifications
in between their current position and the old queue head.

Other notes
-----------

* Maintains dual data structures: a shared channelHash for determining
  which backends to signal, and a local per-backend listenChannels list
  for fast lock-free lookups during notification processing. This avoids
  contention on the shared hash during the high-frequency IsListeningOn
  checks that occur for every notification read from the queue.
* Backends remain registered in the global listener list as long as
  listenChannels is non-empty.
* AtAbort_Notify cleanly handles both local and shared hash cleanup,
  removing inactive (uncommitted) entries while preserving active
  (committed) entries from previous transactions.
* Adds LWLock tranche NOTIFY_CHANNEL_HASH and wait event
  NotifyChannelHash for visibility.
* No user-visible behavioral changes; this is an internal optimization
  only.
---
 src/backend/commands/async.c                  | 1084 +++++++++++++----
 .../utils/activity/wait_event_names.txt       |    1 +
 src/include/storage/lwlocklist.h              |    1 +
 src/tools/pgindent/typedefs.list              |    3 +
 4 files changed, 876 insertions(+), 213 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index e1cf659485a..2a82f4a0130 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,10 @@
  *	  All notification messages are placed in the queue and later read out
  *	  by listening backends.
  *
- *	  There is no central knowledge of which backend listens on which channel;
- *	  every backend has its own list of interesting channels.
+ *	  We also maintain a dynamic shared hash table (dshash) that maps channel
+ *	  names to the set of backends listening on each channel. This table is
+ *	  created lazily on the first LISTEN command and grows dynamically as
+ *	  needed.
  *
  *	  Although there is only one queue, notifications are treated as being
  *	  database-local; this is done by including the sender's database OID
@@ -68,16 +70,24 @@
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
  *	  After commit we are called another time (AtCommit_Notify()). Here we
- *	  make any actual updates to the effective listen state (listenChannels).
- *	  Then we signal any backends that may be interested in our messages
- *	  (including our own backend, if listening).  This is done by
- *	  SignalBackends(), which scans the list of listening backends and sends a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- *	  know which backend is listening on which channel so we must signal them
- *	  all).  We can exclude backends that are already up to date, though, and
- *	  we can also exclude backends that are in other databases (unless they
- *	  are way behind and should be kicked to make them advance their
- *	  pointers).
+ *	  make any actual updates to the local listen state (listenChannelsHash) and
+ *	  shared channel hash table (channelHash).  Then we signal any backends
+ *	  that may be interested in our messages (including our own backend,
+ *	  if listening).  This is done by SignalBackends(), which consults the
+ *	  shared channel hash table to identify listeners for the channels that
+ *	  have pending notifications in the current database.  Each selected
+ *	  backend is marked as having a wakeup pending to avoid duplicate signals,
+ *	  and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ *
+ *	  When writing notifications, PreCommit_Notify() records the queue head
+ *	  position both before and after the write.  Because all writers serialize
+ *	  on a cluster-wide heavyweight lock, no backend can insert entries between
+ *	  these two points.  SignalBackends() uses this fact to directly advance any
+ *	  backend that is still positioned at the old head, or within the range
+ *	  written, avoiding unnecessary wakeups for idle listeners that have
+ *	  nothing to read.  Backends that cannot be direct advanced are signaled
+ *	  if they are stuck behind the old queue head, or advancing to a position
+ *	  before the new queue head, since otherwise notifications could be delayed.
  *
  *	  Finally, after we are out of the transaction altogether and about to go
  *	  idle, we scan the queue for messages that need to be sent to our
@@ -137,14 +147,17 @@
 #include "commands/async.h"
 #include "common/hashfn.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
+#include "storage/dsm_registry.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/procsignal.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
+#include "utils/dsa.h"
 #include "utils/guc_hooks.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -162,6 +175,43 @@
  */
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
+/*
+ * Channel hash table definitions
+ *
+ * This hash table maps (database OID, channel name) keys to arrays of
+ * ListenerEntry structs representing the backends listening on each channel.
+ */
+
+#define INITIAL_LISTENERS_ARRAY_SIZE 4
+
+typedef struct ChannelHashKey
+{
+	Oid			dboid;
+	char		channel[NAMEDATALEN];
+} ChannelHashKey;
+
+/*
+ * Individual listener entry in the channel's listeners array.
+ *
+ * We store both the ProcNumber and an active flag.  During PreCommit,
+ * we insert entries with active=false to pre-allocate space and avoid
+ * OOM failures after transaction commit.  Then in AtCommit, we just set
+ * the active flag to true, which has acceptably low risk of failure.
+ */
+typedef struct ListenerEntry
+{
+	ProcNumber	procno;			/* backend's ProcNumber */
+	bool		active;			/* true if actually listening */
+} ListenerEntry;
+
+typedef struct ChannelEntry
+{
+	ChannelHashKey key;
+	dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
+	int			numListeners;	/* Number of listeners currently stored */
+	int			allocatedListeners; /* Allocated size of array */
+} ChannelEntry;
+
 /*
  * Struct representing an entry in the global notify queue
  *
@@ -224,11 +274,14 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
- * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.
  *
  * Resist the temptation to make this really large.  While that would save
  * work in some places, it would add cost in others.  In particular, this
@@ -246,6 +299,9 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	bool		wakeupPending;	/* signal sent but not yet processed */
+	bool		isAdvancing;	/* backend is advancing its position */
+	QueuePosition advancingPos; /* target position backend is advancing to */
 } QueueBackendStatus;
 
 /*
@@ -260,9 +316,10 @@ typedef struct QueueBackendStatus
  * (since no other backend will inspect it).
  *
  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
- * entries of other backends and also change the head pointer. When holding
- * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointers.
+ * entries of other backends and also change the head pointer. They can
+ * also advance other backends' queue positions, unless they are not
+ * in the process of doing that themselves. When holding both NotifyQueueLock and
+ * NotifyQueueTailLock in EXCLUSIVE mode, backends can change the tail pointers.
  *
  * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as
  * the control lock for the pg_notify SLRU buffers.
@@ -288,11 +345,16 @@ typedef struct AsyncQueueControl
 	ProcNumber	firstListener;	/* id of first listener, or
 								 * INVALID_PROC_NUMBER */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
+	dsa_handle	channelHashDSA;
+	dshash_table_handle channelHashDSH;
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
 static AsyncQueueControl *asyncQueueControl;
 
+static dsa_area *channelDSA = NULL;
+static dshash_table *channelHash = NULL;
+
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
@@ -301,6 +363,9 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_IS_ADVANCING(i)	(asyncQueueControl->backend[i].isAdvancing)
+#define QUEUE_BACKEND_ADVANCING_POS(i)	(asyncQueueControl->backend[i].advancingPos)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -313,16 +378,16 @@ static SlruCtlData NotifyCtlData;
 #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
 
 /*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on).  It is a simple list of channel names,
+ * listenChannelsHash identifies the channels we are actually listening to
+ * (ie, have committed a LISTEN on).  It is a hash table of channel names,
  * allocated in TopMemoryContext.
  */
-static List *listenChannels = NIL;	/* list of C strings */
+static HTAB *listenChannelsHash = NULL;
 
 /*
  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
  * all actions requested in the current transaction.  As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * we don't actually change listenChannelsHash until we reach transaction commit.
  *
  * The list is kept in CurTransactionContext.  In subtransactions, each
  * subtransaction has its own list in its own CurTransactionContext, but
@@ -391,6 +456,7 @@ typedef struct NotificationList
 	int			nestingLevel;	/* current transaction nesting depth */
 	List	   *events;			/* list of Notification structs */
 	HTAB	   *hashtab;		/* hash of NotificationHash structs, or NULL */
+	HTAB	   *channelHashtab; /* hash of unique channel names, or NULL */
 	struct NotificationList *upper; /* details for upper transaction levels */
 } NotificationList;
 
@@ -401,6 +467,11 @@ struct NotificationHash
 	Notification *event;		/* => the actual Notification struct */
 };
 
+struct ChannelHash
+{
+	char		channel[NAMEDATALEN];
+};
+
 static NotificationList *pendingNotifies = NULL;
 
 /*
@@ -418,6 +489,26 @@ static bool unlistenExitRegistered = false;
 /* True if we're currently registered as a listener in asyncQueueControl */
 static bool amRegisteredListener = false;
 
+/*
+ * Queue head positions for direct advancement.
+ * These are captured during PreCommit_Notify while holding the heavyweight
+ * lock on database 0, ensuring no other backend can insert notifications
+ * between them.  SignalBackends uses these to advance idle backends.
+ */
+static QueuePosition queueHeadBeforeWrite;
+static QueuePosition queueHeadAfterWrite;
+
+/*
+ * List of channels with pending notifications in the current transaction.
+ */
+static List *pendingNotifyChannels = NIL;
+
+/*
+ * Arrays for SignalBackends.
+ */
+static int32 *notifySignalPids = NULL;
+static ProcNumber *notifySignalProcs = NULL;
+
 /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
 static bool tryAdvanceTail = false;
 
@@ -428,11 +519,10 @@ bool		Trace_notify = false;
 int			max_notify_queue_pages = 1048576;
 
 /* local function prototypes */
-static inline int64 asyncQueuePageDiff(int64 p, int64 q);
 static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
+static void Exec_ListenPreCommit(const char *channel);
 static void Exec_ListenCommit(const char *channel);
 static void Exec_UnlistenCommit(const char *channel);
 static void Exec_UnlistenAllCommit(void);
@@ -456,16 +546,9 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
-
-/*
- * Compute the difference between two queue page numbers.
- * Previously this function accounted for a wraparound.
- */
-static inline int64
-asyncQueuePageDiff(int64 p, int64 q)
-{
-	return p - q;
-}
+static inline void ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel);
+static dshash_hash channelHashFunc(const void *key, size_t size, void *arg);
+static void initChannelHash(void);
 
 /*
  * Determines whether p precedes q.
@@ -477,6 +560,126 @@ asyncQueuePagePrecedes(int64 p, int64 q)
 	return p < q;
 }
 
+/*
+ * channelHashFunc
+ *		Hash function for channel keys.
+ */
+static dshash_hash
+channelHashFunc(const void *key, size_t size, void *arg)
+{
+	const ChannelHashKey *k = (const ChannelHashKey *) key;
+	dshash_hash h;
+
+	h = DatumGetUInt32(hash_uint32(k->dboid));
+	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+								 strnlen(k->channel, NAMEDATALEN)));
+
+	return h;
+}
+
+/* parameters for the channel hash table */
+static const dshash_parameters channelDSHParams = {
+	sizeof(ChannelHashKey),
+	sizeof(ChannelEntry),
+	dshash_memcmp,
+	channelHashFunc,
+	dshash_memcpy,
+	LWTRANCHE_NOTIFY_CHANNEL_HASH
+};
+
+/*
+ * initChannelHash
+ *		Lazy initialization of the channel hash table.
+ */
+static void
+initChannelHash(void)
+{
+	MemoryContext oldcontext;
+
+	/* Quick exit if we already did this */
+	if (asyncQueueControl->channelHashDSH != DSHASH_HANDLE_INVALID &&
+		channelHash != NULL)
+		return;
+
+	/* Otherwise, use a lock to ensure only one process creates the table */
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+	/* Be sure any local memory allocated by DSA routines is persistent */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	if (asyncQueueControl->channelHashDSH == DSHASH_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for channel hash */
+		channelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
+		dsa_pin(channelDSA);
+		dsa_pin_mapping(channelDSA);
+		channelHash = dshash_create(channelDSA, &channelDSHParams, NULL);
+
+		/* Store handles in shared memory for other backends to use */
+		asyncQueueControl->channelHashDSA = dsa_get_handle(channelDSA);
+		asyncQueueControl->channelHashDSH =
+			dshash_get_hash_table_handle(channelHash);
+	}
+	else if (!channelHash)
+	{
+		/* Attach to existing dynamic shared hash table */
+		channelDSA = dsa_attach(asyncQueueControl->channelHashDSA);
+		dsa_pin_mapping(channelDSA);
+		channelHash = dshash_attach(channelDSA, &channelDSHParams,
+									asyncQueueControl->channelHashDSH,
+									NULL);
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+	LWLockRelease(NotifyQueueLock);
+}
+
+/*
+ * initListenChannelsHash
+ *		Lazy initialization of the local listen channels hash table.
+ */
+static void
+initListenChannelsHash(void)
+{
+	HASHCTL		hash_ctl;
+
+	/* Quick exit if we already did this */
+	if (listenChannelsHash != NULL)
+		return;
+
+	/* Initialize local hash table for this backend's listened channels */
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = NAMEDATALEN;
+	hash_ctl.entrysize = sizeof(struct ChannelHash);
+
+	listenChannelsHash =
+		hash_create("Listen Channels",
+					64,
+					&hash_ctl,
+					HASH_ELEM | HASH_STRINGS);
+}
+
+/*
+ * initSignalArrays
+ *		Lazy initialization of the signal arrays.
+ */
+static void
+initSignalArrays(void)
+{
+	MemoryContext oldcontext;
+
+	if (notifySignalProcs != NULL)
+		return;
+
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	if (notifySignalPids == NULL)
+		notifySignalPids = (int32 *) palloc(MaxBackends * sizeof(int32));
+	notifySignalProcs = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Report space needed for our shared memory area
  */
@@ -520,12 +723,17 @@ AsyncShmemInit(void)
 		QUEUE_STOP_PAGE = 0;
 		QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
 		asyncQueueControl->lastQueueFillWarn = 0;
+		asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
+		asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
 		for (int i = 0; i < MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_ADVANCING_POS(i), 0, 0);
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+			QUEUE_BACKEND_IS_ADVANCING(i) = false;
 		}
 	}
 
@@ -656,6 +864,7 @@ Async_Notify(const char *channel, const char *payload)
 		notifies->events = list_make1(n);
 		/* We certainly don't need a hashtable yet */
 		notifies->hashtab = NULL;
+		notifies->channelHashtab = NULL;
 		notifies->upper = pendingNotifies;
 		pendingNotifies = notifies;
 	}
@@ -682,7 +891,7 @@ Async_Notify(const char *channel, const char *payload)
  *		Common code for listen, unlisten, unlisten all commands.
  *
  *		Adds the request to the list of pending actions.
- *		Actual update of the listenChannels list happens during transaction
+ *		Actual update of the listenChannelsHash happens during transaction
  *		commit.
  */
 static void
@@ -782,30 +991,49 @@ Async_UnlistenAll(void)
  * SQL function: return a set of the channel names this backend is actively
  * listening to.
  *
- * Note: this coding relies on the fact that the listenChannels list cannot
+ * Note: this coding relies on the fact that the listenChannelsHash cannot
  * change within a transaction.
  */
 Datum
 pg_listening_channels(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
+	HASH_SEQ_STATUS *status;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
+		MemoryContext oldcontext;
+
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* Initialize hash table iteration if we have any channels */
+		if (listenChannelsHash != NULL)
+		{
+			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+			status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
+			hash_seq_init(status, listenChannelsHash);
+			funcctx->user_fctx = status;
+			MemoryContextSwitchTo(oldcontext);
+		}
+		else
+		{
+			funcctx->user_fctx = NULL;
+		}
 	}
 
 	/* stuff done on every call of the function */
 	funcctx = SRF_PERCALL_SETUP();
+	status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
 
-	if (funcctx->call_cntr < list_length(listenChannels))
+	if (status != NULL)
 	{
-		char	   *channel = (char *) list_nth(listenChannels,
-												funcctx->call_cntr);
+		struct ChannelHash *entry;
 
-		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+		entry = (struct ChannelHash *) hash_seq_search(status);
+		if (entry != NULL)
+			SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
 	}
 
 	SRF_RETURN_DONE(funcctx);
@@ -877,7 +1105,7 @@ PreCommit_Notify(void)
 			switch (actrec->action)
 			{
 				case LISTEN_LISTEN:
-					Exec_ListenPreCommit();
+					Exec_ListenPreCommit(actrec->channel);
 					break;
 				case LISTEN_UNLISTEN:
 					/* there is no Exec_UnlistenPreCommit() */
@@ -893,6 +1121,36 @@ PreCommit_Notify(void)
 	if (pendingNotifies)
 	{
 		ListCell   *nextNotify;
+		bool		firstIteration = true;
+
+		/*
+		 * Build list of unique channels for SignalBackends().
+		 *
+		 * If we have a channelHashtab, use it to efficiently get the unique
+		 * channels.  Otherwise, fall back to the linear approach.
+		 */
+		pendingNotifyChannels = NIL;
+		if (pendingNotifies->channelHashtab != NULL)
+		{
+			HASH_SEQ_STATUS status;
+			struct ChannelHash *channelEntry;
+
+			hash_seq_init(&status, pendingNotifies->channelHashtab);
+			while ((channelEntry = (struct ChannelHash *) hash_seq_search(&status)) != NULL)
+				pendingNotifyChannels = lappend(pendingNotifyChannels, channelEntry->channel);
+		}
+		else
+		{
+			/* Linear approach for small number of notifications */
+			foreach_ptr(Notification, n, pendingNotifies->events)
+			{
+				char	   *channel = n->data;
+
+				/* Add if not already in list */
+				if (!list_member_ptr(pendingNotifyChannels, channel))
+					pendingNotifyChannels = lappend(pendingNotifyChannels, channel);
+			}
+		}
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
@@ -902,6 +1160,14 @@ PreCommit_Notify(void)
 		 */
 		(void) GetCurrentTransactionId();
 
+		/*
+		 * We will be calling SignalBackends() at AtCommit_Notify time, so
+		 * make sure its auxiliary data structures exist now, where an ERROR
+		 * will still abort the transaction cleanly.
+		 */
+		initSignalArrays();
+		initChannelHash();
+
 		/*
 		 * Serialize writers by acquiring a special lock that we hold till
 		 * after commit.  This ensures that queue entries appear in commit
@@ -921,6 +1187,22 @@ PreCommit_Notify(void)
 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
 						 AccessExclusiveLock);
 
+		/*
+		 * For the direct advancement optimization in SignalBackends(), we
+		 * need to ensure that no other backend can insert queue entries
+		 * between queueHeadBeforeWrite and queueHeadAfterWrite.  The
+		 * heavyweight lock above provides this guarantee, since it serializes
+		 * all writers.
+		 *
+		 * Note: if the heavyweight lock were ever removed for scalability
+		 * reasons, we could achieve the same guarantee by holding
+		 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
+		 * than releasing and reacquiring it for each page as we do below.
+		 */
+
+		/* Initialize queueHeadBeforeWrite to a safe default */
+		SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
+
 		/* Now push the notifications into the queue */
 		nextNotify = list_head(pendingNotifies->events);
 		while (nextNotify != NULL)
@@ -938,12 +1220,20 @@ PreCommit_Notify(void)
 			 * point in time we can still roll the transaction back.
 			 */
 			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+			if (firstIteration)
+			{
+				queueHeadBeforeWrite = QUEUE_HEAD;
+				firstIteration = false;
+			}
+
 			asyncQueueFillWarning();
 			if (asyncQueueIsFull())
 				ereport(ERROR,
 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 						 errmsg("too many notifications in the NOTIFY queue")));
 			nextNotify = asyncQueueAddEntries(nextNotify);
+			queueHeadAfterWrite = QUEUE_HEAD;
+
 			LWLockRelease(NotifyQueueLock);
 		}
 
@@ -956,7 +1246,7 @@ PreCommit_Notify(void)
  *
  *		This is called at transaction commit, after committing to clog.
  *
- *		Update listenChannels and clear transaction-local state.
+ *		Update listenChannelsHash and clear transaction-local state.
  *
  *		If we issued any notifications in the transaction, send signals to
  *		listening backends (possibly including ourselves) to process them.
@@ -1001,7 +1291,8 @@ AtCommit_Notify(void)
 	}
 
 	/* If no longer listening to anything, get out of listener array */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener &&
+		(listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0))
 		asyncQueueUnregister();
 
 	/*
@@ -1037,147 +1328,270 @@ AtCommit_Notify(void)
  * This function must make sure we are ready to catch any incoming messages.
  */
 static void
-Exec_ListenPreCommit(void)
+Exec_ListenPreCommit(const char *channel)
 {
-	QueuePosition head;
-	QueuePosition max;
-	ProcNumber	prevListener;
+	ChannelHashKey key;
+	ChannelEntry *entry;
+	bool		found;
+	ListenerEntry *listeners;
 
 	/*
-	 * Nothing to do if we are already listening to something, nor if we
-	 * already ran this routine in this transaction.
+	 * If this is our first LISTEN in this transaction, register as a
+	 * listener.
 	 */
-	if (amRegisteredListener)
-		return;
-
-	if (Trace_notify)
-		elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
-
-	/*
-	 * Before registering, make sure we will unlisten before dying. (Note:
-	 * this action does not get undone if we abort later.)
-	 */
-	if (!unlistenExitRegistered)
+	if (!amRegisteredListener)
 	{
-		before_shmem_exit(Async_UnlistenOnExit, 0);
-		unlistenExitRegistered = true;
-	}
+		QueuePosition head;
+		QueuePosition max;
+		ProcNumber	prevListener;
 
-	/*
-	 * This is our first LISTEN, so establish our pointer.
-	 *
-	 * We set our pointer to the global tail pointer and then move it forward
-	 * over already-committed notifications.  This ensures we cannot miss any
-	 * not-yet-committed notifications.  We might get a few more but that
-	 * doesn't hurt.
-	 *
-	 * In some scenarios there might be a lot of committed notifications that
-	 * have not yet been pruned away (because some backend is being lazy about
-	 * reading them).  To reduce our startup time, we can look at other
-	 * backends and adopt the maximum "pos" pointer of any backend that's in
-	 * our database; any notifications it's already advanced over are surely
-	 * committed and need not be re-examined by us.  (We must consider only
-	 * backends connected to our DB, because others will not have bothered to
-	 * check committed-ness of notifications in our DB.)
-	 *
-	 * We need exclusive lock here so we can look at other backends' entries
-	 * and manipulate the list links.
-	 */
-	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	head = QUEUE_HEAD;
-	max = QUEUE_TAIL;
-	prevListener = INVALID_PROC_NUMBER;
-	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
-	{
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
-			max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
-		/* Also find last listening backend before this one */
-		if (i < MyProcNumber)
-			prevListener = i;
-	}
-	QUEUE_BACKEND_POS(MyProcNumber) = max;
-	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
-	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
-	/* Insert backend into list of listeners at correct position */
-	if (prevListener != INVALID_PROC_NUMBER)
-	{
-		QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
-		QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
-	}
-	else
-	{
-		QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
-		QUEUE_FIRST_LISTENER = MyProcNumber;
-	}
-	LWLockRelease(NotifyQueueLock);
+		if (Trace_notify)
+			elog(DEBUG1, "Exec_ListenPreCommit(%s,%d)", channel, MyProcPid);
 
-	/* Now we are listed in the global array, so remember we're listening */
-	amRegisteredListener = true;
+		/*
+		 * Before registering, make sure we will unlisten before dying. (Note:
+		 * this action does not get undone if we abort later.)
+		 */
+		if (!unlistenExitRegistered)
+		{
+			before_shmem_exit(Async_UnlistenOnExit, 0);
+			unlistenExitRegistered = true;
+		}
 
-	/*
-	 * Try to move our pointer forward as far as possible.  This will skip
-	 * over already-committed notifications, which we want to do because they
-	 * might be quite stale.  Note that we are not yet listening on anything,
-	 * so we won't deliver such notifications to our frontend.  Also, although
-	 * our transaction might have executed NOTIFY, those message(s) aren't
-	 * queued yet so we won't skip them here.
-	 */
-	if (!QUEUE_POS_EQUAL(max, head))
-		asyncQueueReadAllNotifications();
-}
+		/*
+		 * This is our first LISTEN, so establish our pointer.
+		 *
+		 * We set our pointer to the global tail pointer and then move it
+		 * forward over already-committed notifications.  This ensures we
+		 * cannot miss any not-yet-committed notifications.  We might get a
+		 * few more but that doesn't hurt.
+		 *
+		 * In some scenarios there might be a lot of committed notifications
+		 * that have not yet been pruned away (because some backend is being
+		 * lazy about reading them).  To reduce our startup time, we can look
+		 * at other backends and adopt the maximum "pos" pointer of any
+		 * backend that's in our database; any notifications it's already
+		 * advanced over are surely committed and need not be re-examined by
+		 * us.  (We must consider only backends connected to our DB, because
+		 * others will not have bothered to check committed-ness of
+		 * notifications in our DB.)
+		 *
+		 * We need exclusive lock here so we can look at other backends'
+		 * entries and manipulate the list links.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		head = QUEUE_HEAD;
+		max = QUEUE_TAIL;
+		prevListener = INVALID_PROC_NUMBER;
+		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+		{
+			if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+				max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+			/* Also find last listening backend before this one */
+			if (i < MyProcNumber)
+				prevListener = i;
+		}
+		QUEUE_BACKEND_POS(MyProcNumber) = max;
+		QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
+		QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+		/* Insert backend into list of listeners at correct position */
+		if (prevListener != INVALID_PROC_NUMBER)
+		{
+			QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
+			QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
+		}
+		else
+		{
+			QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
+			QUEUE_FIRST_LISTENER = MyProcNumber;
+		}
+		LWLockRelease(NotifyQueueLock);
 
-/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
- *
- * Add the channel to the list of channels we are listening on.
- */
-static void
-Exec_ListenCommit(const char *channel)
-{
-	MemoryContext oldcontext;
+		/* Now we are listed in the global array, so remember we're listening */
+		amRegisteredListener = true;
+
+		/*
+		 * Try to move our pointer forward as far as possible.  This will skip
+		 * over already-committed notifications, which we want to do because
+		 * they might be quite stale.  Note that we are not yet listening on
+		 * anything, so we won't deliver such notifications to our frontend.
+		 * Also, although our transaction might have executed NOTIFY, those
+		 * message(s) aren't queued yet so we won't skip them here.
+		 */
+		if (!QUEUE_POS_EQUAL(max, head))
+			asyncQueueReadAllNotifications();
+	}
 
 	/* Do nothing if we are already listening on this channel */
 	if (IsListeningOn(channel))
 		return;
 
 	/*
-	 * Add the new channel name to listenChannels.
-	 *
-	 * XXX It is theoretically possible to get an out-of-memory failure here,
-	 * which would be bad because we already committed.  For the moment it
-	 * doesn't seem worth trying to guard against that, but maybe improve this
-	 * later.
-	 */
-	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
-	listenChannels = lappend(listenChannels, pstrdup(channel));
-	MemoryContextSwitchTo(oldcontext);
+	 * Add the channel to listenChannelsHash.  This can OOM, but we're still
+	 * in PreCommit so the transaction can abort safely.
+	 */
+	initListenChannelsHash();
+	(void) hash_search(listenChannelsHash, channel, HASH_ENTER, NULL);
+
+	/*
+	 * Now update the shared channelHash.  We insert an entry with
+	 * active=false, which will be flipped to true in Exec_ListenCommit.
+	 */
+	initChannelHash();
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/*
+	 * Find or create the channel entry.  For new entries, we initialize
+	 * listenersArray to InvalidDsaPointer as a marker.
+	 */
+	entry = dshash_find_or_insert(channelHash, &key, &found);
+
+	if (!found)
+		entry->listenersArray = InvalidDsaPointer;
+
+	if (!DsaPointerIsValid(entry->listenersArray))
+	{
+		/* First listener for this channel */
+		entry->listenersArray = dsa_allocate(channelDSA,
+											 sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
+		entry->numListeners = 0;
+		entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
+	}
+
+	listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+												  entry->listenersArray);
+
+	if (entry->numListeners >= entry->allocatedListeners)
+	{
+		int			new_size = entry->allocatedListeners * 2;
+		dsa_pointer new_array = dsa_allocate(channelDSA,
+											 sizeof(ListenerEntry) * new_size);
+		ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+																		 new_array);
+
+		memcpy(new_listeners, listeners,
+			   sizeof(ListenerEntry) * entry->numListeners);
+
+		dsa_free(channelDSA, entry->listenersArray);
+		entry->listenersArray = new_array;
+		entry->allocatedListeners = new_size;
+		listeners = new_listeners;
+	}
+
+	listeners[entry->numListeners].procno = MyProcNumber;
+	listeners[entry->numListeners].active = false;
+	entry->numListeners++;
+
+	dshash_release_lock(channelHash, entry);
+}
+
+/*
+ * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ *
+ * Activate the channel entry that was pre-allocated in Exec_ListenPreCommit.
+ * This is called after commit to clog, so it's important to have very low
+ * probability of failure.  By design, all we do here is set the active
+ * flag.
+ */
+static void
+Exec_ListenCommit(const char *channel)
+{
+	ChannelHashKey key;
+	ChannelEntry *entry;
+	ListenerEntry *listeners;
+	int			i;
+
+	if (Trace_notify)
+		elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid);
+
+	/*
+	 * The entry has been created in Exec_ListenPreCommit. If we get
+	 * here, channelHash and the entry must exist.
+	 */
+	Assert(channelHash != NULL);
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+	entry = dshash_find(channelHash, &key, true);
+	Assert(entry != NULL);
+
+	listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+												  entry->listenersArray);
+
+	for (i = 0; i < entry->numListeners; i++)
+	{
+		if (listeners[i].procno == MyProcNumber)
+		{
+			listeners[i].active = true;
+			dshash_release_lock(channelHash, entry);
+			return;
+		}
+	}
+
+	/* If the entry is not found, it's a bug. */
+	Assert(false);
+	dshash_release_lock(channelHash, entry);
 }
 
 /*
  * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
  *
- * Remove the specified channel name from listenChannels.
+ *		Unlisten the specified channel for this backend.
  */
 static void
 Exec_UnlistenCommit(const char *channel)
 {
-	ListCell   *q;
+	ChannelHashKey key;
+	ChannelEntry *entry;
+	ListenerEntry *listeners;
+	int			i;
 
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
 
-	foreach(q, listenChannels)
+	/* Remove from our local cache */
+	if (listenChannelsHash != NULL)
+		(void) hash_search(listenChannelsHash, channel, HASH_REMOVE, NULL);
+
+	/* Now remove from the shared channelHash */
+	if (channelHash == NULL)
+		return;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/* Look up the channel with exclusive lock so we can modify it */
+	entry = dshash_find(channelHash, &key, true);
+	if (entry == NULL)
+		return;
+
+	listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+												  entry->listenersArray);
+
+	for (i = 0; i < entry->numListeners; i++)
 	{
-		char	   *lchan = (char *) lfirst(q);
-
-		if (strcmp(lchan, channel) == 0)
+		if (listeners[i].procno == MyProcNumber)
 		{
-			listenChannels = foreach_delete_current(listenChannels, q);
-			pfree(lchan);
-			break;
+			entry->numListeners--;
+			if (i < entry->numListeners)
+				memmove(&listeners[i], &listeners[i + 1],
+						sizeof(ListenerEntry) * (entry->numListeners - i));
+
+			if (entry->numListeners == 0)
+			{
+				/* Last listener for this channel */
+				dsa_free(channelDSA, entry->listenersArray);
+				dshash_delete_entry(channelHash, entry);
+			}
+			else
+			{
+				dshash_release_lock(channelHash, entry);
+			}
+
+			return;
 		}
 	}
 
+	dshash_release_lock(channelHash, entry);
+
 	/*
 	 * We do not complain about unlistening something not being listened;
 	 * should we?
@@ -1192,34 +1606,68 @@ Exec_UnlistenCommit(const char *channel)
 static void
 Exec_UnlistenAllCommit(void)
 {
+	dshash_seq_status status;
+	ChannelEntry *entry;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
-	list_free_deep(listenChannels);
-	listenChannels = NIL;
+	/* Clear our local cache */
+	if (listenChannelsHash != NULL)
+	{
+		hash_destroy(listenChannelsHash);
+		listenChannelsHash = NULL;
+	}
+
+	/* Now remove from the shared channelHash */
+	if (channelHash == NULL)
+		return;
+
+	dshash_seq_init(&status, channelHash, true);
+	while ((entry = dshash_seq_next(&status)) != NULL)
+	{
+		if (entry->key.dboid == MyDatabaseId)
+		{
+			ListenerEntry *listeners;
+			int			i;
+
+			listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+														  entry->listenersArray);
+
+			for (i = 0; i < entry->numListeners; i++)
+			{
+				if (listeners[i].procno == MyProcNumber)
+				{
+					entry->numListeners--;
+					if (i < entry->numListeners)
+						memmove(&listeners[i], &listeners[i + 1],
+								sizeof(ListenerEntry) * (entry->numListeners - i));
+
+					if (entry->numListeners == 0)
+					{
+						dsa_free(channelDSA, entry->listenersArray);
+						dshash_delete_current(&status);
+					}
+					break;
+				}
+			}
+		}
+	}
+	dshash_seq_term(&status);
 }
 
 /*
  * Test whether we are actively listening on the given channel name.
  *
  * Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it.  In practice the list is likely to be
- * fairly short, though.
  */
 static bool
 IsListeningOn(const char *channel)
 {
-	ListCell   *p;
+	if (listenChannelsHash == NULL)
+		return false;
 
-	foreach(p, listenChannels)
-	{
-		char	   *lchan = (char *) lfirst(p);
-
-		if (strcmp(lchan, channel) == 0)
-			return true;
-	}
-	return false;
+	return (hash_search(listenChannelsHash, channel, HASH_FIND, NULL) != NULL);
 }
 
 /*
@@ -1229,7 +1677,7 @@ IsListeningOn(const char *channel)
 static void
 asyncQueueUnregister(void)
 {
-	Assert(listenChannels == NIL);	/* else caller error */
+	Assert(listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0);	/* else caller error */
 
 	if (!amRegisteredListener)	/* nothing to do */
 		return;
@@ -1241,6 +1689,7 @@ asyncQueueUnregister(void)
 	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
+	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	/* and remove it from the list */
 	if (QUEUE_FIRST_LISTENER == MyProcNumber)
 		QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -1565,12 +2014,21 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send.  However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind.  Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * Normally we signal only backends in our own database, that are
+ * listening on the channels with pending notifies, since only those
+ * backends are interested in notifies we send.
+ *
+ * Backends that are not interested in our notifies, that are known
+ * to still be positioned at the old queue head, or anywhere in the
+ * queue region we just wrote, can be safely advanced directly to the
+ * new head, since that region is known to contain only our own
+ * notifications.  This avoids unnecessary wakeups when there is
+ * nothing of interest to them.
+ *
+ * Backends that are not interested in our notifies, that are advancing
+ * to a target position before the new queue head, or that are not
+ * advancing and are stationary at a position before the old queue head
+ * needs to be signaled since notifications could otherwise be delayed.
  *
  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
  *
@@ -1580,60 +2038,118 @@ asyncQueueFillWarning(void)
 static void
 SignalBackends(void)
 {
-	int32	   *pids;
-	ProcNumber *procnos;
 	int			count;
+	ListCell   *lc;
 
+	Assert(channelHash != NULL || pendingNotifyChannels == NIL);
+	Assert(notifySignalPids != NULL);
+	Assert(notifySignalProcs != NULL);
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
 	 * signals while holding the NotifyQueueLock, so this loop just builds a
 	 * list of target PIDs.
-	 *
-	 * XXX in principle these pallocs could fail, which would be bad. Maybe
-	 * preallocate the arrays?  They're not that large, though.
 	 */
-	pids = (int32 *) palloc(MaxBackends * sizeof(int32));
-	procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
 	count = 0;
 
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+	foreach(lc, pendingNotifyChannels)
 	{
-		int32		pid = QUEUE_BACKEND_PID(i);
-		QueuePosition pos;
+		char	   *channel = (char *) lfirst(lc);
+		ChannelEntry *entry = NULL;
+		ListenerEntry *listeners;
 
-		Assert(pid != InvalidPid);
-		pos = QUEUE_BACKEND_POS(i);
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+		if (channelHash != NULL)
 		{
+			ChannelHashKey key;
+
+			ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+			entry = dshash_find(channelHash, &key, false);
+		}
+
+		if (entry == NULL)
+			continue;			/* No listeners registered for this channel */
+
+		listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+													  entry->listenersArray);
+
+		for (int j = 0; j < entry->numListeners; j++)
+		{
+			ProcNumber	i;
+			int32		pid;
+			QueuePosition pos;
+
 			/*
-			 * Always signal listeners in our own database, unless they're
-			 * already caught up (unlikely, but possible).
+			 * Only signal backends that have active=true.  Backends with
+			 * active=false have done LISTEN in PreCommit but not yet
+			 * committed, so they're not really listening yet.
 			 */
+			if (!listeners[j].active)
+				continue;
+
+			i = listeners[j].procno;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+			pid = QUEUE_BACKEND_PID(i);
+
+			/* Skip if caught up */
 			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
 				continue;
+
+			Assert(pid != InvalidPid);
+
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+			notifySignalPids[count] = pid;
+			notifySignalProcs[count] = i;
+			count++;
 		}
-		else
+
+		dshash_release_lock(channelHash, entry);
+	}
+
+	if (pendingNotifies != NULL)
+	{
+		for (ProcNumber i = QUEUE_FIRST_LISTENER;
+			 i != INVALID_PROC_NUMBER;
+			 i = QUEUE_NEXT_LISTENER(i))
 		{
-			/*
-			 * Listeners in other databases should be signaled only if they
-			 * are far behind.
-			 */
-			if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
+			QueuePosition pos;
+			int32		pid;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
 				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+			pid = QUEUE_BACKEND_PID(i);
+
+			if (QUEUE_BACKEND_IS_ADVANCING(i) ?
+				QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite) :
+				QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
+			{
+				Assert(pid != InvalidPid);
+
+				QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+				notifySignalPids[count] = pid;
+				notifySignalProcs[count] = i;
+				count++;
+			}
+			else if (!QUEUE_BACKEND_IS_ADVANCING(i) &&
+					 QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
+			{
+				Assert(!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite));
+
+				QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+			}
 		}
-		/* OK, need to signal this one */
-		pids[count] = pid;
-		procnos[count] = i;
-		count++;
 	}
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now send signals */
 	for (int i = 0; i < count; i++)
 	{
-		int32		pid = pids[i];
+		int32		pid = notifySignalPids[i];
 
 		/*
 		 * If we are signaling our own process, no need to involve the kernel;
@@ -1651,12 +2167,10 @@ SignalBackends(void)
 		 * NotifyQueueLock; which is unlikely but certainly possible. So we
 		 * just log a low-level debug message if it happens.
 		 */
-		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, notifySignalProcs[i]) < 0)
 			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
 	}
 
-	pfree(pids);
-	pfree(procnos);
 }
 
 /*
@@ -1673,12 +2187,97 @@ AtAbort_Notify(void)
 	/*
 	 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
 	 * we have registered as a listener but have not made any entry in
-	 * listenChannels.  In that case, deregister again.
+	 * listenChannelsHash.  In that case, deregister again.
 	 */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener &&
+		(listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0))
 		asyncQueueUnregister();
 
-	/* And clean up */
+	/*
+	 * Remove any channels we added during Exec_ListenPreCommit.  We need to
+	 * clean up both the local listenChannelsHash and any inactive entries in
+	 * the shared channelHash to avoid accumulating stale data.
+	 */
+	if (pendingActions != NULL)
+	{
+		ListCell   *p;
+
+		foreach(p, pendingActions->actions)
+		{
+			ListenAction *actrec = (ListenAction *) lfirst(p);
+			ChannelHashKey key;
+			ChannelEntry *entry;
+			ListenerEntry *listeners;
+			bool		removeFromLocal;
+			bool		found;
+			int			i;
+
+			if (actrec->action != LISTEN_LISTEN)
+				continue;
+
+			/*
+			 * For each LISTEN action, determine if we should clean up the
+			 * local and/or shared hash entries.  If we have an active=true
+			 * entry in the shared hash, we were already listening from a
+			 * previous transaction, so leave everything alone.  Otherwise,
+			 * clean up what this transaction added.
+			 */
+			removeFromLocal = true;
+			found = false;
+
+			if (channelHash != NULL)
+			{
+				ChannelHashPrepareKey(&key, MyDatabaseId, actrec->channel);
+				entry = dshash_find(channelHash, &key, true);
+
+				if (entry != NULL)
+				{
+					listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+																  entry->listenersArray);
+
+					for (i = 0; i < entry->numListeners; i++)
+					{
+						if (listeners[i].procno == MyProcNumber)
+						{
+							found = true;
+
+							if (listeners[i].active)
+							{
+								/* Already committed - leave both hashes alone */
+								removeFromLocal = false;
+							}
+							else
+							{
+								/* Inactive - remove from shared hash */
+								entry->numListeners--;
+								if (i < entry->numListeners)
+									memmove(&listeners[i], &listeners[i + 1],
+											sizeof(ListenerEntry) * (entry->numListeners - i));
+
+								if (entry->numListeners == 0)
+								{
+									dsa_free(channelDSA, entry->listenersArray);
+									dshash_delete_entry(channelHash, entry);
+								}
+								else
+									dshash_release_lock(channelHash, entry);
+							}
+							break;
+						}
+					}
+
+					if (!found)
+						dshash_release_lock(channelHash, entry);
+				}
+			}
+
+			/* Remove from local hash if appropriate */
+			if (removeFromLocal && listenChannelsHash != NULL)
+				(void) hash_search(listenChannelsHash, actrec->channel,
+								   HASH_REMOVE, NULL);
+		}
+	}
+
 	ClearPendingActionsAndNotifies();
 }
 
@@ -1854,20 +2453,29 @@ asyncQueueReadAllNotifications(void)
 	QueuePosition head;
 	Snapshot	snapshot;
 
-	/* Fetch current state */
+	/*
+	 * Fetch current state, indicate to others that we have woken up, and that
+	 * we now will be advancing our position.
+	 */
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+	head = QUEUE_HEAD;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
-	head = QUEUE_HEAD;
-	LWLockRelease(NotifyQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
 	{
 		/* Nothing to do, we have read all notifications already. */
+		LWLockRelease(NotifyQueueLock);
 		return;
 	}
 
+	QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
+	QUEUE_BACKEND_ADVANCING_POS(MyProcNumber) = head;
+
+	LWLockRelease(NotifyQueueLock);
+
 	/*----------
 	 * Get snapshot we'll use to decide which xacts are still in progress.
 	 * This is trickier than it might seem, because of race conditions.
@@ -1954,6 +2562,8 @@ asyncQueueReadAllNotifications(void)
 
 		/* Update shared state */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+
+		QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
 		QUEUE_BACKEND_POS(MyProcNumber) = pos;
 		LWLockRelease(NotifyQueueLock);
 
@@ -2055,7 +2665,7 @@ asyncQueueProcessPageEntries(QueuePosition *current,
 			 * over it on the first LISTEN in a session, and not get stuck on
 			 * it indefinitely.
 			 */
-			if (listenChannels == NIL)
+			if (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0)
 				continue;
 
 			if (TransactionIdDidCommit(qe->xid))
@@ -2310,7 +2920,7 @@ ProcessIncomingNotify(bool flush)
 	notifyInterruptPending = false;
 
 	/* Do nothing else if we aren't actively listening */
-	if (listenChannels == NIL)
+	if (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0)
 		return;
 
 	if (Trace_notify)
@@ -2414,13 +3024,15 @@ AddEventToPendingNotifies(Notification *n)
 {
 	Assert(pendingNotifies->events != NIL);
 
-	/* Create the hash table if it's time to */
+	/* Create the hash tables if it's time to */
 	if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
 		pendingNotifies->hashtab == NULL)
 	{
 		HASHCTL		hash_ctl;
 		ListCell   *l;
 
+		Assert(pendingNotifies->channelHashtab == NULL);
+
 		/* Create the hash table */
 		hash_ctl.keysize = sizeof(Notification *);
 		hash_ctl.entrysize = sizeof(struct NotificationHash);
@@ -2433,10 +3045,22 @@ AddEventToPendingNotifies(Notification *n)
 						&hash_ctl,
 						HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
 
+		/* Create the channel hash table */
+		memset(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = NAMEDATALEN;
+		hash_ctl.entrysize = sizeof(struct ChannelHash);
+		hash_ctl.hcxt = CurTransactionContext;
+		pendingNotifies->channelHashtab =
+			hash_create("Pending Notify Channels",
+						64L,
+						&hash_ctl,
+						HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+
 		/* Insert all the already-existing events */
 		foreach(l, pendingNotifies->events)
 		{
 			Notification *oldn = (Notification *) lfirst(l);
+			char	   *channel = oldn->data;
 			bool		found;
 
 			(void) hash_search(pendingNotifies->hashtab,
@@ -2444,22 +3068,42 @@ AddEventToPendingNotifies(Notification *n)
 							   HASH_ENTER,
 							   &found);
 			Assert(!found);
+
+			/* Insert channel into channelHashtab */
+			(void) hash_search(pendingNotifies->channelHashtab,
+							   channel,
+							   HASH_ENTER,
+							   &found);
+			/* found may be true if multiple events on same channel */
 		}
 	}
 
 	/* Add new event to the list, in order */
 	pendingNotifies->events = lappend(pendingNotifies->events, n);
 
-	/* Add event to the hash table if needed */
+	/* Add event to the hash tables if needed */
 	if (pendingNotifies->hashtab != NULL)
 	{
 		bool		found;
 
+		Assert(pendingNotifies->channelHashtab != NULL);
+
 		(void) hash_search(pendingNotifies->hashtab,
 						   &n,
 						   HASH_ENTER,
 						   &found);
 		Assert(!found);
+
+		/* Add channel to channelHashtab */
+		{
+			char	   *channel = n->data;
+
+			(void) hash_search(pendingNotifies->channelHashtab,
+							   channel,
+							   HASH_ENTER,
+							   &found);
+			/* found may be true if we already have an event on this channel */
+		}
 	}
 }
 
@@ -2497,7 +3141,7 @@ notification_match(const void *key1, const void *key2, Size keysize)
 	return 1;					/* not equal */
 }
 
-/* Clear the pendingActions and pendingNotifies lists. */
+/* Clear the pendingActions, pendingNotifies, and pendingNotifyChannels lists. */
 static void
 ClearPendingActionsAndNotifies(void)
 {
@@ -2509,6 +3153,7 @@ ClearPendingActionsAndNotifies(void)
 	 */
 	pendingActions = NULL;
 	pendingNotifies = NULL;
+	pendingNotifyChannels = NIL;
 }
 
 /*
@@ -2519,3 +3164,16 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+
+/*
+ * ChannelHashPrepareKey
+ *		Prepare a channel key for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel)
+{
+	memset(key, 0, sizeof(ChannelHashKey));
+	key->dboid = dboid;
+	strlcpy(key->channel, channel, NAMEDATALEN);
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index c1ac71ff7f2..7c2cf960093 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -369,6 +369,7 @@ SubtransBuffer	"Waiting for I/O on a sub-transaction SLRU buffer."
 MultiXactOffsetBuffer	"Waiting for I/O on a multixact offset SLRU buffer."
 MultiXactMemberBuffer	"Waiting for I/O on a multixact member SLRU buffer."
 NotifyBuffer	"Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
+NotifyChannelHash	"Waiting to access the <command>NOTIFY</command> channel hash table."
 SerialBuffer	"Waiting for I/O on a serializable transaction conflict SLRU buffer."
 WALInsert	"Waiting to insert WAL data into a memory buffer."
 BufferContent	"Waiting to access a data page in memory."
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 5b0ce383408..4236965e72a 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -101,6 +101,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
 PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
 PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
 PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
 PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c751c25a04d..3d371c2808d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -412,6 +412,8 @@ CatalogIdMapEntry
 CatalogIndexState
 ChangeVarNodes_callback
 ChangeVarNodes_context
+ChannelEntry
+ChannelHashKey
 CheckPoint
 CheckPointStmt
 CheckpointStatsData
@@ -1564,6 +1566,7 @@ ListDictionary
 ListParsedLex
 ListenAction
 ListenActionKind
+ListenerEntry
 ListenStmt
 LoInfo
 LoadStmt
-- 
2.50.1

