Hi hackers,
I'd like to propose for PG 20 adding glob-style pattern matching support
for LISTEN channel names, allowing clients to subscribe to multiple
channels with a single LISTEN command using wildcards.
Currently, if an application wants to receive notifications from multiple
related channels (e.g., user_1, user_2, ..., user_N), it must issue a
separate LISTEN command for each channel. This becomes cumbersome for
applications that need to monitor a dynamic or large set of channels with
a common naming convention.
With pattern support, a client could simply do LISTEN "user_*" and receive
notifications from any channel matching that pattern such as user_login,
user_logout, user_123, and so on.
The attached patch adds support for glob-style wildcards in channel
names. The * matches zero or more characters, the ? matches exactly one
character, and \ escapes the next character to match literal * or ?.
For example, LISTEN "events_*" matches events_created, events_deleted, etc.
LISTEN "sensor_?" matches sensor_1 or sensor_A but not sensor_12.
LISTEN "app_*_status" matches app_web_status, app_worker_status, and similar.
LISTEN "literal\*star" matches the literal channel name "literal*star".
This is a proof-of-concept implementation using the simplest approach I could
think of to validate the behavior and gather feedback before investing in
optimizations.
Pattern channels are stored in a backend-local list rather than the
global shared hash table, since they cannot be looked up by exact
channel name. A new hasPatterns flag in QueueBackendStatus tracks
whether a backend is listening on any patterns. When signaling backends,
those with hasPatterns=true are always woken for notifications in their
database, so they can check if any notifications match their patterns
locally. The MatchPattern() function implements a backtracking algorithm
to handle the * wildcard matching.
Performance Considerations:
The current implementation has O(PxM) complexity for pattern matching,
where P is the number of patterns and M is the pattern matching cost.
For each notification, backends with patterns must check all their
patterns against the channel name.
I ran some benchmarks comparing exact channel matching vs pattern matching
(Python script created with Claude help attached). Results on my machine:
Test 1: Exact Channel vs Pattern Channel
----------------------------------------------------------------------
Benchmark Notif/sec
Exact channel (LISTEN "user_123") 13176 (baseline)
Pattern channel (LISTEN "user_*") 13153 (-0.2%)
Test 2: Pattern Count Scaling
----------------------------------------------------------------------
Benchmark Notif/sec
1 pattern(s) 13289 (baseline)
5 pattern(s) 13279 (-0.1%)
10 pattern(s) 13268 (-0.2%)
25 pattern(s) 13301 (+0.1%)
50 pattern(s) 13287 (-0.0%)
100 pattern(s) 13211 (-0.6%)
Test 3: Pattern Complexity
----------------------------------------------------------------------
Benchmark Notif/sec
Prefix pattern (user_*) 13446 (baseline)
Single char pattern (user_?) 13179 (-2.0%)
Infix pattern (prefix_*_suffix) 13469 (+0.2%)
Multi-wildcard (a_*_b_*_c) 13200 (-1.8%)
Test 4: Non-Matching Pattern Overhead
----------------------------------------------------------------------
Benchmark Notif/sec
Exact only (baseline) 13141 (baseline)
Exact + 10 non-matching patterns 13098 (-0.3%)
Exact + 50 non-matching patterns 13130 (-0.1%)
Exact + 100 non-matching patterns 12742 (-3.0%)
The benchmark spawns a listener connection and a notifier connection,
then measures how many notifications per second can be delivered. Each
measurement runs for 15 seconds and is repeated 3 times.
The results don't show a huge degradation but perhaps more comprehensive
benchmarks with higher concurrency and more realistic workloads would be
valuable to better understand the performance behavior.
Future Improvements:
If this idea seems reasonable, the pattern match can be optimized using
a more sophisticated data structure or improve the current patch idea.
One option is to follow the current architecture, keeping patterns
separate from exact-match channels, but finding optimizations to make
pattern matching faster. I'm wondering if we could have a sorted list
and try to do some kind of binary search with patterns, but I dind't
explore this idea further.
Another option is a Trie (prefix tree) [1]. You walk the trie with the
channel name and any node marked as a wildcard indicates a match. For
mid-pattern wildcards like "prefix_*_suffix", when you hit the * node
you search for the remaining suffix in the input. A Trie could
potentially unify exact-match channels and patterns in the same
structure, where exact channels are simply terminal nodes without the
wildcard flag. But memory overhead can be significant for sparse
patterns and also I'm not sure how this would fit using shared memory,
so perhaps a trie for each backend listening on patterns alongside with
the global shared memory for non-pattern channels will be better.
Feedback welcome!
[1] https://en.wikipedia.org/wiki/Trie
--
Matheus Alcantara
EDB: https://www.enterprisedb.com
From e75751ccc1cc5c201320b809d3d99f517dda5074 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Mon, 8 Jun 2026 14:54:34 -0300
Subject: [PATCH v0] Add pattern matching support for LISTEN/NOTIFY
This adds glob-style pattern matching for LISTEN channel names, allowing
clients to listen on multiple channels with a single LISTEN command
using wildcards. The supported wildcards are * (matches zero or more
characters), ? (matches exactly one character), and \ (escapes the next
character to match literal * or ?).
Pattern channels are stored in a backend-local list rather than the
global hash table, since they cannot be looked up by exact name. A new
hasPatterns flag in QueueBackendStatus tracks whether a backend is
listening on any patterns. When signaling backends, those with
hasPatterns set are always woken for notifications in their database so
they can check if any notifications match their patterns locally using
the MatchPattern function.
---
src/backend/commands/async.c | 249 ++++++++++++++++++-
src/test/isolation/expected/async-notify.out | 89 ++++++-
src/test/isolation/specs/async-notify.spec | 53 ++++
src/test/regress/expected/async.out | 21 ++
src/test/regress/sql/async.sql | 27 ++
5 files changed, 436 insertions(+), 3 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index eee8bc29f38..0823db944db 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -292,6 +292,7 @@ typedef struct QueueBackendStatus
QueuePosition pos; /* backend has read queue up to
here */
bool wakeupPending; /* signal sent to backend, not yet
processed */
bool isAdvancing; /* backend is advancing its position */
+ bool hasPatterns; /* backend is listening on pattern
channels */
} QueueBackendStatus;
/*
@@ -365,6 +366,7 @@ const ShmemCallbacks AsyncShmemCallbacks = {
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
(asyncQueueControl->backend[i].wakeupPending)
#define QUEUE_BACKEND_IS_ADVANCING(i)
(asyncQueueControl->backend[i].isAdvancing)
+#define QUEUE_BACKEND_HAS_PATTERNS(i)
(asyncQueueControl->backend[i].hasPatterns)
/*
* The SLRU buffer area through which we access the notification queue
@@ -423,7 +425,8 @@ static HTAB *localChannelTable = NULL;
/* We test this condition to detect that we're not listening at all */
#define LocalChannelTableIsEmpty() \
- (localChannelTable == NULL || hash_get_num_entries(localChannelTable)
== 0)
+ ((localChannelTable == NULL || hash_get_num_entries(localChannelTable)
== 0) && \
+ localPatternList == NIL)
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
@@ -474,6 +477,7 @@ typedef enum
typedef struct PendingListenEntry
{
char channel[NAMEDATALEN]; /* hash key */
+ bool is_pattern; /* channel contains wildcards?
*/
PendingListenAction action; /* which action should we perform? */
} PendingListenEntry;
@@ -557,6 +561,13 @@ static bool unlistenExitRegistered = false;
/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;
+/*
+ * List of pattern channel names. These are stored separately because they
+ * cannot be matched via hash lookup. The list contains palloc'd strings in
+ * TopMemoryContext.
+ */
+static List *localPatternList = NIL;
+
/*
* Queue head positions for direct advancement.
* These are captured during PreCommit_Notify while holding the heavyweight
@@ -602,6 +613,9 @@ static void RemoveListenerFromChannel(GlobalChannelEntry
**entry_ptr,
int
idx);
static void ApplyPendingListenActions(bool isCommit);
static void CleanupListenersOnExit(void);
+static bool IsPattern(const char *channel);
+static bool MatchPattern(const char *channel, const char *pattern);
+static bool IsListeningOnPattern(const char *channel);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
@@ -843,6 +857,7 @@ AsyncShmemInit(void *arg)
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
QUEUE_BACKEND_IS_ADVANCING(i) = false;
+ QUEUE_BACKEND_HAS_PATTERNS(i) = false;
}
/*
@@ -1525,6 +1540,9 @@ BecomeRegisteredListener(void)
* an entry in localChannelTable, and pre-allocating an entry in the shared
* globalChannelTable with removeOnAbort set. AtCommit_Notify will clear
* removeOnAbort; abort processing will remove entries still marked so.
+ *
+ * For pattern channels, we add them to the local pattern list instead of the
+ * global channel table, since they cannot be looked up by exact name.
*/
static void
PrepareTableEntriesForListen(const char *channel)
@@ -1534,6 +1552,7 @@ PrepareTableEntriesForListen(const char *channel)
bool found;
ListenerEntry *listeners;
PendingListenEntry *pending;
+ bool is_pattern = IsPattern(channel);
/*
* Record in local pending hash that we want to LISTEN, overwriting any
@@ -1542,6 +1561,7 @@ PrepareTableEntriesForListen(const char *channel)
pending = (PendingListenEntry *)
hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
pending->action = PENDING_LISTEN;
+ pending->is_pattern = is_pattern;
/*
* Ensure that there is an entry for the channel in localChannelTable.
@@ -1556,6 +1576,38 @@ PrepareTableEntriesForListen(const char *channel)
*/
(void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
+ /*
+ * For pattern channels, add to the local pattern list and set the
+ * hasPatterns flag so we get signaled for all notifications. We don't
add
+ * patterns to globalChannelTable since they can't be matched by exact
+ * channel name lookup.
+ */
+ if (is_pattern)
+ {
+ MemoryContext oldcxt;
+ char *pattern_copy;
+
+ /* Check if we already have this pattern */
+ foreach_ptr(char, existing, localPatternList)
+ {
+ if (strcmp(existing, channel) == 0)
+ return; /* already have it */
+ }
+
+ /* Add pattern to the list in TopMemoryContext so it persists */
+ oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+ pattern_copy = pstrdup(channel);
+ localPatternList = lappend(localPatternList, pattern_copy);
+ MemoryContextSwitchTo(oldcxt);
+
+ /* Set the flag so we get woken up for any notification */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = true;
+ LWLockRelease(NotifyQueueLock);
+
+ return;
+ }
+
/* Pre-allocate entry in shared globalChannelTable */
GlobalChannelKeyInit(&key, MyDatabaseId, channel);
entry = dshash_find_or_insert(globalChannelTable, &key, &found);
@@ -1650,6 +1702,7 @@ PrepareTableEntriesForUnlisten(const char *channel)
pending = (PendingListenEntry *)
hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
pending->action = PENDING_UNLISTEN;
+ pending->is_pattern = IsPattern(channel);
}
/*
@@ -1676,6 +1729,16 @@ PrepareTableEntriesForUnlistenAll(void)
pending = (PendingListenEntry *)
hash_search(pendingListenActions,
channelEntry->channel, HASH_ENTER, NULL);
pending->action = PENDING_UNLISTEN;
+ pending->is_pattern = IsPattern(channelEntry->channel);
+ }
+
+ /* Also unlisten from all pattern channels */
+ foreach_ptr(char, pattern, localPatternList)
+ {
+ pending = (PendingListenEntry *)
+ hash_search(pendingListenActions, pattern, HASH_ENTER,
NULL);
+ pending->action = PENDING_UNLISTEN;
+ pending->is_pattern = true;
}
}
@@ -1738,6 +1801,54 @@ ApplyPendingListenActions(bool isCommit)
bool removeLocal = true;
bool foundListener = false;
+ /*
+ * Handle pattern channels specially - they're not in
+ * globalChannelTable.
+ */
+ if (pending->is_pattern)
+ {
+ if (isCommit && pending->action == PENDING_LISTEN)
+ {
+ /* Pattern LISTEN committed - keep in
localPatternList */
+ removeLocal = false;
+ }
+
+ /*
+ * If removeLocal is true, it means we are either
aborting or
+ * committing an UNLISTEN. In both cases, remove from
the list.
+ */
+ if (removeLocal)
+ {
+ ListCell *lc;
+
+ foreach(lc, localPatternList)
+ {
+ char *pattern = (char *)
lfirst(lc);
+
+ if (strcmp(pattern, pending->channel)
== 0)
+ {
+ localPatternList =
foreach_delete_current(localPatternList, lc);
+ pfree(pattern);
+ break;
+ }
+ }
+
+ /* Update hasPatterns flag if we have no more
patterns */
+ if (localPatternList == NIL)
+ {
+ LWLockAcquire(NotifyQueueLock,
LW_EXCLUSIVE);
+
QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = false;
+ LWLockRelease(NotifyQueueLock);
+ }
+ }
+
+ /* Remove from localChannelTable if needed */
+ if (removeLocal && localChannelTable != NULL)
+ (void) hash_search(localChannelTable,
pending->channel,
+ HASH_REMOVE,
NULL);
+ continue;
+ }
+
/*
* Find the global entry for this channel. If isCommit, it had
better
* exist (it was created in PreCommit). In an abort, it might
not
@@ -1855,6 +1966,13 @@ CleanupListenersOnExit(void)
localChannelTable = NULL;
}
+ /* Clear the pattern list */
+ if (localPatternList != NIL)
+ {
+ list_free_deep(localPatternList);
+ localPatternList = NIL;
+ }
+
/* Now remove our entries from the shared globalChannelTable */
if (globalChannelTable == NULL)
return;
@@ -1891,6 +2009,107 @@ CleanupListenersOnExit(void)
dshash_seq_term(&status);
}
+/*
+ * Check if a channel name contains pattern wildcards (* or ?).
+ */
+static bool
+IsPattern(const char *channel)
+{
+ return (strchr(channel, '*') != NULL || strchr(channel, '?') != NULL);
+}
+
+/*
+ * Match a channel name against a glob-style pattern.
+ * Supports:
+ * * - matches zero or more characters
+ * ? - matches exactly one character
+ * \ - escapes the next character (to match literal * or ?)
+ *
+ * Returns true if the channel matches the pattern.
+ */
+static bool
+MatchPattern(const char *channel, const char *pattern)
+{
+ const char *c = channel;
+ const char *p = pattern;
+
+ /* Position in channel and pattern after last * match */
+ const char *c_star = NULL;
+ const char *p_star = NULL;
+
+ while (*c != '\0')
+ {
+ if (*p == '\\' && *(p + 1) != '\0')
+ {
+ /* Escaped character - must match literally */
+ p++;
+ if (*c == *p)
+ {
+ c++;
+ p++;
+ }
+ else if (p_star != NULL)
+ {
+ /* Backtrack to last * */
+ p = p_star + 1;
+ c = ++c_star;
+ }
+ else
+ return false;
+ }
+ else if (*p == '?')
+ {
+ /* ? matches any single character */
+ c++;
+ p++;
+ }
+ else if (*p == '*')
+ {
+ /* Remember position for backtracking */
+ p_star = p++;
+ c_star = c;
+ }
+ else if (*c == *p)
+ {
+ /* Literal match */
+ c++;
+ p++;
+ }
+ else if (p_star != NULL)
+ {
+ /* Mismatch, but we have a * to backtrack to */
+ p = p_star + 1;
+ c = ++c_star;
+ }
+ else
+ {
+ /* Mismatch with no * to backtrack to */
+ return false;
+ }
+ }
+
+ /* Skip trailing *'s in pattern */
+ while (*p == '*')
+ p++;
+
+ return (*p == '\0');
+}
+
+/*
+ * Check if the channel matches any pattern we are listening on.
+ */
+static bool
+IsListeningOnPattern(const char *channel)
+{
+ foreach_ptr(char, pattern, localPatternList)
+ {
+ if (MatchPattern(channel, pattern))
+ return true;
+ }
+
+ return false;
+}
+
/*
* Test whether we are actively listening on the given channel name.
*
@@ -1902,7 +2121,10 @@ IsListeningOn(const char *channel)
if (localChannelTable == NULL)
return false;
- return (hash_search(localChannelTable, channel, HASH_FIND, NULL) !=
NULL);
+ if (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL)
+ return true;
+
+ return IsListeningOnPattern(channel);
}
/*
@@ -1926,6 +2148,7 @@ asyncQueueUnregister(void)
QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
+ QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = false;
/* and remove it from the list */
if (QUEUE_FIRST_LISTENER == MyProcNumber)
QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -2337,6 +2560,10 @@ SignalBackends(void)
* flags above). Check to see if we can directly advance their queue
* pointers to save a wakeup. Otherwise, if they are far behind, wake
* them anyway so they will catch up.
+ *
+ * Also signal backends that are listening on pattern channels, since
they
+ * may match our notifications but weren't found in the per-channel
lookup
+ * above.
*/
for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i =
QUEUE_NEXT_LISTENER(i))
{
@@ -2353,6 +2580,24 @@ SignalBackends(void)
pid = QUEUE_BACKEND_PID(i);
pos = QUEUE_BACKEND_POS(i);
+ /*
+ * If this backend has pattern listeners and is in the same
database,
+ * we need to wake it up so it can check if any notifications
match
+ * its patterns.
+ */
+ if (QUEUE_BACKEND_HAS_PATTERNS(i) &&
+ QUEUE_BACKEND_DBOID(i) == MyDatabaseId &&
+ !QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+ {
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ signalPids[count] = pid;
+ signalProcnos[count] = i;
+ count++;
+ continue;
+ }
+
/*
* We can directly advance the other backend's queue pointer if
it's
* not currently advancing (else there are race conditions),
and its
diff --git a/src/test/isolation/expected/async-notify.out
b/src/test/isolation/expected/async-notify.out
index 55b7cbc6e02..95980ded634 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 6 sessions
+Parsed test spec with 8 sessions
starting permutation: listenc notify1 notify2 notify3 notifyf
step listenc: LISTEN c1; LISTEN c2;
@@ -238,3 +238,90 @@ nonzero
t
(1 row)
+
+starting permutation: plisten_star pnotify_user_login pnotify_user_logout
pnotify_user_empty pcheck
+step plisten_star: LISTEN "user_*";
+step pnotify_user_login: NOTIFY user_login, 'login payload';
+step pnotify_user_logout: NOTIFY user_logout, 'logout payload';
+step pnotify_user_empty: NOTIFY user_, 'empty suffix';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "user_login" with payload "login payload" from
pattern_notifier
+pattern_listener: NOTIFY "user_logout" with payload "logout payload" from
pattern_notifier
+pattern_listener: NOTIFY "user_" with payload "empty suffix" from
pattern_notifier
+
+starting permutation: plisten_question pnotify_msg_1 pnotify_msg_a
pnotify_msg_12 pcheck
+step plisten_question: LISTEN "msg_?";
+step pnotify_msg_1: NOTIFY msg_1, 'message 1';
+step pnotify_msg_a: NOTIFY msg_a, 'message a';
+step pnotify_msg_12: NOTIFY msg_12, 'should not match';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "msg_1" with payload "message 1" from pattern_notifier
+pattern_listener: NOTIFY "msg_a" with payload "message a" from pattern_notifier
+
+starting permutation: plisten_complex pnotify_prefix_mid_suffix
pnotify_prefix_suffix pnotify_prefix_a_b_suffix pcheck
+step plisten_complex: LISTEN "prefix_*_suffix";
+step pnotify_prefix_mid_suffix: NOTIFY prefix_mid_suffix, 'complex match';
+step pnotify_prefix_suffix: NOTIFY prefix_suffix, 'no middle';
+step pnotify_prefix_a_b_suffix: NOTIFY prefix_a_b_suffix, 'multi char middle';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "prefix_mid_suffix" with payload "complex match" from
pattern_notifier
+pattern_listener: NOTIFY "prefix_a_b_suffix" with payload "multi char middle"
from pattern_notifier
+
+starting permutation: plisten_multi pnotify_event_created pnotify_alert_x
pnotify_nomatch pcheck
+step plisten_multi: LISTEN "event_*"; LISTEN "alert_?";
+step pnotify_event_created: NOTIFY event_created, 'event';
+step pnotify_alert_x: NOTIFY alert_x, 'alert';
+step pnotify_nomatch: NOTIFY completely_different, 'no match';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "event_created" with payload "event" from
pattern_notifier
+pattern_listener: NOTIFY "alert_x" with payload "alert" from pattern_notifier
+
+starting permutation: plisten_star pnotify_user_login pcheck
+step plisten_star: LISTEN "user_*";
+step pnotify_user_login: NOTIFY user_login, 'login payload';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "user_login" with payload "login payload" from
pattern_notifier
+
+starting permutation: llisten plisten_star notify1 pnotify_user_login lcheck
pcheck
+step llisten: LISTEN c1; LISTEN c2;
+step plisten_star: LISTEN "user_*";
+step notify1: NOTIFY c1;
+step pnotify_user_login: NOTIFY user_login, 'login payload';
+step lcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "user_login" with payload "login payload" from
pattern_notifier
diff --git a/src/test/isolation/specs/async-notify.spec
b/src/test/isolation/specs/async-notify.spec
index 7aef2e8d180..ab4cc2449dc 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -94,6 +94,30 @@ step lsnotify { NOTIFY c1, 'subxact_test'; }
step lsnotify_check { NOTIFY c1, 'should_not_receive'; }
teardown { UNLISTEN *; }
+# Session for pattern listening tests
+session pattern_listener
+step plisten_star { LISTEN "user_*"; }
+step plisten_question { LISTEN "msg_?"; }
+step plisten_complex { LISTEN "prefix_*_suffix"; }
+step plisten_multi { LISTEN "event_*"; LISTEN "alert_?"; }
+step pcheck { SELECT 1 AS x; }
+teardown { UNLISTEN *; }
+
+# Session for sending notifications to pattern listeners
+session pattern_notifier
+step pnotify_user_login { NOTIFY user_login, 'login payload'; }
+step pnotify_user_logout { NOTIFY user_logout, 'logout payload'; }
+step pnotify_user_empty { NOTIFY user_, 'empty suffix'; }
+step pnotify_msg_1 { NOTIFY msg_1, 'message 1'; }
+step pnotify_msg_a { NOTIFY msg_a, 'message a'; }
+step pnotify_msg_12 { NOTIFY msg_12, 'should not match'; }
+step pnotify_prefix_mid_suffix { NOTIFY prefix_mid_suffix, 'complex match'; }
+step pnotify_prefix_suffix { NOTIFY prefix_suffix, 'no middle'; }
+step pnotify_prefix_a_b_suffix { NOTIFY prefix_a_b_suffix, 'multi char
middle'; }
+step pnotify_event_created { NOTIFY event_created, 'event'; }
+step pnotify_alert_x { NOTIFY alert_x, 'alert'; }
+step pnotify_nomatch { NOTIFY completely_different, 'no match'; }
+
# Trivial cases.
permutation listenc notify1 notify2 notify3 notifyf
@@ -145,3 +169,32 @@ permutation lch_listen nch_notify lch_check
# Hence, this should be the last test in this script.
permutation llisten lbegin usage bignotify usage
+
+#
+# Pattern LISTEN/NOTIFY tests
+#
+
+# Basic pattern with * (zero or more characters)
+# Should receive: user_login, user_logout, user_ (empty suffix matches *)
+permutation plisten_star pnotify_user_login pnotify_user_logout
pnotify_user_empty pcheck
+
+# Pattern with ? (exactly one character)
+# Should receive: msg_1, msg_a
+# Should NOT receive: msg_12 (two chars after _)
+permutation plisten_question pnotify_msg_1 pnotify_msg_a pnotify_msg_12 pcheck
+
+# Complex pattern with * in the middle
+# Should receive: prefix_mid_suffix, prefix_a_b_suffix
+# Should NOT receive: prefix_suffix (requires at least one char for *)
+permutation plisten_complex pnotify_prefix_mid_suffix pnotify_prefix_suffix
pnotify_prefix_a_b_suffix pcheck
+
+# Multiple patterns on same session
+# Should receive: event_created (matches event_*), alert_x (matches alert_?)
+# Should NOT receive: completely_different
+permutation plisten_multi pnotify_event_created pnotify_alert_x
pnotify_nomatch pcheck
+
+# Cross-session pattern test: listener listens before notifier sends
+permutation plisten_star pnotify_user_login pcheck
+
+# Pattern listener with regular listener (mixed)
+permutation llisten plisten_star notify1 pnotify_user_login lcheck pcheck
diff --git a/src/test/regress/expected/async.out
b/src/test/regress/expected/async.out
index 19cbe38e636..c5236c15fb9 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -40,3 +40,24 @@ SELECT pg_notification_queue_usage();
0
(1 row)
+--
+-- Pattern LISTEN tests
+--
+-- Pattern with * (matches zero or more characters)
+LISTEN "user_*";
+UNLISTEN "user_*";
+-- Pattern with ? (matches exactly one character)
+LISTEN "msg_?";
+UNLISTEN "msg_?";
+-- Pattern with wildcards in the middle
+LISTEN "prefix_*_suffix";
+UNLISTEN "prefix_*_suffix";
+-- Escaped wildcards (for literal * and ? matching)
+LISTEN "literal\*star";
+LISTEN "literal\?question";
+UNLISTEN "literal\*star";
+UNLISTEN "literal\?question";
+-- Multiple patterns
+LISTEN "event_*";
+LISTEN "alert_?";
+UNLISTEN *;
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e015387..ca666c3019e 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -21,3 +21,30 @@ UNLISTEN *;
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();
+
+--
+-- Pattern LISTEN tests
+--
+
+-- Pattern with * (matches zero or more characters)
+LISTEN "user_*";
+UNLISTEN "user_*";
+
+-- Pattern with ? (matches exactly one character)
+LISTEN "msg_?";
+UNLISTEN "msg_?";
+
+-- Pattern with wildcards in the middle
+LISTEN "prefix_*_suffix";
+UNLISTEN "prefix_*_suffix";
+
+-- Escaped wildcards (for literal * and ? matching)
+LISTEN "literal\*star";
+LISTEN "literal\?question";
+UNLISTEN "literal\*star";
+UNLISTEN "literal\?question";
+
+-- Multiple patterns
+LISTEN "event_*";
+LISTEN "alert_?";
+UNLISTEN *;
--
2.50.1 (Apple Git-155)
#!/usr/bin/env python3
"""
Benchmark script for LISTEN/NOTIFY pattern matching feature.
Tests:
1. Exact channel vs pattern channel overhead - exact_vs_pattern
2. Pattern count scaling (how many patterns a backend listens on) - scaling
3. Pattern complexity (prefix vs infix patterns) - complexity
4. Mixed workload (some exact, some pattern listeners) - non_matching
Test names:
1. exact_vs_pattern
2. scaling
3. complexity
4. non_matching
Usage:
./bench_pattern_notify.py [--dsn DSN] [--duration SECONDS] [--warmup
SECONDS] [--test TEST_NAME]
Requirements:
pip install psycopg
"""
import argparse
import select
import statistics
import sys
import time
from dataclasses import dataclass
from typing import Callable
import psycopg
@dataclass
class BenchmarkResult:
name: str
notifications_per_sec: float
total_notifications: int
duration: float
stddev: float = 0.0
def wait_for_notify(conn, timeout: float = 1.0) -> int:
"""Wait for notifications and return count received."""
count = 0
gen = conn.notifies(timeout=timeout)
for notify in gen:
count += 1
# Stop after receiving one batch
break
return count
def run_benchmark(
dsn: str,
name: str,
setup_listener: Callable,
channel_name: str,
duration: float = 5.0,
warmup: float = 1.0,
runs: int = 3,
) -> BenchmarkResult:
"""
Run a benchmark measuring notification throughput.
Args:
dsn: PostgreSQL connection string
name: Benchmark name for reporting
setup_listener: Function that takes a connection and sets up LISTEN
channel_name: Channel name to NOTIFY
duration: How long to run each measurement (seconds)
warmup: Warmup period before measuring (seconds)
runs: Number of runs to average
"""
results = []
for run in range(runs):
# Create listener and notifier connections
listener_conn = psycopg.connect(dsn, autocommit=True)
notifier_conn = psycopg.connect(dsn, autocommit=True)
# Setup listener
setup_listener(listener_conn)
# Warmup
warmup_end = time.monotonic() + warmup
while time.monotonic() < warmup_end:
notifier_conn.execute(f"NOTIFY {channel_name}, 'warmup'")
wait_for_notify(listener_conn, timeout=0.1)
# Drain any pending notifications
listener_conn.execute("SELECT 1")
time.sleep(0.1)
# Benchmark
total_notifications = 0
start_time = time.monotonic()
end_time = start_time + duration
while time.monotonic() < end_time:
notifier_conn.execute(f"NOTIFY {channel_name}, 'bench'")
received = wait_for_notify(listener_conn, timeout=0.1)
total_notifications += received
elapsed = time.monotonic() - start_time
results.append(total_notifications / elapsed)
listener_conn.close()
notifier_conn.close()
return BenchmarkResult(
name=name,
notifications_per_sec=statistics.mean(results),
total_notifications=int(statistics.mean(results) * duration),
duration=duration,
stddev=statistics.stdev(results) if len(results) > 1 else 0.0,
)
def bench_exact_vs_pattern(dsn: str, duration: float, warmup: float) ->
list[BenchmarkResult]:
"""Test 1: Compare exact channel match vs pattern match."""
results = []
# Exact match
def setup_exact(conn):
conn.execute("UNLISTEN *")
conn.execute('LISTEN "user_123"')
results.append(
run_benchmark(
dsn=dsn,
name='Exact channel (LISTEN "user_123")',
setup_listener=setup_exact,
channel_name="user_123",
duration=duration,
warmup=warmup,
)
)
# Pattern match - simple prefix
def setup_pattern(conn):
conn.execute("UNLISTEN *")
conn.execute('LISTEN "user_*"')
results.append(
run_benchmark(
dsn=dsn,
name='Pattern channel (LISTEN "user_*")',
setup_listener=setup_pattern,
channel_name="user_123",
duration=duration,
warmup=warmup,
)
)
return results
def bench_pattern_count_scaling(dsn: str, duration: float, warmup: float) ->
list[BenchmarkResult]:
"""Test 2: How does performance scale with number of patterns?"""
results = []
for num_patterns in [1, 5, 10, 25, 50, 100]:
def setup_patterns(conn, n=num_patterns):
conn.execute("UNLISTEN *")
# Listen on multiple patterns, only one will match
for i in range(n):
if i == 0:
conn.execute('LISTEN "target_*"') # This one matches
else:
conn.execute(f'LISTEN "other{i}_*"') # These don't match
results.append(
run_benchmark(
dsn=dsn,
name=f"{num_patterns} pattern(s)",
setup_listener=setup_patterns,
channel_name="target_123",
duration=duration,
warmup=warmup,
)
)
return results
def bench_pattern_complexity(dsn: str, duration: float, warmup: float) ->
list[BenchmarkResult]:
"""Test 3: Compare different pattern complexities."""
results = []
# Simple prefix pattern: user_*
def setup_prefix(conn):
conn.execute("UNLISTEN *")
conn.execute('LISTEN "user_*"')
results.append(
run_benchmark(
dsn=dsn,
name="Prefix pattern (user_*)",
setup_listener=setup_prefix,
channel_name="user_123_extra_stuff",
duration=duration,
warmup=warmup,
)
)
# Single char wildcard: user_?
def setup_single(conn):
conn.execute("UNLISTEN *")
conn.execute('LISTEN "user_?"')
results.append(
run_benchmark(
dsn=dsn,
name="Single char pattern (user_?)",
setup_listener=setup_single,
channel_name="user_X",
duration=duration,
warmup=warmup,
)
)
# Infix pattern: prefix_*_suffix
def setup_infix(conn):
conn.execute("UNLISTEN *")
conn.execute('LISTEN "prefix_*_suffix"')
results.append(
run_benchmark(
dsn=dsn,
name="Infix pattern (prefix_*_suffix)",
setup_listener=setup_infix,
channel_name="prefix_middle_part_suffix",
duration=duration,
warmup=warmup,
)
)
# Multiple wildcards: a_*_b_*_c
def setup_multi(conn):
conn.execute("UNLISTEN *")
conn.execute('LISTEN "a_*_b_*_c"')
results.append(
run_benchmark(
dsn=dsn,
name="Multi-wildcard (a_*_b_*_c)",
setup_listener=setup_multi,
channel_name="a_xxx_b_yyy_c",
duration=duration,
warmup=warmup,
)
)
return results
def bench_non_matching_patterns(dsn: str, duration: float, warmup: float) ->
list[BenchmarkResult]:
"""Test 4: Overhead of checking patterns that don't match."""
results = []
# Baseline: exact match, no patterns
def setup_exact(conn):
conn.execute("UNLISTEN *")
conn.execute('LISTEN "channel"')
results.append(
run_benchmark(
dsn=dsn,
name="Exact only (baseline)",
setup_listener=setup_exact,
channel_name="channel",
duration=duration,
warmup=warmup,
)
)
# Exact match + non-matching patterns
for num_patterns in [10, 50, 100]:
def setup_mixed(conn, n=num_patterns):
conn.execute("UNLISTEN *")
conn.execute('LISTEN "channel"') # Exact match for the notify
for i in range(n):
conn.execute(f'LISTEN "nomatch{i}_*"') # Patterns that won't
match
results.append(
run_benchmark(
dsn=dsn,
name=f"Exact + {num_patterns} non-matching patterns",
setup_listener=setup_mixed,
channel_name="channel",
duration=duration,
warmup=warmup,
)
)
return results
def print_results(title: str, results: list[BenchmarkResult]):
"""Print benchmark results in a formatted table."""
print(f"\n{'=' * 70}")
print(f" {title}")
print("=" * 70)
print(f"{'Benchmark':<45} {'Notif/sec':>12} {'StdDev':>10}")
print("-" * 70)
baseline = results[0].notifications_per_sec if results else 0
for r in results:
pct = ((r.notifications_per_sec / baseline) - 1) * 100 if baseline else 0
pct_str = f"({pct:+.1f}%)" if r != results[0] else "(baseline)"
print(f"{r.name:<45} {r.notifications_per_sec:>10.0f} {pct_str:>10}")
print()
def main():
parser = argparse.ArgumentParser(description="Benchmark LISTEN/NOTIFY
pattern matching performance")
parser.add_argument(
"--dsn",
default="dbname=postgres",
help="PostgreSQL connection string (default: dbname=postgres)",
)
parser.add_argument(
"--duration",
type=float,
default=5.0,
help="Duration of each benchmark run in seconds (default: 5)",
)
parser.add_argument(
"--warmup",
type=float,
default=1.0,
help="Warmup period in seconds (default: 1)",
)
parser.add_argument(
"--test",
choices=["all", "exact_vs_pattern", "scaling", "complexity",
"non_matching", "throughput"],
default="all",
help="Which test to run (default: all)",
)
args = parser.parse_args()
print(f"LISTEN/NOTIFY Pattern Matching Benchmark")
print(f"DSN: {args.dsn}")
print(f"Duration: {args.duration}s per run, Warmup: {args.warmup}s")
# Verify connection
try:
conn = psycopg.connect(args.dsn)
conn.close()
except Exception as e:
print(f"Error connecting to database: {e}", file=sys.stderr)
sys.exit(1)
if args.test in ("all", "exact_vs_pattern"):
results = bench_exact_vs_pattern(args.dsn, args.duration, args.warmup)
print_results("Test 1: Exact Channel vs Pattern Channel", results)
if args.test in ("all", "scaling"):
results = bench_pattern_count_scaling(args.dsn, args.duration,
args.warmup)
print_results("Test 2: Pattern Count Scaling", results)
if args.test in ("all", "complexity"):
results = bench_pattern_complexity(args.dsn, args.duration, args.warmup)
print_results("Test 3: Pattern Complexity", results)
if args.test in ("all", "non_matching"):
results = bench_non_matching_patterns(args.dsn, args.duration,
args.warmup)
print_results("Test 4: Non-Matching Pattern Overhead", results)
if __name__ == "__main__":
main()