On Wed, Oct 15, 2025, at 16:16, Tom Lane wrote: > Arseniy Mukhin <[email protected]> writes: >> I think "Direct advancement" is a good idea. But the way it's >> implemented now has a concurrency bug. Listeners store its current >> position in the local variable 'pos' during the reading in >> asyncQueueReadAllNotifications() and don't hold NotifyQueueLock. It >> means that some notifier can directly advance the listener's position >> while the listener has an old value in the local variable. The same >> time we use listener positions to find out the limit we can truncate >> the queue in asyncQueueAdvanceTail(). > > Good catch!
I've implemented the three ideas presented below, attached as .txt files that are diffs on top of v19, which has these changes since v17: 0002-optimize_listen_notify-v19.patch: * Improve wording of top comment per request from Chao Li. * Add initChannelHash call to top of SignalBackends, to fix bug reported by Arseniy Mukhin. > I think we can perhaps salvage the idea if we invent a separate > "advisory" queue position field, which tells its backend "hey, > you could skip as far as here if you want", but is not used for > purposes of SLRU truncation. Above idea is implemented in 0002-optimize_listen_notify-v19-alt1.txt > Alternatively, split the queue pos > into "this is where to read next" and "this is as much as I'm > definitively done with", where the second field gets advanced at > the end of asyncQueueReadAllNotifications. Not sure which > view would be less confusing (in the end I guess they're nearly > the same thing, differently explained). Above idea is implemented in 0002-optimize_listen_notify-v19-alt2.txt > A different line of thought could be to get rid of > asyncQueueReadAllNotifications's optimization of moving the > queue pos only once, per > > * (We could alternatively retake NotifyQueueLock and move the position > * before handling each individual message, but that seems like too much > * lock traffic.) > > Since we only need shared lock to advance our own queue pos, > maybe that wouldn't be too awful. Not sure. Above idea is implemented in 0002-optimize_listen_notify-v19-alt3.txt /Joel
0001-optimize_listen_notify-v19.patch
Description: Binary data
0002-optimize_listen_notify-v19.patch
Description: Binary data
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..44442e927ff 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) || \
+ ((x).page == (y).page && (x).offset < (y).offset))
+
/*
* Parameter determining how often we try to advance the tail pointer:
* we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
@@ -286,6 +291,7 @@ typedef struct QueueBackendStatus
Oid dboid; /* backend's database
OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or
INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to
here */
+ QueuePosition advisoryPos; /* backend could skip queue to here */
bool wakeupPending; /* signal sent but not yet processed */
} QueueBackendStatus;
@@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)	(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)	(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)	(asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
/*
@@ -668,6 +675,7 @@ AsyncShmemInit(void)
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
}
}
@@ -2009,9 +2017,14 @@ SignalBackends(void)
* Even though we may take and release NotifyQueueLock multiple times
* while writing, the heavyweight lock guarantees this region contains
* only our messages. Therefore, any backend still positioned at the
-	 * queue head from before our write can be safely advanced to the current
+	 * queue head from before our write can be advised to skip to the current
* queue head without waking it.
*
+ * We use the advisoryPos field rather than directly modifying pos,
+ * because the listening backend might be concurrently reading
+ * notifications using its local copy of pos. The backend controls its
+ * own pos field and will check advisoryPos when it's safe to do so.
+ *
* False-positive possibility: if a backend was previously signaled but
* hasn't yet awoken, we'll skip advancing it (because wakeupPending is
* true). This is safe - the backend will advance its pointer when it
@@ -2038,7 +2051,7 @@ SignalBackends(void)
if (pendingNotifies != NULL &&
QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
{
- QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+ QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
pos = queueHeadAfterWrite;
}
@@ -2297,6 +2310,26 @@ asyncQueueReadAllNotifications(void)
QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
pos = QUEUE_BACKEND_POS(MyProcNumber);
head = QUEUE_HEAD;
+
+ /*
+ * Check if another backend has set an advisory position for us.
+ * If so, and if we haven't yet read past that point, we can safely
+ * adopt the advisory position and skip the intervening notifications.
+ * This is safe because the advisory position is only set when we're
+ * positioned at a known point and the skipped region contains only
+ * notifications we're not interested in.
+ */
+ {
+		QueuePosition	advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+
+ if (!QUEUE_POS_EQUAL(advisoryPos, pos) &&
+ QUEUE_POS_PRECEDES(pos, advisoryPos))
+ {
+ pos = advisoryPos;
+ QUEUE_BACKEND_POS(MyProcNumber) = pos;
+ }
+ }
+
LWLockRelease(NotifyQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..e201deb5e54 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -70,14 +70,14 @@
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
- * make any actual updates to the local listen state (listenChannels) and
- * shared channel hash table (channelHash). Then we signal any backends
- * that may be interested in our messages (including our own backend,
- * if listening). This is done by SignalBackends(), which consults the
- * shared channel hash table to identify listeners for the channels that
- * have pending notifications in the current database. Each selected
-	 * backend is marked as having a wakeup pending to avoid duplicate signals,
- * and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ * make any actual updates to the effective listen state (channelHash).
+ * Then we signal any backends that may be interested in our messages
+ * (including our own backend, if listening). This is done by
+ * SignalBackends(), which consults the shared channel hash table to
+ * identify listeners for the channels that have pending notifications
+ * in the current database. Each selected backend is marked as having a
+	 * wakeup pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ * signal is sent to it.
*
* When writing notifications, PreCommit_Notify() records the queue head
* position both before and after the write. Because all writers
serialize
@@ -2282,6 +2282,7 @@ asyncQueueReadAllNotifications(void)
volatile QueuePosition pos;
QueuePosition head;
Snapshot snapshot;
+ bool reachedStop;
/* page_buffer must be adequately aligned, so use a union */
union
@@ -2350,77 +2351,83 @@ asyncQueueReadAllNotifications(void)
* It is possible that we fail while trying to send a message to our
* frontend (for example, because of encoding conversion failure). If
* that happens it is critical that we not try to send the same message
-	 * over and over again.  Therefore, we place a PG_TRY block here that will
-	 * forcibly advance our queue position before we lose control to an error.
- * (We could alternatively retake NotifyQueueLock and move the position
- * before handling each individual message, but that seems like too much
- * lock traffic.)
+ * over and over again. Therefore, we must advance our queue position
+ * regularly as we process messages.
+ *
+ * We must also be careful about concurrency: SignalBackends() can
+ * directly advance our position while we're reading. To prevent
+ * overwriting such an advancement with a stale value, we update our
+ * position in shared memory after processing messages from each page,
+ * while holding NotifyQueueLock. Shared lock is sufficient since we're
+ * only updating our own position.
*/
- PG_TRY();
+ do
{
- bool reachedStop;
+ int64 curpage = QUEUE_POS_PAGE(pos);
+ int curoffset = QUEUE_POS_OFFSET(pos);
+ int slotno;
+ int copysize;
- do
+ /*
+ * We copy the data from SLRU into a local buffer, so as to
avoid
+ * holding the SLRU lock while we are examining the entries and
+ * possibly transmitting them to our frontend. Copy only the
part
+ * of the page we will actually inspect.
+ */
+ slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+
InvalidTransactionId);
+ if (curpage == QUEUE_POS_PAGE(head))
{
- int64 curpage = QUEUE_POS_PAGE(pos);
- int curoffset =
QUEUE_POS_OFFSET(pos);
- int slotno;
- int copysize;
+ /* we only want to read as far as head */
+ copysize = QUEUE_POS_OFFSET(head) - curoffset;
+ if (copysize < 0)
+ copysize = 0; /* just for safety */
+ }
+ else
+ {
+ /* fetch all the rest of the page */
+ copysize = QUEUE_PAGESIZE - curoffset;
+ }
+ memcpy(page_buffer.buf + curoffset,
+ NotifyCtl->shared->page_buffer[slotno] + curoffset,
+ copysize);
+ /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
- /*
- * We copy the data from SLRU into a local buffer, so
as to avoid
- * holding the SLRU lock while we are examining the
entries and
- * possibly transmitting them to our frontend. Copy
only the part
- * of the page we will actually inspect.
- */
- slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-
InvalidTransactionId);
- if (curpage == QUEUE_POS_PAGE(head))
- {
- /* we only want to read as far as head */
- copysize = QUEUE_POS_OFFSET(head) - curoffset;
- if (copysize < 0)
- copysize = 0; /* just for safety */
- }
- else
- {
- /* fetch all the rest of the page */
- copysize = QUEUE_PAGESIZE - curoffset;
- }
- memcpy(page_buffer.buf + curoffset,
- NotifyCtl->shared->page_buffer[slotno] +
curoffset,
- copysize);
- /* Release lock that we got from
SimpleLruReadPage_ReadOnly() */
- LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+ /*
+ * Process messages up to the stop position, end of page, or an
+ * uncommitted message.
+ *
+ * Our stop position is what we found to be the head's position
+ * when we entered this function. It might have changed already.
+ * But if it has, we will receive (or have already received and
+ * queued) another signal and come here again.
+ *
+ * We are not holding NotifyQueueLock here! The queue can only
+ * extend beyond the head pointer (see above). We update our
+ * backend's position after processing messages from each page
to
+ * ensure we don't reprocess messages if we fail partway
through,
+ * and to avoid overwriting any direct advancement that
+ * SignalBackends() might perform concurrently.
+ */
+ reachedStop = asyncQueueProcessPageEntries(&pos, head,
+
page_buffer.buf,
+
snapshot);
- /*
- * Process messages up to the stop position, end of
page, or an
- * uncommitted message.
- *
- * Our stop position is what we found to be the head's
position
- * when we entered this function. It might have changed
already.
- * But if it has, we will receive (or have already
received and
- * queued) another signal and come here again.
- *
- * We are not holding NotifyQueueLock here! The queue
can only
- * extend beyond the head pointer (see above) and we
leave our
- * backend's pointer where it is so nobody will
truncate or
- * rewrite pages under us. Especially we don't want to
hold a lock
- * while sending the notifications to the frontend.
- */
- reachedStop = asyncQueueProcessPageEntries(&pos, head,
-
page_buffer.buf,
-
snapshot);
- } while (!reachedStop);
- }
- PG_FINALLY();
- {
- /* Update shared state */
+ /*
+ * Update our position in shared memory. The 'pos' variable now
+ * holds our new position (advanced past all messages we just
+ * processed). This ensures that if we fail while processing
+ * messages from the next page, we won't reprocess the ones we
+ * just handled. It also prevents us from overwriting any
direct
+ * advancement that another backend might have done while we
were
+ * processing messages.
+ */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyProcNumber) = pos;
LWLockRelease(NotifyQueueLock);
- }
- PG_END_TRY();
+
+ } while (!reachedStop);
/* Done with snapshot */
UnregisterSnapshot(snapshot);
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..751400b8315 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -285,7 +285,8 @@ typedef struct QueueBackendStatus
int32 pid; /* either a PID or InvalidPid */
Oid dboid; /* backend's database
OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or
INVALID_PROC_NUMBER */
- QueuePosition pos; /* backend has read queue up to
here */
+ QueuePosition pos; /* next position to read from */
+	QueuePosition	donePos;	/* backend has definitively processed up to here */
bool wakeupPending; /* signal sent but not yet processed */
} QueueBackendStatus;
@@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)	(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)	(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DONEPOS(i)	(asyncQueueControl->backend[i].donePos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
/*
@@ -668,6 +670,7 @@ AsyncShmemInit(void)
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0);
QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
}
}
@@ -1290,6 +1293,7 @@ Exec_ListenPreCommit(void)
prevListener = i;
}
QUEUE_BACKEND_POS(MyProcNumber) = max;
+ QUEUE_BACKEND_DONEPOS(MyProcNumber) = max;
QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
/* Insert backend into list of listeners at correct position */
@@ -2415,9 +2419,19 @@ asyncQueueReadAllNotifications(void)
}
PG_FINALLY();
{
-		/* Update shared state */
+		/*
+		 * Update shared state.
+		 *
+		 * We update donePos to what we actually read (the local pos variable),
+		 * as this is used for truncation safety.  For the read position (pos),
+		 * we use the maximum of our local position and the current shared
+		 * position, in case another backend used direct advancement to skip us
+		 * ahead while we were reading.  This prevents us from going backwards
+		 * and potentially pointing to a truncated page.
+		 */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos;
+		QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber));
LWLockRelease(NotifyQueueLock);
}
PG_END_TRY();
@@ -2567,7 +2581,13 @@ asyncQueueAdvanceTail(void)
for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i =
QUEUE_NEXT_LISTENER(i))
{
Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+		/*
+		 * Use donePos rather than pos for truncation safety.  The donePos
+		 * field represents what the backend has definitively processed, while
+		 * pos can be advanced by other backends via direct advancement.  This
+		 * prevents truncating pages that a backend is still reading from.
+		 */
+		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i));
}
QUEUE_TAIL = min;
oldtailpage = QUEUE_STOP_PAGE;
