On Mon Jun 8, 2026 at 7:16 PM -03, Tom Lane wrote:
> "Matheus Alcantara" <[email protected]> writes:
>> I'd like to propose for PG 20 adding glob-style pattern matching support
>> for LISTEN channel names, allowing clients to subscribe to multiple
>> channels with a single LISTEN command using wildcards.
>
> I'm fairly skeptical that this is worth supporting at the server-code
> level.  You can already accomplish equivalent things by listening on
> a single shared channel and pattern-matching against identifiers
> provided in the message payloads.  Yes, doing that in C would reduce
> the cost of skipping uninteresting messages, but is that enough of a
> gain to justify adding this to our support workload?
>

Fair point. I think that the main appeal is reducing the need to
maintain dynamic LISTEN lists. For example, a multi-tenant app using
different schemas may want to monitor all tenants and could LISTEN
"tenant_*" instead of issuing separate LISTEN commands as tenants are
created and deleted. Similarly, an app using hierarchical channel names
like orders_created, orders_updated, orders_shipped could have a
reporting service LISTEN "orders_*" rather than explicitly listing each
event type. These are use cases where I've missed this feature in the past.

That said, I'm also attracted to the idea of filtering on payload
content, which could achieve the same goal. I'd be interested in
thoughts on whether payload filtering would be a better direction.
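
For comparison, the payload-based approach could look roughly like this
on the client side. This is only a sketch: the "<topic>:<body>" payload
convention and the helper names (glob_match_n, payload_topic_matches)
are assumptions made up for illustration, and the matcher ignores
escapes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical payload convention: "<topic>:<body>", for example
 * "orders_created:id=1".  Nothing in PostgreSQL mandates this format.
 */

/* Minimal glob matcher over the first n bytes of s ('*' and '?' only). */
static bool
glob_match_n(const char *pattern, const char *s, size_t n)
{
	if (*pattern == '\0')
		return n == 0;
	if (*pattern == '*')
		return glob_match_n(pattern + 1, s, n) ||
			(n > 0 && glob_match_n(pattern, s + 1, n - 1));
	if (n > 0 && (*pattern == '?' || *pattern == *s))
		return glob_match_n(pattern + 1, s + 1, n - 1);
	return false;
}

/*
 * Return true if the topic part of the payload matches the glob pattern.
 * Payloads without a ':' separator are treated as not matching.
 */
static bool
payload_topic_matches(const char *payload, const char *pattern)
{
	const char *sep;

	assert(payload != NULL && pattern != NULL);

	sep = strchr(payload, ':');
	if (sep == NULL)
		return false;
	return glob_match_n(pattern, payload, (size_t) (sep - payload));
}
```

A client listening on one shared channel would call
payload_topic_matches() on each received payload and drop the ones that
return false, which is the cost of skipping uninteresting messages that
was mentioned above.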

> The proposed implementation doesn't seem that attractive anyway, since
> it gives up all the benefit of the targeted wakeups introduced in v19.
> If you consider a workload with some high-volume channels that are not
> interesting to the wild-card-using listener, it's not hard to believe
> that this is a net loss compared to the already-usable way.
>

The newly attached patch aims to preserve targeted wakeups. Patterns are
stored in two shared memory structures: the globalChannelTable (keyed by
the pattern string itself, storing the listener list just like exact
channels) and a globalPatterns list (an enumerable array of all
registered patterns). When SignalBackends() processes a notification, it
first does an exact match lookup, then iterates globalPatterns testing
each pattern against the notification channel. For matching patterns, it
looks up listeners in globalChannelTable using the pattern as key and
wakes only those backends. A backend listening on "orders_*" won't be
woken for notifications to "user_123".
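
To make the flow above concrete, here is a small standalone sketch of
the two-step lookup. It is not the patch code: ToyEntry, toy_lookup and
the other toy_* names are hypothetical stand-ins for the dshash table
and the shared pattern array, and the matcher handles only '*' and '?'
(no escapes).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_LISTENERS 4

/* Toy stand-in for a globalChannelTable entry: key plus listener list. */
typedef struct
{
	const char *key;		/* exact channel name or pattern string */
	int			listeners[MAX_LISTENERS];	/* backend ids */
	int			nlisteners;
} ToyEntry;

/* Minimal recursive glob matcher: '*' and '?' only (simplification). */
static bool
toy_glob_match(const char *pattern, const char *channel)
{
	if (*pattern == '\0')
		return *channel == '\0';
	if (*pattern == '*')
		return toy_glob_match(pattern + 1, channel) ||
			(*channel != '\0' && toy_glob_match(pattern, channel + 1));
	if (*channel != '\0' && (*pattern == '?' || *pattern == *channel))
		return toy_glob_match(pattern + 1, channel + 1);
	return false;
}

/* Linear search standing in for the hash lookup by key. */
static const ToyEntry *
toy_lookup(const ToyEntry *table, int ntable, const char *key)
{
	for (int i = 0; i < ntable; i++)
		if (strcmp(table[i].key, key) == 0)
			return &table[i];
	return NULL;
}

/* Append an entry's listeners to out[], skipping duplicates. */
static int
toy_add_listeners(const ToyEntry *entry, int *out, int count, int max_out)
{
	for (int i = 0; i < entry->nlisteners; i++)
	{
		bool		seen = false;

		for (int j = 0; j < count; j++)
			if (out[j] == entry->listeners[i])
				seen = true;
		if (!seen && count < max_out)
			out[count++] = entry->listeners[i];
	}
	return count;
}

/*
 * Collect the backends to wake for one notification channel.  Step 1 is
 * the exact-key lookup; step 2 walks the pattern list and looks up
 * listeners only for the patterns that match the channel.
 */
static int
toy_collect_wakeups(const ToyEntry *table, int ntable,
					const char *const *patterns, int npatterns,
					const char *channel, int *out, int max_out)
{
	int			count = 0;
	const ToyEntry *entry;

	assert(channel != NULL);

	entry = toy_lookup(table, ntable, channel);
	if (entry != NULL)
		count = toy_add_listeners(entry, out, count, max_out);

	for (int i = 0; i < npatterns; i++)
	{
		if (!toy_glob_match(patterns[i], channel))
			continue;
		entry = toy_lookup(table, ntable, patterns[i]);
		if (entry != NULL)
			count = toy_add_listeners(entry, out, count, max_out);
	}
	return count;
}
```

With a table holding "orders_created", "orders_*" and "user_123", a
notification on "user_123" only collects the listeners of the exact
entry, because "orders_*" fails the match and its listener list is never
looked up.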

Of course, we may have performance issues if the list of patterns gets
too big. We could apply some optimizations, or use another data
structure instead of a list, but I think the idea would remain the
same.
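
As one example of a cheap optimization (not implemented in the patch,
just an idea), each pattern could carry the length of its literal
prefix so that most non-matching patterns are rejected with a single
strncmp() before the full matcher runs. The function names below are
hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Length of the leading run of pattern bytes that must match literally,
 * i.e. everything before the first '*', '?' or backslash.
 */
static size_t
literal_prefix_len(const char *pattern)
{
	assert(pattern != NULL);
	return strcspn(pattern, "*?\\");
}

/*
 * Cheap pre-filter: false means the pattern cannot match the channel,
 * true means the full matcher still has to decide.
 */
static bool
pattern_may_match(const char *pattern, size_t prefix_len,
				  const char *channel)
{
	return strncmp(pattern, channel, prefix_len) == 0;
}
```

The prefix length would be computed once when the pattern is
registered. Patterns that start with a wildcard have a prefix length of
zero and always fall through to the full match, so this helps only when
patterns have a distinguishing literal prefix.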

I've added comments throughout the code explaining the architecture,
particularly in the file header, around GlobalPatternList,
PrepareTableEntriesForListen, and SignalBackends. There are still some
XXX comments marking areas that need more thought, but I believe the
overall architecture remains the same. I'm mainly looking for feedback
on whether this approach makes sense before polishing the remaining
details.

FWIW I'm also studying the previous proposals and still thinking about
the issues already discussed there.

Thanks.

--
Matheus Alcantara
EDB: https://www.enterprisedb.com
From 01a08074d02e2d39b1bd0778b1345ebc3f996d7f Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Mon, 8 Jun 2026 14:54:34 -0300
Subject: [PATCH v1] Add pattern matching support for LISTEN/NOTIFY channels

This adds glob-style pattern matching for LISTEN channel names, allowing
clients to subscribe to multiple channels with a single LISTEN command
using wildcards. The supported wildcards are:
  * - matches zero or more characters
  ? - matches exactly one character
  \ - escapes the next character to match literal * or ?

Example: LISTEN "user_*" will receive notifications sent to channels
like "user_123", "user_alice", etc.

To support targeted wakeups for pattern listeners, patterns are stored
in two shared memory structures:

1. globalChannelTable - keyed by the pattern string itself, storing the
   list of backends listening on that pattern. This allows looking up
   listeners once we know which pattern matched.

2. globalPatterns list - an enumerable array of all registered patterns.
   SignalBackends() iterates this list, matching each pattern against
   the notification channel. For matching patterns, it looks up the
   entry in globalChannelTable to find and wake only those listeners.
---
 src/backend/commands/async.c                 | 564 +++++++++++++++++--
 src/test/isolation/expected/async-notify.out |  89 ++-
 src/test/isolation/specs/async-notify.spec   |  53 ++
 src/test/regress/expected/async.out          |  21 +
 src/test/regress/sql/async.sql               |  27 +
 src/tools/pgindent/typedefs.list             |   1 +
 6 files changed, 711 insertions(+), 44 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index eee8bc29f38..f92a6a99290 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -111,6 +111,22 @@
  *       tracking which channels that backend is listening to.  The local table
  *       serves to reduce the number of accesses needed to the shared table.
  *
+ *       Pattern channels (those containing wildcards like * or ?) are handled
+ *       specially.  A pattern cannot be looked up by exact name match when a
+ *       notification arrives, since we need to match the notification channel
+ *       against the pattern, not the other way around.  To support targeted
+ *       wakeups for pattern listeners, we maintain two structures:
+ *
+ *       (a) The pattern is stored in globalChannelTable keyed by the pattern
+ *               string itself.  This allows us to find the list of backends
+ *               listening on a specific pattern once we know which pattern matched.
+ *
+ *       (b) A global pattern list (asyncQueueControl->globalPatterns) holds all
+ *               registered patterns.  SignalBackends() iterates this list, matching
+ *               each pattern against the notification channel.  When a pattern
+ *               matches, we look up its entry in globalChannelTable to find and
+ *               wake the listeners.
+ *
  *       If the current transaction has executed any LISTEN/UNLISTEN actions,
  *       PreCommit_Notify() prepares to commit those.  For LISTEN, it
  *       pre-allocates entries in both the per-backend localChannelTable and the
@@ -292,8 +308,36 @@ typedef struct QueueBackendStatus
        QueuePosition pos;                      /* backend has read queue up to here */
        bool            wakeupPending;  /* signal sent to backend, not yet processed */
        bool            isAdvancing;    /* backend is advancing its position */
+       bool            hasPatterns;    /* backend is listening on pattern channels */
 } QueueBackendStatus;
 
+#define INITIAL_PATTERNS_ARRAY_SIZE 8
+
+/*
+ * Global pattern list definitions
+ *
+ * This structure holds all registered pattern channels in shared memory.
+ * It exists because patterns cannot be looked up by exact match in the
+ * globalChannelTable when a notification arrives -- we need to iterate
+ * through all patterns and test each one against the notification channel.
+ *
+ * Each pattern is stored as a GlobalChannelKey, which can be used directly
+ * to look up the corresponding GlobalChannelEntry in globalChannelTable
+ * (where the pattern string is stored as the "channel" field of the key).
+ *
+ * This allows SignalBackends() to:
+ * 1. Iterate through all patterns in the list
+ * 2. Match each pattern against the notification channel being notified
+ * 3. For matching patterns, use the key to find listeners in globalChannelTable
+ * 4. Wake only those backends whose patterns actually matched
+ */
+typedef struct GlobalPatternList
+{
+       dsa_pointer patternsArray;      /* DSA pointer to GlobalChannelKey array */
+       int                     numPatterns;    /* Number of patterns currently stored */
+       int                     allocatedPatterns;      /* Allocated size of array */
+} GlobalPatternList;
+
 /*
  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
  *
@@ -340,6 +384,7 @@ typedef struct AsyncQueueControl
        TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
        dsa_handle      globalChannelTableDSA;  /* global channel table's DSA handle */
        dshash_table_handle globalChannelTableDSH;      /* and its dshash handle */
+       GlobalPatternList globalPatterns;       /* shared list of pattern channels */
        /* Array with room for MaxBackends entries: */
        QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
@@ -365,6 +410,7 @@ const ShmemCallbacks AsyncShmemCallbacks = {
 #define QUEUE_BACKEND_POS(i)           (asyncQueueControl->backend[i].pos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)        (asyncQueueControl->backend[i].wakeupPending)
 #define QUEUE_BACKEND_IS_ADVANCING(i)  (asyncQueueControl->backend[i].isAdvancing)
+#define QUEUE_BACKEND_HAS_PATTERNS(i)  (asyncQueueControl->backend[i].hasPatterns)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -423,7 +469,8 @@ static HTAB *localChannelTable = NULL;
 
 /* We test this condition to detect that we're not listening at all */
 #define LocalChannelTableIsEmpty() \
-       (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
+       ((localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0) && \
+        localPatternList == NIL)
 
 /*
  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
@@ -474,6 +521,7 @@ typedef enum
 typedef struct PendingListenEntry
 {
        char            channel[NAMEDATALEN];   /* hash key */
+       bool            is_pattern;             /* channel contains wildcards? */
        PendingListenAction action; /* which action should we perform? */
 } PendingListenEntry;
 
@@ -557,6 +605,13 @@ static bool unlistenExitRegistered = false;
 /* True if we're currently registered as a listener in asyncQueueControl */
 static bool amRegisteredListener = false;
 
+/*
+ * List of pattern channel names. These are stored separately because they
+ * cannot be matched via hash lookup. The list contains palloc'd strings in
+ * TopMemoryContext.
+ */
+static List *localPatternList = NIL;
+
 /*
  * Queue head positions for direct advancement.
  * These are captured during PreCommit_Notify while holding the heavyweight
@@ -599,9 +654,15 @@ static void PrepareTableEntriesForUnlisten(const char *channel);
 static void PrepareTableEntriesForUnlistenAll(void);
 static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
                                                                          ListenerEntry *listeners,
-                                                                         int idx);
+                                                                         int idx,
+                                                                         bool is_pattern);
 static void ApplyPendingListenActions(bool isCommit);
 static void CleanupListenersOnExit(void);
+static bool IsPattern(const char *channel);
+static bool MatchPattern(const char *channel, const char *pattern);
+static bool IsListeningOnPattern(const char *channel);
+static void AddPatternToGlobalList(const char *pattern);
+static void RemovePatternFromGlobalList(const char *pattern);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
@@ -611,6 +672,7 @@ static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
 static void SignalBackends(void);
+static void SignalListenersForEntry(GlobalChannelEntry *entry, int *count);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(QueuePosition *current,
                                                                                 QueuePosition stop,
@@ -843,6 +905,7 @@ AsyncShmemInit(void *arg)
                SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
                QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
                QUEUE_BACKEND_IS_ADVANCING(i) = false;
+               QUEUE_BACKEND_HAS_PATTERNS(i) = false;
        }
 
        /*
@@ -1525,6 +1588,9 @@ BecomeRegisteredListener(void)
  * an entry in localChannelTable, and pre-allocating an entry in the shared
  * globalChannelTable with removeOnAbort set.  AtCommit_Notify will clear
  * removeOnAbort; abort processing will remove entries still marked so.
+ *
+ * For pattern channels, we also add them to the local pattern list and the
+ * shared global pattern list, since they cannot be found by exact-name lookup.
  */
 static void
 PrepareTableEntriesForListen(const char *channel)
@@ -1534,6 +1600,7 @@ PrepareTableEntriesForListen(const char *channel)
        bool            found;
        ListenerEntry *listeners;
        PendingListenEntry *pending;
+       bool            is_pattern = IsPattern(channel);
 
        /*
         * Record in local pending hash that we want to LISTEN, overwriting any
@@ -1542,6 +1609,7 @@ PrepareTableEntriesForListen(const char *channel)
        pending = (PendingListenEntry *)
                hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
        pending->action = PENDING_LISTEN;
+       pending->is_pattern = is_pattern;
 
        /*
         * Ensure that there is an entry for the channel in localChannelTable.
@@ -1556,6 +1624,55 @@ PrepareTableEntriesForListen(const char *channel)
         */
        (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
 
+       /*
+        * For pattern channels, add to the local pattern list. We also add
+        * patterns to globalChannelTable (keyed by the pattern string itself) so
+        * that once SignalBackends() finds a matching pattern, it can look up the
+        * listeners for that pattern by using the pattern string as the key.
+        *
+        * Patterns are stored in two places in shared memory:
+        * globalChannelTable: keyed by the pattern string, stores the listener
+        * list (same as exact channels).
+        * globalPatterns list: allows SignalBackends() to iterate all patterns and
+        * match them against notification channels being notified.
+        * match them against notification channels being notified.
+        */
+       if (is_pattern)
+       {
+               MemoryContext oldcxt;
+               char       *pattern_copy;
+               bool            exists = false;
+
+               /* Check if we already have this pattern locally */
+               foreach_ptr(char, existing, localPatternList)
+               {
+                       if (strcmp(existing, channel) == 0)
+                       {
+                               exists = true;
+                               break;
+                       }
+               }
+
+               if (!exists)
+               {
+                       /*
+                        * Add pattern to the list in TopMemoryContext so it persists
+                        *
+                        * XXX: Should it really use TopMemoryContext?
+                        */
+                       oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+                       pattern_copy = pstrdup(channel);
+                       localPatternList = lappend(localPatternList, pattern_copy);
+                       MemoryContextSwitchTo(oldcxt);
+
+                       /* Set the flag so we know we have patterns */
+                       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+                       QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = true;
+                       LWLockRelease(NotifyQueueLock);
+               }
+
+               /* Fall through to also add to globalChannelTable */
+       }
+
        /* Pre-allocate entry in shared globalChannelTable */
        GlobalChannelKeyInit(&key, MyDatabaseId, channel);
        entry = dshash_find_or_insert(globalChannelTable, &key, &found);
@@ -1618,6 +1735,27 @@ PrepareTableEntriesForListen(const char *channel)
        entry->numListeners++;
 
        dshash_release_lock(globalChannelTable, entry);
+
+       /*
+        * If this is a pattern, also add it to the global pattern list.
+        *
+        * The pattern is already stored in globalChannelTable (keyed by the
+        * pattern string) where the listener list is kept. The global pattern
+        * list provides an enumerable collection of all patterns so that
+        * SignalBackends() can iterate through them and match against
+        * notification channels.
+        */
+       if (is_pattern)
+       {
+               /*
+                * XXX: At this point we should only allocate room for the entry on
+                * the global pattern list, otherwise it will be visible to other
+                * transactions before the running transaction commits.
+                */
+               LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+               AddPatternToGlobalList(channel);
+               LWLockRelease(NotifyQueueLock);
+       }
 }
 
 /*
@@ -1650,6 +1788,7 @@ PrepareTableEntriesForUnlisten(const char *channel)
        pending = (PendingListenEntry *)
                hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
        pending->action = PENDING_UNLISTEN;
+       pending->is_pattern = IsPattern(channel);
 }
 
 /*
@@ -1676,6 +1815,16 @@ PrepareTableEntriesForUnlistenAll(void)
                pending = (PendingListenEntry *)
                        hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
                pending->action = PENDING_UNLISTEN;
+               pending->is_pattern = IsPattern(channelEntry->channel);
+       }
+
+       /* Also unlisten from all pattern channels */
+       foreach_ptr(char, pattern, localPatternList)
+       {
+               pending = (PendingListenEntry *)
+                       hash_search(pendingListenActions, pattern, HASH_ENTER, NULL);
+               pending->action = PENDING_UNLISTEN;
+               pending->is_pattern = true;
        }
 }
 
@@ -1685,13 +1834,17 @@ PrepareTableEntriesForUnlistenAll(void)
  * Decrements numListeners, compacts the array, and frees the entry if empty.
  * Sets *entry_ptr to NULL if the entry was deleted.
  *
+ * If is_pattern is true and the entry becomes empty, also removes the pattern
+ * from the global pattern list.
+ *
  * We could get the listeners pointer from the entry, but all callers
  * already have it at hand.
  */
 static void
 RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
                                                  ListenerEntry *listeners,
-                                                 int idx)
+                                                 int idx,
+                                                 bool is_pattern)
 {
        GlobalChannelEntry *entry = *entry_ptr;
 
@@ -1702,6 +1855,17 @@ RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
 
        if (entry->numListeners == 0)
        {
+               /*
+                * If this was a pattern entry, remove it from the global pattern list
+                * before deleting the channel entry.
+                */
+               if (is_pattern)
+               {
+                       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+                       RemovePatternFromGlobalList(entry->key.channel);
+                       LWLockRelease(NotifyQueueLock);
+               }
+
                dsa_free(globalChannelDSA, entry->listenersArray);
                dshash_delete_entry(globalChannelTable, entry);
                /* tells caller not to release the entry's lock: */
@@ -1738,6 +1902,53 @@ ApplyPendingListenActions(bool isCommit)
                bool            removeLocal = true;
                bool            foundListener = false;
 
+               /*
+                * Handle pattern channels' local state. Patterns are also in
+                * globalChannelTable (keyed by the pattern string), so we still need
+                * to fall through to the globalChannelTable logic below.
+                */
+               if (pending->is_pattern)
+               {
+                       bool            removeFromLocal = true;
+
+                       if (isCommit && pending->action == PENDING_LISTEN)
+                       {
+                               /* Pattern LISTEN committed - keep in localPatternList */
+                               removeFromLocal = false;
+                       }
+
+                       /*
+                        * If removeFromLocal is true, it means we are either aborting or
+                        * committing an UNLISTEN. In both cases, remove from local list.
+                        */
+                       if (removeFromLocal)
+                       {
+                               ListCell   *lc;
+
+                               foreach(lc, localPatternList)
+                               {
+                                       char       *pattern = (char *) lfirst(lc);
+
+                                       if (strcmp(pattern, pending->channel) == 0)
+                                       {
+                                               localPatternList = foreach_delete_current(localPatternList, lc);
+                                               pfree(pattern);
+                                               break;
+                                       }
+                               }
+
+                               /* Update hasPatterns flag if we have no more patterns */
+                               if (localPatternList == NIL)
+                               {
+                                       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+                                       QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = false;
+                                       LWLockRelease(NotifyQueueLock);
+                               }
+                       }
+
+                       /* Fall through to handle globalChannelTable entry */
+               }
+
                /*
                 * Find the global entry for this channel.  If isCommit, it had better
                 * exist (it was created in PreCommit).  In an abort, it might not
@@ -1777,7 +1988,8 @@ ApplyPendingListenActions(bool isCommit)
                                                 * UNLISTEN being committed: remove pre-allocated
                                                 * entries from both tables.
                                                 */
-                                               RemoveListenerFromChannel(&entry, listeners, i);
+                                               RemoveListenerFromChannel(&entry, listeners, i,
+                                                                                                 pending->is_pattern);
                                        }
                                }
                                else
@@ -1794,7 +2006,8 @@ ApplyPendingListenActions(bool isCommit)
                                                 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
                                                 * so remove pre-allocated entries from both tables.
                                                 */
-                                               RemoveListenerFromChannel(&entry, listeners, i);
+                                               RemoveListenerFromChannel(&entry, listeners, i,
+                                                                                                 pending->is_pattern);
                                        }
                                        else
                                        {
@@ -1855,18 +2068,31 @@ CleanupListenersOnExit(void)
                localChannelTable = NULL;
        }
 
+       /* Clear the pattern list */
+       if (localPatternList != NIL)
+       {
+               list_free_deep(localPatternList);
+               localPatternList = NIL;
+       }
+
        /* Now remove our entries from the shared globalChannelTable */
        if (globalChannelTable == NULL)
                return;
 
+       /* XXX: Only acquire the lock if is_pattern=true. */
+       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
        dshash_seq_init(&status, globalChannelTable, true);
        while ((entry = dshash_seq_next(&status)) != NULL)
        {
                ListenerEntry *listeners;
+               bool            is_pattern;
 
                if (entry->key.dboid != MyDatabaseId)
                        continue;                       /* not relevant */
 
+               is_pattern = IsPattern(entry->key.channel);
+
                listeners = (ListenerEntry *)
                        dsa_get_address(globalChannelDSA, entry->listenersArray);
 
@@ -1881,6 +2107,10 @@ CleanupListenersOnExit(void)
 
                                if (entry->numListeners == 0)
                                {
+                                       /* Remove from global pattern list if this was a pattern */
+                                       if (is_pattern)
+                                               RemovePatternFromGlobalList(entry->key.channel);
+
                                        dsa_free(globalChannelDSA, entry->listenersArray);
                                        dshash_delete_current(&status);
                                }
@@ -1889,6 +2119,196 @@ CleanupListenersOnExit(void)
                }
        }
        dshash_seq_term(&status);
+
+       LWLockRelease(NotifyQueueLock);
+}
+
+/*
+ * Check if a channel name contains pattern wildcards (* or ?).
+ */
+static bool
+IsPattern(const char *channel)
+{
+       return (strchr(channel, '*') != NULL || strchr(channel, '?') != NULL);
+}
+
+/*
+ * Match a channel name against a glob-style pattern.
+ * Supports:
+ *   * - matches zero or more characters
+ *   ? - matches exactly one character
+ *   \ - escapes the next character (to match literal * or ?)
+ *
+ * Returns true if the channel matches the pattern.
+ */
+static bool
+MatchPattern(const char *channel, const char *pattern)
+{
+       const char *c = channel;
+       const char *p = pattern;
+
+       /* Position in channel and pattern after last * match */
+       const char *c_star = NULL;
+       const char *p_star = NULL;
+
+       while (*c != '\0')
+       {
+               if (*p == '\\' && *(p + 1) != '\0')
+               {
+                       /* Escaped character - must match literally */
+                       p++;
+                       if (*c == *p)
+                       {
+                               c++;
+                               p++;
+                       }
+                       else if (p_star != NULL)
+                       {
+                               /* Backtrack to last * */
+                               p = p_star + 1;
+                               c = ++c_star;
+                       }
+                       else
+                               return false;
+               }
+               else if (*p == '?')
+               {
+                       /* ? matches any single character */
+                       c++;
+                       p++;
+               }
+               else if (*p == '*')
+               {
+                       /* Remember position for backtracking */
+                       p_star = p++;
+                       c_star = c;
+               }
+               else if (*c == *p)
+               {
+                       /* Literal match */
+                       c++;
+                       p++;
+               }
+               else if (p_star != NULL)
+               {
+                       /* Mismatch, but we have a * to backtrack to */
+                       p = p_star + 1;
+                       c = ++c_star;
+               }
+               else
+               {
+                       /* Mismatch with no * to backtrack to */
+                       return false;
+               }
+       }
+
+       /* Skip trailing *'s in pattern */
+       while (*p == '*')
+               p++;
+
+       return (*p == '\0');
+}
+
+/*
+ * Check if the channel matches any pattern we are listening on.
+ */
+static bool
+IsListeningOnPattern(const char *channel)
+{
+       foreach_ptr(char, pattern, localPatternList)
+       {
+               if (MatchPattern(channel, pattern))
+                       return true;
+       }
+
+       return false;
+}
+
+/*
+ * Add a pattern to the global pattern list in shared memory.
+ *
+ * Caller must hold NotifyQueueLock in exclusive mode.
+ */
+static void
+AddPatternToGlobalList(const char *pattern)
+{
+       GlobalPatternList *gpl = &asyncQueueControl->globalPatterns;
+       GlobalChannelKey *patterns;
+
+       /* Initialize the array if needed */
+       if (!DsaPointerIsValid(gpl->patternsArray))
+       {
+               gpl->patternsArray = dsa_allocate(globalChannelDSA,
+                                                                                 sizeof(GlobalChannelKey) * INITIAL_PATTERNS_ARRAY_SIZE);
+               gpl->allocatedPatterns = INITIAL_PATTERNS_ARRAY_SIZE;
+               gpl->numPatterns = 0;
+       }
+
+       patterns = (GlobalChannelKey *)
+               dsa_get_address(globalChannelDSA, gpl->patternsArray);
+
+       /* Check if this pattern already exists */
+       for (int i = 0; i < gpl->numPatterns; i++)
+       {
+               if (patterns[i].dboid == MyDatabaseId &&
+                       strcmp(patterns[i].channel, pattern) == 0)
+                       return;                         /* already registered */
+       }
+
+       /* Grow array if necessary */
+       if (gpl->numPatterns >= gpl->allocatedPatterns)
+       {
+               int                     new_size = gpl->allocatedPatterns * 2;
+               dsa_pointer old_array = gpl->patternsArray;
+               dsa_pointer new_array = dsa_allocate(globalChannelDSA,
+                                                                                        sizeof(GlobalChannelKey) * new_size);
+               GlobalChannelKey *new_patterns = (GlobalChannelKey *)
+                       dsa_get_address(globalChannelDSA, new_array);
+
+               memcpy(new_patterns, patterns, sizeof(GlobalChannelKey) * gpl->numPatterns);
+               gpl->patternsArray = new_array;
+               gpl->allocatedPatterns = new_size;
+               dsa_free(globalChannelDSA, old_array);
+               patterns = new_patterns;
+       }
+
+       /* Add the new pattern */
+       strlcpy(patterns[gpl->numPatterns].channel, pattern, NAMEDATALEN);
+       patterns[gpl->numPatterns].dboid = MyDatabaseId;
+       GlobalChannelKeyInit(&patterns[gpl->numPatterns], MyDatabaseId, pattern);
+       gpl->numPatterns++;
+}
+
+/*
+ * Remove a pattern from the global pattern list.
+ *
+ * Caller must hold NotifyQueueLock in exclusive mode.
+ */
+static void
+RemovePatternFromGlobalList(const char *pattern)
+{
+       GlobalPatternList *gpl = &asyncQueueControl->globalPatterns;
+       GlobalChannelKey *patterns;
+
+       if (!DsaPointerIsValid(gpl->patternsArray))
+               return;
+
+       patterns = (GlobalChannelKey *)
+               dsa_get_address(globalChannelDSA, gpl->patternsArray);
+
+       for (int i = 0; i < gpl->numPatterns; i++)
+       {
+               if (patterns[i].dboid == MyDatabaseId &&
+                       strcmp(patterns[i].channel, pattern) == 0)
+               {
+                       /* Remove by shifting remaining entries */
+                       gpl->numPatterns--;
+                       if (i < gpl->numPatterns)
+                               memmove(&patterns[i], &patterns[i + 1],
+                                               sizeof(GlobalChannelKey) * (gpl->numPatterns - i));
+                       return;
+               }
+       }
 }
 
 /*
@@ -1902,7 +2322,10 @@ IsListeningOn(const char *channel)
        if (localChannelTable == NULL)
                return false;
 
-       return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
+       if (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL)
+               return true;
+
+       return IsListeningOnPattern(channel);
 }
 
 /*
@@ -1926,6 +2349,7 @@ asyncQueueUnregister(void)
        QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
        QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
        QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
+       QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = false;
        /* and remove it from the list */
        if (QUEUE_FIRST_LISTENER == MyProcNumber)
                QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -2274,6 +2698,15 @@ SignalBackends(void)
         * Identify backends that we need to signal.  We don't want to send
         * signals while holding the NotifyQueueLock, so this part just builds a
         * list of target PIDs in signalPids[] and signalProcnos[].
+        *
+        * For each notification channel, we perform two lookups:
+        * 1. Exact match: look up the channel name directly in globalChannelTable.
+        * 2. Pattern match: iterate the global pattern list and test each pattern
+        *    against the channel; for matches, look up the pattern's listener
+        *    list in globalChannelTable.
+        *
+        * Both exact and pattern listeners can receive the same notification, so
+        * we process both independently.
         */
        count = 0;
 
@@ -2284,59 +2717,52 @@ SignalBackends(void)
        {
                GlobalChannelKey key;
                GlobalChannelEntry *entry;
-               ListenerEntry *listeners;
 
+               /*
+                * First try exact match lookup. This handles backends that issued
+                * LISTEN 'channel_name' for this specific channel.
+                */
                GlobalChannelKeyInit(&key, MyDatabaseId, channel);
                entry = dshash_find(globalChannelTable, &key, false);
-               if (entry == NULL)
-                       continue;                       /* nobody is listening */
-
-               listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
-                                                                                         entry->listenersArray);
+               if (entry != NULL)
+                       SignalListenersForEntry(entry, &count);
 
                /*
-                * Identify listeners that now need waking, add them to arrays.
-                *
-                * Note that we signal listeners regardless of the state of their
-                * removeOnAbort flags.  Hence a new listener that reached PreCommit,
-                * but then failed before AtCommit_Notify, can receive a signal even
-                * though it was never really listening.  This is okay because it will
-                * not do anything in response to that signal.  If we did not do it
-                * like this then a new listener might miss some messages due to the
-                * direct-advance logic below.
+                * Now check all patterns in the global pattern list. For each pattern
+                * that matches the notification channel, look up the pattern's entry
+                * in globalChannelTable (keyed by the pattern string) to find and
+                * wake its listeners.
                 */
-               for (int j = 0; j < entry->numListeners; j++)
+               if (DsaPointerIsValid(asyncQueueControl->globalPatterns.patternsArray))
                {
-                       ProcNumber      i = listeners[j].procNo;
-                       int32           pid;
-                       QueuePosition pos;
-
-                       if (QUEUE_BACKEND_WAKEUP_PENDING(i))
-                               continue;               /* already signaled, no need to repeat */
+                       GlobalPatternList *gpl = &asyncQueueControl->globalPatterns;
+                       GlobalChannelKey *patterns;
 
-                       pid = QUEUE_BACKEND_PID(i);
-                       pos = QUEUE_BACKEND_POS(i);
+                       patterns = (GlobalChannelKey *)
+                               dsa_get_address(globalChannelDSA, gpl->patternsArray);
 
-                       if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
-                               continue;               /* it's fully caught up already */
-
-                       Assert(pid != InvalidPid);
+                       /* Test every registered pattern against this channel */
+                       for (int i = 0; i < gpl->numPatterns; i++)
+                       {
+                               GlobalChannelKey *pattern = &patterns[i];
 
-                       QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
-                       signalPids[count] = pid;
-                       signalProcnos[count] = i;
-                       count++;
+                               if (pattern->dboid == MyDatabaseId && MatchPattern(channel, pattern->channel))
+                               {
+                                       entry = dshash_find(globalChannelTable, pattern, false);
+                                       if (entry != NULL)
+                                               SignalListenersForEntry(entry, &count);
+                               }
+                       }
                }
-
-               dshash_release_lock(globalChannelTable, entry);
        }
 
        /*
         * Scan all listeners.  Any that are not already pending wakeup must not
         * be interested in our notifications (else we'd have set their wakeup
-        * flags above).  Check to see if we can directly advance their queue
-        * pointers to save a wakeup.  Otherwise, if they are far behind, wake
-        * them anyway so they will catch up.
+        * flags above, including pattern listeners via the global pattern list).
+        * Check to see if we can directly advance their queue pointers to save a
+        * wakeup.  Otherwise, if they are far behind, wake them anyway so they
+        * will catch up.
         */
        for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
        {
@@ -3297,3 +3723,55 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
        return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * SignalListenersForEntry --- helper for SignalBackends
+ *
+ * Wake all listeners registered in the given GlobalChannelEntry by adding
+ * them to the signalPids/signalProcnos arrays. Releases the dshash lock
+ * on the entry before returning.
+ *
+ * This is called for both exact channel matches and pattern matches,
+ * allowing a single notification to wake listeners from multiple entries.
+ */
+static void
+SignalListenersForEntry(GlobalChannelEntry *entry, int *count)
+{
+       ListenerEntry *listeners = dsa_get_address(globalChannelDSA, entry->listenersArray);
+
+       /*
+        * Identify listeners that now need waking, add them to arrays.
+        *
+        * Note that we signal listeners regardless of the state of their
+        * removeOnAbort flags.  Hence a new listener that reached PreCommit, but
+        * then failed before AtCommit_Notify, can receive a signal even though it
+        * was never really listening.  This is okay because it will not do
+        * anything in response to that signal.  If we did not do it like this
+        * then a new listener might miss some messages due to the direct-advance
+        * logic below.
+        */
+       for (int j = 0; j < entry->numListeners; j++)
+       {
+               ProcNumber      i = listeners[j].procNo;
+               int32           pid;
+               QueuePosition pos;
+
+               if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+                       continue;                       /* already signaled, no need to repeat */
+
+               pid = QUEUE_BACKEND_PID(i);
+               pos = QUEUE_BACKEND_POS(i);
+
+               if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+                       continue;                       /* it's fully caught up already */
+
+               Assert(pid != InvalidPid);
+
+               QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+               signalPids[*count] = pid;
+               signalProcnos[*count] = i;
+               (*count)++;
+       }
+
+       dshash_release_lock(globalChannelTable, entry);
+}
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 55b7cbc6e02..95980ded634 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 6 sessions
+Parsed test spec with 8 sessions
 
 starting permutation: listenc notify1 notify2 notify3 notifyf
 step listenc: LISTEN c1; LISTEN c2;
@@ -238,3 +238,90 @@ nonzero
 t      
 (1 row)
 
+
+starting permutation: plisten_star pnotify_user_login pnotify_user_logout pnotify_user_empty pcheck
+step plisten_star: LISTEN "user_*";
+step pnotify_user_login: NOTIFY user_login, 'login payload';
+step pnotify_user_logout: NOTIFY user_logout, 'logout payload';
+step pnotify_user_empty: NOTIFY user_, 'empty suffix';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier
+pattern_listener: NOTIFY "user_logout" with payload "logout payload" from pattern_notifier
+pattern_listener: NOTIFY "user_" with payload "empty suffix" from pattern_notifier
+
+starting permutation: plisten_question pnotify_msg_1 pnotify_msg_a pnotify_msg_12 pcheck
+step plisten_question: LISTEN "msg_?";
+step pnotify_msg_1: NOTIFY msg_1, 'message 1';
+step pnotify_msg_a: NOTIFY msg_a, 'message a';
+step pnotify_msg_12: NOTIFY msg_12, 'should not match';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "msg_1" with payload "message 1" from pattern_notifier
+pattern_listener: NOTIFY "msg_a" with payload "message a" from pattern_notifier
+
+starting permutation: plisten_complex pnotify_prefix_mid_suffix pnotify_prefix_suffix pnotify_prefix_a_b_suffix pcheck
+step plisten_complex: LISTEN "prefix_*_suffix";
+step pnotify_prefix_mid_suffix: NOTIFY prefix_mid_suffix, 'complex match';
+step pnotify_prefix_suffix: NOTIFY prefix_suffix, 'no middle';
+step pnotify_prefix_a_b_suffix: NOTIFY prefix_a_b_suffix, 'multi char middle';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "prefix_mid_suffix" with payload "complex match" from pattern_notifier
+pattern_listener: NOTIFY "prefix_a_b_suffix" with payload "multi char middle" from pattern_notifier
+
+starting permutation: plisten_multi pnotify_event_created pnotify_alert_x pnotify_nomatch pcheck
+step plisten_multi: LISTEN "event_*"; LISTEN "alert_?";
+step pnotify_event_created: NOTIFY event_created, 'event';
+step pnotify_alert_x: NOTIFY alert_x, 'alert';
+step pnotify_nomatch: NOTIFY completely_different, 'no match';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "event_created" with payload "event" from pattern_notifier
+pattern_listener: NOTIFY "alert_x" with payload "alert" from pattern_notifier
+
+starting permutation: plisten_star pnotify_user_login pcheck
+step plisten_star: LISTEN "user_*";
+step pnotify_user_login: NOTIFY user_login, 'login payload';
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier
+
+starting permutation: llisten plisten_star notify1 pnotify_user_login lcheck pcheck
+step llisten: LISTEN c1; LISTEN c2;
+step plisten_star: LISTEN "user_*";
+step notify1: NOTIFY c1;
+step pnotify_user_login: NOTIFY user_login, 'login payload';
+step lcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+step pcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 7aef2e8d180..ab4cc2449dc 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -94,6 +94,30 @@ step lsnotify        { NOTIFY c1, 'subxact_test'; }
 step lsnotify_check    { NOTIFY c1, 'should_not_receive'; }
 teardown               { UNLISTEN *; }
 
+# Session for pattern listening tests
+session pattern_listener
+step plisten_star      { LISTEN "user_*"; }
+step plisten_question  { LISTEN "msg_?"; }
+step plisten_complex   { LISTEN "prefix_*_suffix"; }
+step plisten_multi     { LISTEN "event_*"; LISTEN "alert_?"; }
+step pcheck            { SELECT 1 AS x; }
+teardown               { UNLISTEN *; }
+
+# Session for sending notifications to pattern listeners
+session pattern_notifier
+step pnotify_user_login                { NOTIFY user_login, 'login payload'; }
+step pnotify_user_logout       { NOTIFY user_logout, 'logout payload'; }
+step pnotify_user_empty                { NOTIFY user_, 'empty suffix'; }
+step pnotify_msg_1             { NOTIFY msg_1, 'message 1'; }
+step pnotify_msg_a             { NOTIFY msg_a, 'message a'; }
+step pnotify_msg_12            { NOTIFY msg_12, 'should not match'; }
+step pnotify_prefix_mid_suffix { NOTIFY prefix_mid_suffix, 'complex match'; }
+step pnotify_prefix_suffix     { NOTIFY prefix_suffix, 'no middle'; }
+step pnotify_prefix_a_b_suffix { NOTIFY prefix_a_b_suffix, 'multi char middle'; }
+step pnotify_event_created     { NOTIFY event_created, 'event'; }
+step pnotify_alert_x           { NOTIFY alert_x, 'alert'; }
+step pnotify_nomatch           { NOTIFY completely_different, 'no match'; }
+
 
 # Trivial cases.
 permutation listenc notify1 notify2 notify3 notifyf
@@ -145,3 +169,32 @@ permutation lch_listen nch_notify lch_check
 # Hence, this should be the last test in this script.
 
 permutation llisten lbegin usage bignotify usage
+
+#
+# Pattern LISTEN/NOTIFY tests
+#
+
+# Basic pattern with * (zero or more characters)
+# Should receive: user_login, user_logout, user_ (empty suffix matches *)
+permutation plisten_star pnotify_user_login pnotify_user_logout pnotify_user_empty pcheck
+
+# Pattern with ? (exactly one character)
+# Should receive: msg_1, msg_a
+# Should NOT receive: msg_12 (two chars after _)
+permutation plisten_question pnotify_msg_1 pnotify_msg_a pnotify_msg_12 pcheck
+
+# Complex pattern with * in the middle
+# Should receive: prefix_mid_suffix, prefix_a_b_suffix
+# Should NOT receive: prefix_suffix (* may match zero chars, but both literal underscores are required)
+permutation plisten_complex pnotify_prefix_mid_suffix pnotify_prefix_suffix pnotify_prefix_a_b_suffix pcheck
+
+# Multiple patterns on same session
+# Should receive: event_created (matches event_*), alert_x (matches alert_?)
+# Should NOT receive: completely_different
+permutation plisten_multi pnotify_event_created pnotify_alert_x pnotify_nomatch pcheck
+
+# Cross-session pattern test: listener listens before notifier sends
+permutation plisten_star pnotify_user_login pcheck
+
+# Pattern listener with regular listener (mixed)
+permutation llisten plisten_star notify1 pnotify_user_login lcheck pcheck
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38e636..c5236c15fb9 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -40,3 +40,24 @@ SELECT pg_notification_queue_usage();
                            0
 (1 row)
 
+--
+-- Pattern LISTEN tests
+--
+-- Pattern with * (matches zero or more characters)
+LISTEN "user_*";
+UNLISTEN "user_*";
+-- Pattern with ? (matches exactly one character)
+LISTEN "msg_?";
+UNLISTEN "msg_?";
+-- Pattern with wildcards in the middle
+LISTEN "prefix_*_suffix";
+UNLISTEN "prefix_*_suffix";
+-- Escaped wildcards (for literal * and ? matching)
+LISTEN "literal\*star";
+LISTEN "literal\?question";
+UNLISTEN "literal\*star";
+UNLISTEN "literal\?question";
+-- Multiple patterns
+LISTEN "event_*";
+LISTEN "alert_?";
+UNLISTEN *;
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e015387..ca666c3019e 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -21,3 +21,30 @@ UNLISTEN *;
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
 SELECT pg_notification_queue_usage();
+
+--
+-- Pattern LISTEN tests
+--
+
+-- Pattern with * (matches zero or more characters)
+LISTEN "user_*";
+UNLISTEN "user_*";
+
+-- Pattern with ? (matches exactly one character)
+LISTEN "msg_?";
+UNLISTEN "msg_?";
+
+-- Pattern with wildcards in the middle
+LISTEN "prefix_*_suffix";
+UNLISTEN "prefix_*_suffix";
+
+-- Escaped wildcards (for literal * and ? matching)
+LISTEN "literal\*star";
+LISTEN "literal\?question";
+UNLISTEN "literal\*star";
+UNLISTEN "literal\?question";
+
+-- Multiple patterns
+LISTEN "event_*";
+LISTEN "alert_?";
+UNLISTEN *;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8cf40c87043..13a38f680ab 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1158,6 +1158,7 @@ GistTsVectorOptions
 GistVacState
 GlobalChannelEntry
 GlobalChannelKey
+GlobalPatternList
 GlobalTransaction
 GlobalTransactionData
 GlobalVisHorizonKind
-- 
2.50.1 (Apple Git-155)
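
For reference, the matching rules that the tests in the patch exercise (* matches zero or more characters, ? matches exactly one, backslash escapes a wildcard) can be sketched as a small standalone function. This is only an illustration under those assumptions: glob_match is a hypothetical name, it is not the patch's MatchPattern(), and the argument order (string first, pattern second) merely mirrors how MatchPattern() is called in SignalBackends.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical glob matcher (NOT the patch's MatchPattern):
 *   '*'  matches zero or more characters
 *   '?'  matches exactly one character
 *   '\'  makes the following pattern character literal
 */
static bool
glob_match(const char *str, const char *pat)
{
	const char *star = NULL;	/* pattern position just after the last '*' */
	const char *retry = NULL;	/* string position where that '*' stopped */

	while (*str != '\0')
	{
		if (*pat == '*')
		{
			/* Remember the star; first try letting it match nothing. */
			star = ++pat;
			retry = str;
			continue;
		}
		if (*pat == '?')
		{
			pat++;
			str++;
			continue;
		}
		if (*pat == '\\' && pat[1] != '\0')
		{
			/* Escaped character must match literally. */
			if (pat[1] == *str)
			{
				pat += 2;
				str++;
				continue;
			}
		}
		else if (*pat != '\0' && *pat == *str)
		{
			pat++;
			str++;
			continue;
		}

		/* Mismatch: let the last '*' absorb one more character, if any. */
		if (star == NULL)
			return false;
		pat = star;
		str = ++retry;
	}

	/* String is exhausted; only trailing '*' may remain in the pattern. */
	while (*pat == '*')
		pat++;
	return *pat == '\0';
}
```

With this sketch, glob_match("user_", "user_*") is true because * may match nothing, while glob_match("prefix_suffix", "prefix_*_suffix") is false because the pattern contains two literal underscores and the channel name only one.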
