On Wed, Oct 15, 2025, at 16:16, Tom Lane wrote:
> Arseniy Mukhin <[email protected]> writes:
>> I think "Direct advancement" is a good idea. But the way it's
>> implemented now has a concurrency bug. Listeners store their current
>> position in the local variable 'pos' while reading in
>> asyncQueueReadAllNotifications() and don't hold NotifyQueueLock. This
>> means that a notifier can directly advance the listener's position
>> while the listener still has an old value in its local variable. At
>> the same time, we use listener positions to determine how far we can
>> truncate the queue in asyncQueueAdvanceTail().
>
> Good catch!

I've implemented the three ideas quoted below, attached as .txt files
containing diffs on top of v19, which has these changes since v17:

0002-optimize_listen_notify-v19.patch:
* Improve wording of top comment per request from Chao Li.
* Add initChannelHash call to top of SignalBackends,
  to fix bug reported by Arseniy Mukhin.

> I think we can perhaps salvage the idea if we invent a separate
> "advisory" queue position field, which tells its backend "hey,
> you could skip as far as here if you want", but is not used for
> purposes of SLRU truncation.

The above idea is implemented in 0002-optimize_listen_notify-v19-alt1.txt

> Alternatively, split the queue pos
> into "this is where to read next" and "this is as much as I'm
> definitively done with", where the second field gets advanced at
> the end of asyncQueueReadAllNotifications.  Not sure which
> view would be less confusing (in the end I guess they're nearly
> the same thing, differently explained).

The above idea is implemented in 0002-optimize_listen_notify-v19-alt2.txt

> A different line of thought could be to get rid of
> asyncQueueReadAllNotifications's optimization of moving the
> queue pos only once, per
>
>        * (We could alternatively retake NotifyQueueLock and move the position
>        * before handling each individual message, but that seems like too much
>        * lock traffic.)
>
> Since we only need shared lock to advance our own queue pos,
> maybe that wouldn't be too awful.  Not sure.

The above idea is implemented in 0002-optimize_listen_notify-v19-alt3.txt

/Joel

Attachment: 0001-optimize_listen_notify-v19.patch
Description: Binary data

Attachment: 0002-optimize_listen_notify-v19.patch
Description: Binary data

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..44442e927ff 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
         (x).page != (y).page ? (x) : \
         (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+       (asyncQueuePagePrecedes((x).page, (y).page) || \
+        ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
@@ -286,6 +291,7 @@ typedef struct QueueBackendStatus
        Oid                     dboid;                  /* backend's database OID, or InvalidOid */
        ProcNumber      nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
        QueuePosition pos;                      /* backend has read queue up to here */
+       QueuePosition advisoryPos;      /* backend could skip queue to here */
        bool            wakeupPending;  /* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)         (asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)         (asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)           (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)  (asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)        (asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -668,6 +675,7 @@ AsyncShmemInit(void)
                        QUEUE_BACKEND_DBOID(i) = InvalidOid;
                        QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
                        SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+                       SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
                        QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
                }
        }
@@ -2009,9 +2017,14 @@ SignalBackends(void)
         * Even though we may take and release NotifyQueueLock multiple times
         * while writing, the heavyweight lock guarantees this region contains
         * only our messages.  Therefore, any backend still positioned at the
-        * queue head from before our write can be safely advanced to the current
+        * queue head from before our write can be advised to skip to the current
         * queue head without waking it.
         *
+        * We use the advisoryPos field rather than directly modifying pos,
+        * because the listening backend might be concurrently reading
+        * notifications using its local copy of pos.  The backend controls its
+        * own pos field and will check advisoryPos when it's safe to do so.
+        *
         * False-positive possibility: if a backend was previously signaled but
         * hasn't yet awoken, we'll skip advancing it (because wakeupPending is
         * true).  This is safe - the backend will advance its pointer when it
@@ -2038,7 +2051,7 @@ SignalBackends(void)
                if (pendingNotifies != NULL &&
                        QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
                {
-                       QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+                       QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
                        pos = queueHeadAfterWrite;
                }
 
@@ -2297,6 +2310,26 @@ asyncQueueReadAllNotifications(void)
        QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
        pos = QUEUE_BACKEND_POS(MyProcNumber);
        head = QUEUE_HEAD;
+
+       /*
+        * Check if another backend has set an advisory position for us.
+        * If so, and if we haven't yet read past that point, we can safely
+        * adopt the advisory position and skip the intervening notifications.
+        * This is safe because the advisory position is only set when we're
+        * positioned at a known point and the skipped region contains only
+        * notifications we're not interested in.
+        */
+       {
+               QueuePosition advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+
+               if (!QUEUE_POS_EQUAL(advisoryPos, pos) &&
+                       QUEUE_POS_PRECEDES(pos, advisoryPos))
+               {
+                       pos = advisoryPos;
+                       QUEUE_BACKEND_POS(MyProcNumber) = pos;
+               }
+       }
+
        LWLockRelease(NotifyQueueLock);
 
        if (QUEUE_POS_EQUAL(pos, head))
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..e201deb5e54 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -70,14 +70,14 @@
  *       CommitTransaction() which will then do the actual transaction commit.
  *
  *       After commit we are called another time (AtCommit_Notify()). Here we
- *       make any actual updates to the local listen state (listenChannels) and
- *       shared channel hash table (channelHash).  Then we signal any backends
- *       that may be interested in our messages (including our own backend,
- *       if listening).  This is done by SignalBackends(), which consults the
- *       shared channel hash table to identify listeners for the channels that
- *       have pending notifications in the current database.  Each selected
- *       backend is marked as having a wakeup pending to avoid duplicate signals,
- *       and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ *       make any actual updates to the effective listen state (channelHash).
+ *       Then we signal any backends that may be interested in our messages
+ *       (including our own backend, if listening).  This is done by
+ *       SignalBackends(), which consults the shared channel hash table to
+ *       identify listeners for the channels that have pending notifications
+ *       in the current database.  Each selected backend is marked as having a
+ *       wakeup pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ *       signal is sent to it.
  *
  *       When writing notifications, PreCommit_Notify() records the queue head
  *       position both before and after the write.  Because all writers serialize
@@ -2282,6 +2282,7 @@ asyncQueueReadAllNotifications(void)
        volatile QueuePosition pos;
        QueuePosition head;
        Snapshot        snapshot;
+       bool            reachedStop;
 
        /* page_buffer must be adequately aligned, so use a union */
        union
@@ -2350,77 +2351,83 @@ asyncQueueReadAllNotifications(void)
         * It is possible that we fail while trying to send a message to our
         * frontend (for example, because of encoding conversion failure).  If
         * that happens it is critical that we not try to send the same message
-        * over and over again.  Therefore, we place a PG_TRY block here that will
-        * forcibly advance our queue position before we lose control to an error.
-        * (We could alternatively retake NotifyQueueLock and move the position
-        * before handling each individual message, but that seems like too much
-        * lock traffic.)
+        * over and over again.  Therefore, we must advance our queue position
+        * regularly as we process messages.
+        *
+        * We must also be careful about concurrency: SignalBackends() can
+        * directly advance our position while we're reading.  To prevent
+        * overwriting such an advancement with a stale value, we update our
+        * position in shared memory after processing messages from each page,
+        * while holding NotifyQueueLock.  Shared lock is sufficient since we're
+        * only updating our own position.
         */
-       PG_TRY();
+       do
        {
-               bool            reachedStop;
+               int64           curpage = QUEUE_POS_PAGE(pos);
+               int                     curoffset = QUEUE_POS_OFFSET(pos);
+               int                     slotno;
+               int                     copysize;
 
-               do
+               /*
+                * We copy the data from SLRU into a local buffer, so as to avoid
+                * holding the SLRU lock while we are examining the entries and
+                * possibly transmitting them to our frontend.  Copy only the part
+                * of the page we will actually inspect.
+                */
+               slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+                                                                                       InvalidTransactionId);
+               if (curpage == QUEUE_POS_PAGE(head))
                {
-                       int64           curpage = QUEUE_POS_PAGE(pos);
-                       int                     curoffset = QUEUE_POS_OFFSET(pos);
-                       int                     slotno;
-                       int                     copysize;
+                       /* we only want to read as far as head */
+                       copysize = QUEUE_POS_OFFSET(head) - curoffset;
+                       if (copysize < 0)
+                               copysize = 0;   /* just for safety */
+               }
+               else
+               {
+                       /* fetch all the rest of the page */
+                       copysize = QUEUE_PAGESIZE - curoffset;
+               }
+               memcpy(page_buffer.buf + curoffset,
+                          NotifyCtl->shared->page_buffer[slotno] + curoffset,
+                          copysize);
+               /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+               LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
 
-                       /*
-                        * We copy the data from SLRU into a local buffer, so as to avoid
-                        * holding the SLRU lock while we are examining the entries and
-                        * possibly transmitting them to our frontend.  Copy only the part
-                        * of the page we will actually inspect.
-                        */
-                       slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-                                                                                               InvalidTransactionId);
-                       if (curpage == QUEUE_POS_PAGE(head))
-                       {
-                               /* we only want to read as far as head */
-                               copysize = QUEUE_POS_OFFSET(head) - curoffset;
-                               if (copysize < 0)
-                                       copysize = 0;   /* just for safety */
-                       }
-                       else
-                       {
-                               /* fetch all the rest of the page */
-                               copysize = QUEUE_PAGESIZE - curoffset;
-                       }
-                       memcpy(page_buffer.buf + curoffset,
-                                  NotifyCtl->shared->page_buffer[slotno] + curoffset,
-                                  copysize);
-                       /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-                       LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+               /*
+                * Process messages up to the stop position, end of page, or an
+                * uncommitted message.
+                *
+                * Our stop position is what we found to be the head's position
+                * when we entered this function. It might have changed already.
+                * But if it has, we will receive (or have already received and
+                * queued) another signal and come here again.
+                *
+                * We are not holding NotifyQueueLock here! The queue can only
+                * extend beyond the head pointer (see above).  We update our
+                * backend's position after processing messages from each page to
+                * ensure we don't reprocess messages if we fail partway through,
+                * and to avoid overwriting any direct advancement that
+                * SignalBackends() might perform concurrently.
+                */
+               reachedStop = asyncQueueProcessPageEntries(&pos, head,
+                                                                                  page_buffer.buf,
+                                                                                  snapshot);
 
-                       /*
-                        * Process messages up to the stop position, end of page, or an
-                        * uncommitted message.
-                        *
-                        * Our stop position is what we found to be the head's position
-                        * when we entered this function. It might have changed already.
-                        * But if it has, we will receive (or have already received and
-                        * queued) another signal and come here again.
-                        *
-                        * We are not holding NotifyQueueLock here! The queue can only
-                        * extend beyond the head pointer (see above) and we leave our
-                        * backend's pointer where it is so nobody will truncate or
-                        * rewrite pages under us. Especially we don't want to hold a lock
-                        * while sending the notifications to the frontend.
-                        */
-                       reachedStop = asyncQueueProcessPageEntries(&pos, head,
-                                                                                          page_buffer.buf,
-                                                                                          snapshot);
-               } while (!reachedStop);
-       }
-       PG_FINALLY();
-       {
-               /* Update shared state */
+               /*
+                * Update our position in shared memory.  The 'pos' variable now
+                * holds our new position (advanced past all messages we just
+                * processed).  This ensures that if we fail while processing
+                * messages from the next page, we won't reprocess the ones we
+                * just handled.  It also prevents us from overwriting any direct
+                * advancement that another backend might have done while we were
+                * processing messages.
+                */
                LWLockAcquire(NotifyQueueLock, LW_SHARED);
                QUEUE_BACKEND_POS(MyProcNumber) = pos;
                LWLockRelease(NotifyQueueLock);
-       }
-       PG_END_TRY();
+
+       } while (!reachedStop);
 
        /* Done with snapshot */
        UnregisterSnapshot(snapshot);
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..751400b8315 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -285,7 +285,8 @@ typedef struct QueueBackendStatus
        int32           pid;                    /* either a PID or InvalidPid */
        Oid                     dboid;                  /* backend's database OID, or InvalidOid */
        ProcNumber      nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
-       QueuePosition pos;                      /* backend has read queue up to here */
+       QueuePosition pos;                      /* next position to read from */
+       QueuePosition donePos;          /* backend has definitively processed up to here */
        bool            wakeupPending;  /* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)         (asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)         (asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)           (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DONEPOS(i)       (asyncQueueControl->backend[i].donePos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)        (asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -668,6 +670,7 @@ AsyncShmemInit(void)
                        QUEUE_BACKEND_DBOID(i) = InvalidOid;
                        QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
                        SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+                       SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0);
                        QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
                }
        }
@@ -1290,6 +1293,7 @@ Exec_ListenPreCommit(void)
                        prevListener = i;
        }
        QUEUE_BACKEND_POS(MyProcNumber) = max;
+       QUEUE_BACKEND_DONEPOS(MyProcNumber) = max;
        QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
        QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
        /* Insert backend into list of listeners at correct position */
@@ -2415,9 +2419,19 @@ asyncQueueReadAllNotifications(void)
        }
        PG_FINALLY();
        {
-               /* Update shared state */
+               /*
+                * Update shared state.
+                *
+                * We update donePos to what we actually read (the local pos variable),
+                * as this is used for truncation safety.  For the read position (pos),
+                * we use the maximum of our local position and the current shared
+                * position, in case another backend used direct advancement to skip us
+                * ahead while we were reading.  This prevents us from going backwards
+                * and potentially pointing to a truncated page.
+                */
                LWLockAcquire(NotifyQueueLock, LW_SHARED);
-               QUEUE_BACKEND_POS(MyProcNumber) = pos;
+               QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos;
+               QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber));
                LWLockRelease(NotifyQueueLock);
        }
        PG_END_TRY();
@@ -2567,7 +2581,13 @@ asyncQueueAdvanceTail(void)
        for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
        {
                Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-               min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+               /*
+                * Use donePos rather than pos for truncation safety.  The 
donePos
+                * field represents what the backend has definitively 
processed, while
+                * pos can be advanced by other backends via direct 
advancement.  This
+                * prevents truncating pages that a backend is still reading 
from.
+                */
+               min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i));
        }
        QUEUE_TAIL = min;
        oldtailpage = QUEUE_STOP_PAGE;
