On Thu, Nov 6, 2025, at 09:33, Joel Jacobson wrote:
> On Thu, Nov 6, 2025, at 00:21, Chao Li wrote:
>> That’s what we don’t know. We now lack a performance test for
>> evaluating how “direct advancement” efficiently helps if it only
>> handles sleeping listeners. So what I was suggesting is that we should
>> first create some tests, maybe also add a few more statistics, so that
>> we can evaluate different solutions. If a simple implementation that
>> only handles sleeping listeners would have performed good enough, of
>> course we can take it; otherwise we may need to either pursue a better
>> solution.

Changes since v23:
* The advancingPos flag has been split into two fields:
    bool isAdvancing: indicates whether a backend is currently advancing
    QueuePosition advancingPos: the target position the backend will advance to

* The logic in SignalBackends has been reworked and simplified,
  thanks to the new isAdvancing and advancingPos fields.
  I now think it's finally easy to reason about why each branch
  in SignalBackends must be correct.
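
To make the branch structure easier to follow, here is a minimal standalone sketch of the per-listener decision for backends that are not already flagged for a targeted wakeup. It assumes queue positions can be modeled as plain integers (the real QueuePosition is compared with QUEUE_POS_PRECEDES), and every Sketch*/sketch_* name is hypothetical, made up for illustration only. It mirrors the branches in the instrumented 0003 code but is not the patch code itself.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumption: a queue position modeled as one increasing integer. */
typedef long SketchPos;

/* One outcome per counter in the SignalBackends statistics. */
typedef enum
{
	SKETCH_ADVANCING_BEHIND,	/* advancing, target before new head: signal */
	SKETCH_ADVANCING_AHEAD,		/* advancing, target at or past new head */
	SKETCH_IDLE_BEHIND,			/* idle before old head: signal */
	SKETCH_AVOIDED_WAKEUP,		/* idle inside the written range: direct advance */
	SKETCH_ALREADY_AHEAD		/* idle at or past new head: nothing to do */
} SketchOutcome;

typedef struct
{
	bool		is_advancing;	/* stand-in for isAdvancing */
	SketchPos	advancing_pos;	/* stand-in for advancingPos */
	SketchPos	pos;			/* current read position */
} SketchListener;

/*
 * Classify one listener after a notifier moved the queue head from
 * head_before to head_after.  Sets *need_signal when a wakeup is required
 * and directly advances l->pos when the listener can skip the new entries.
 */
static SketchOutcome
sketch_classify(SketchListener *l, SketchPos head_before,
				SketchPos head_after, bool *need_signal)
{
	*need_signal = false;

	if (l->is_advancing)
	{
		if (l->advancing_pos < head_after)
		{
			/* It would stop short of the new head, so wake it up again. */
			*need_signal = true;
			return SKETCH_ADVANCING_BEHIND;
		}
		return SKETCH_ADVANCING_AHEAD;
	}

	if (l->pos < head_before)
	{
		/* Entries between pos and the old head might interest it. */
		*need_signal = true;
		return SKETCH_IDLE_BEHIND;
	}
	if (l->pos < head_after)
	{
		/* Only our own, uninteresting entries lie ahead: skip them. */
		l->pos = head_after;
		return SKETCH_AVOIDED_WAKEUP;
	}

	assert(l->pos >= head_after);
	return SKETCH_ALREADY_AHEAD;
}
```

For example, an idle listener at position 105 with the head moving from 100 to 120 falls into the direct-advance branch and ends up at 120 without being signaled, while an idle listener at 90 must be signaled and keeps its position.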

I've also attached 0003-optimize_listen_notify-v24.txt, which adds
instrumentation that can be used together with the
benchmark/correctness tool pg_async_notify_test-v24.c.

This has been very helpful to me in developing an intuition for the
patch's concurrency behavior. I hope it can help others as well.

The 0003 patch is only for testing and is not part of the patchset,
hence the .txt extension.

% gcc -Wall -Wextra -O2 -pthread \
-I$(pg_config --includedir-server) \
-I$(pg_config --includedir) \
-L$(pg_config --libdir) \
-o pg_async_notify_test-v24 pg_async_notify_test-v24.c \
-lpq -pthread -lm
% ./pg_async_notify_test-v24 --listeners 10 --notifiers 10 --channels 10 \
--duration 10
10 s: 301622 sent (29409/s), 3015940 received (294185/s)

Notification Latency Distribution:
  0.00-0.01ms                                  0 (0.0%)   avg: 0.000ms
  0.01-0.10ms          #                      25 (0.0%)   avg: 0.074ms
  0.10-1.00ms          #                     289 (0.0%)   avg: 0.461ms
  1.00-10.00ms         #########         2824257 (93.6%)  avg: 4.193ms
  10.00-100.00ms       #                  189923 (6.3%)   avg: 24.893ms
  >100.00ms            #                    1453 (0.0%)   avg: 109.662ms

SignalBackends Statistics:
  signaled_targeted    #                 1251569 (9.4%)
  advancing_behind     #                  108505 (0.8%)
  advancing_ahead      #                  207494 (1.6%)
  idle_behind          #                  408641 (3.1%)
  avoided_wakeups      #######          10589740 (79.6%)
  already_ahead        #                  744087 (5.6%)

asyncQueueReadAllNotifications Statistics:
  necessary_wakeups    ########          1525695 (86.3%)
  unnecessary_wakeups  #                  242106 (13.7%)
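
The percentages above appear to be each counter's share of the sum of all counters in the same table; the arithmetic checks out for the numbers shown, but I have not confirmed it against the tool's source. A small sketch of that calculation, with a hypothetical helper name that does not come from pg_async_notify_test-v24.c:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: return counters[which] as a percentage of the sum
 * of all n counters.  Returns 0.0 when every counter is zero.
 */
static double
sketch_share_percent(const uint64_t *counters, size_t n, size_t which)
{
	uint64_t	total = 0;

	assert(which < n);

	for (size_t i = 0; i < n; i++)
		total += counters[i];

	if (total == 0)
		return 0.0;

	return 100.0 * (double) counters[which] / (double) total;
}
```

Feeding it the six SignalBackends counters gives roughly 79.6 for avoided_wakeups, and the two asyncQueueReadAllNotifications counters give roughly 13.7 for unnecessary_wakeups, matching the report.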
/Joel
From 9f2da84a9c58df155961481aa0802ffb95460811 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Fri, 7 Nov 2025 19:24:37 +0100
Subject: [PATCH 3/3] Add instrumentation for analyzing LISTEN/NOTIFY wakeup behavior

This commit adds a set of atomic counters and SQL-accessible functions
to help understand how SignalBackends and asyncQueueReadAllNotifications
interact under various workloads. The instrumentation is intended only
for development and performance analysis and will not be included in
the final patch.

Specifically:

* Added several pg_atomic_uint32 counters in AsyncQueueControl tracking
  wakeup categories such as signaled backends, advancing vs. idle
  positions, direct advancements, and unnecessary wakeups.

* Incremented these counters in SignalBackends() and
  asyncQueueReadAllNotifications() to classify wakeup decisions.

* Added SQL functions pg_get_async_wakeup_stats() and
  pg_reset_async_wakeup_stats() for reading and resetting these counters
  during test runs.

* Modified asyncQueueProcessPageEntries() to report whether any
  notifications were of interest to the backend, allowing
  differentiation between necessary and unnecessary wakeups.

This is purely diagnostic code to help reason about backend wakeup
patterns and validate assumptions during optimization. It introduces no
user-visible or behavioral changes and is not intended for commit to the
main tree.
---
 src/backend/commands/async.c    | 171 +++++++++++++++++++++++++++-----
 src/include/catalog/pg_proc.dat |  13 ++-
 2 files changed, 159 insertions(+), 25 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 9f7b8a3324a..2ec6b6b9e2b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -136,6 +136,7 @@
 #include <unistd.h>
 #include <signal.h>
 
+#include "access/htup_details.h"
 #include "access/parallel.h"
 #include "access/slru.h"
 #include "access/transam.h"
@@ -332,6 +333,14 @@ typedef struct AsyncQueueControl
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
 	dsa_handle	channelHashDSA;
 	dshash_table_handle channelHashDSH;
+	pg_atomic_uint32 signaledTargeted;	/* listening to some of the channels; signal needed */
+	pg_atomic_uint32 advancingBehind;	/* advancing, target position behind queue head after write */
+	pg_atomic_uint32 advancingAhead;	/* advancing, target position at or ahead of queue head after write */
+	pg_atomic_uint32 idleBehind;	/* stationary at a position behind queue head before write */
+	pg_atomic_uint32 avoidedWakeups;	/* directly advanced */
+	pg_atomic_uint32 alreadyAhead;	/* already caught up or ahead, no action needed */
+	pg_atomic_uint32 necessaryWakeups;	/* wakeups where at least one message was interesting */
+	pg_atomic_uint32 unnecessaryWakeups;	/* wakeups where we had no interest in any of the messages */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
@@ -518,7 +527,8 @@ static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
 										 char *page_buffer,
-										 Snapshot snapshot);
+										 Snapshot snapshot,
+										 bool *interested);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -684,6 +694,15 @@ AsyncShmemInit(void)
 		asyncQueueControl->lastQueueFillWarn = 0;
 		asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
 		asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+		pg_atomic_init_u32(&asyncQueueControl->signaledTargeted, 0);
+		pg_atomic_init_u32(&asyncQueueControl->advancingBehind, 0);
+		pg_atomic_init_u32(&asyncQueueControl->advancingAhead, 0);
+		pg_atomic_init_u32(&asyncQueueControl->idleBehind, 0);
+		pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0);
+		pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0);
+		pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0);
+		pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0);
+
 		for (int i = 0; i < MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
@@ -998,6 +1017,85 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 	SRF_RETURN_DONE(funcctx);
 }
 
+/*
+ * SQL function: return statistics about NOTIFY wakeups
+ *
+ * This function returns a single row with:
+ * - signaled_targeted: listening on a notified channel; signal needed
+ * - advancing_behind, advancing_ahead: advancing to before / at or past new head
+ * - idle_behind: stationary behind the old queue head; signal needed
+ * - avoided_wakeups: directly advanced to the new queue head
+ * - already_ahead: already caught up or ahead, no action needed
+ * - necessary_wakeups, unnecessary_wakeups: wakeups with / without interest
+ */
+Datum
+pg_get_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+	TupleDesc	tupdesc;
+	Datum		values[8];
+	bool		nulls[8];
+	HeapTuple	tuple;
+	uint32		signaled_targeted;
+	uint32		advancing_behind;
+	uint32		advancing_ahead;
+	uint32		idle_behind;
+	uint32		avoided_wakeups;
+	uint32		already_ahead;
+	uint32		necessary_wakeups;
+	uint32		unnecessary_wakeups;
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("function returning record called in context that cannot accept type record")));
+
+	/* Read the atomic counters */
+	signaled_targeted = pg_atomic_read_u32(&asyncQueueControl->signaledTargeted);
+	advancing_behind = pg_atomic_read_u32(&asyncQueueControl->advancingBehind);
+	advancing_ahead = pg_atomic_read_u32(&asyncQueueControl->advancingAhead);
+	idle_behind = pg_atomic_read_u32(&asyncQueueControl->idleBehind);
+	avoided_wakeups = pg_atomic_read_u32(&asyncQueueControl->avoidedWakeups);
+	already_ahead = pg_atomic_read_u32(&asyncQueueControl->alreadyAhead);
+	necessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->necessaryWakeups);
+	unnecessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->unnecessaryWakeups);
+
+	/* Fill in the values */
+	memset(nulls, 0, sizeof(nulls));
+	values[0] = Int64GetDatum((int64) signaled_targeted);
+	values[1] = Int64GetDatum((int64) advancing_behind);
+	values[2] = Int64GetDatum((int64) advancing_ahead);
+	values[3] = Int64GetDatum((int64) idle_behind);
+	values[4] = Int64GetDatum((int64) avoided_wakeups);
+	values[5] = Int64GetDatum((int64) already_ahead);
+	values[6] = Int64GetDatum((int64) necessary_wakeups);
+	values[7] = Int64GetDatum((int64) unnecessary_wakeups);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * SQL function: reset NOTIFY wakeup statistics
+ *
+ * This function resets all the async wakeup counters to zero.
+ */
+Datum
+pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+	/* Reset all the atomic counters to zero */
+	pg_atomic_write_u32(&asyncQueueControl->signaledTargeted, 0);
+	pg_atomic_write_u32(&asyncQueueControl->advancingBehind, 0);
+	pg_atomic_write_u32(&asyncQueueControl->advancingAhead, 0);
+	pg_atomic_write_u32(&asyncQueueControl->idleBehind, 0);
+	pg_atomic_write_u32(&asyncQueueControl->avoidedWakeups, 0);
+	pg_atomic_write_u32(&asyncQueueControl->alreadyAhead, 0);
+	pg_atomic_write_u32(&asyncQueueControl->necessaryWakeups, 0);
+	pg_atomic_write_u32(&asyncQueueControl->unnecessaryWakeups, 0);
+
+	PG_RETURN_VOID();
+}
+
 /*
  * Async_UnlistenOnExit
  *
@@ -2016,6 +2114,7 @@ SignalBackends(void)
 
 			Assert(pid != InvalidPid);
 
+			pg_atomic_fetch_add_u32(&asyncQueueControl->signaledTargeted, 1);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
 			pids[count] = pid;
 			procnos[count] = i;
@@ -2037,6 +2136,7 @@ SignalBackends(void)
 		{
 			QueuePosition pos;
 			int32		pid;
+			bool		need_signal = false;
 
 			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
 				continue;
@@ -2044,21 +2144,39 @@ SignalBackends(void)
 			pos = QUEUE_BACKEND_POS(i);
 			pid = QUEUE_BACKEND_PID(i);
 
-			/*
-			 * We need to signal advancing listening backends that would get
-			 * stuck at a position before the new queue head. We also need to
-			 * signal listening backends that are idle at a position before
-			 * the old queue head since they could be interested in the
-			 * messages in-between.
-			 *
-			 * Listening backends that are not advancing and are stationary at
-			 * a position somewhere in the range we just wrote, can safely be
-			 * direct advanced to the new queue head, since we know that they
-			 * are not interested in our messages.
-			 */
-			if (QUEUE_BACKEND_IS_ADVANCING(i) ?
-				QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite) :
-				QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
+			if (QUEUE_BACKEND_IS_ADVANCING(i))
+			{
+				if (QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite))
+				{
+					need_signal = true;
+					pg_atomic_fetch_add_u32(&asyncQueueControl->advancingBehind, 1);
+				}
+				else
+				{
+					pg_atomic_fetch_add_u32(&asyncQueueControl->advancingAhead, 1);
+				}
+			}
+			else
+			{
+				if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
+				{
+					need_signal = true;
+					pg_atomic_fetch_add_u32(&asyncQueueControl->idleBehind, 1);
+				}
+				else if (QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
+				{
+					QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+					pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1);
+				}
+				else
+				{
+					Assert(QUEUE_POS_EQUAL(pos, queueHeadAfterWrite) ||
+						   QUEUE_POS_PRECEDES(queueHeadAfterWrite, pos));
+					pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1);
+				}
+			}
+
+			if (need_signal)
 			{
 				Assert(pid != InvalidPid);
 
@@ -2067,13 +2185,7 @@ SignalBackends(void)
 				procnos[count] = i;
 				count++;
 			}
-			else if (!QUEUE_BACKEND_IS_ADVANCING(i) &&
-					 QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
-			{
-				Assert(!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite));
-				QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-			}
 		}
 	}
 	LWLockRelease(NotifyQueueLock);
 
@@ -2302,6 +2414,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
+	bool		interested = false;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -2438,7 +2551,8 @@ asyncQueueReadAllNotifications(void)
 				 */
 				reachedStop = asyncQueueProcessPageEntries(&pos, head,
 														   page_buffer.buf,
-														   snapshot);
+														   snapshot,
+														   &interested);
 			} while (!reachedStop);
 		}
 		PG_FINALLY();
@@ -2452,6 +2566,11 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
+	if (interested)
+		pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1);
+	else
+		pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1);
+
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
 }
@@ -2476,7 +2595,8 @@ static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
 							 char *page_buffer,
-							 Snapshot snapshot)
+							 Snapshot snapshot,
+							 bool *interested)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
@@ -2537,6 +2657,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 					char	   *payload = qe->data + strlen(channel) + 1;
 
 					NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+					/* Mark that we were interested in at least one message */
+					*interested = true;
 				}
 			}
 			else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..b259bccfa4b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8571,7 +8571,18 @@
   proname => 'pg_notification_queue_usage', provolatile => 'v',
   proparallel => 'r', prorettype => 'float8', proargtypes => '',
   prosrc => 'pg_notification_queue_usage' },
-
+{ oid => '9315',
+  descr => 'get statistics about NOTIFY wakeups',
+  proname => 'pg_get_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int8,int8,int8,int8,int8,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o,o}',
+  proargnames => '{signaled_targeted,advancing_behind,advancing_ahead,idle_behind,avoided_wakeups,already_ahead,necessary_wakeups,unnecessary_wakeups}',
+  prosrc => 'pg_get_async_wakeup_stats' },
+{ oid => '9316',
+  descr => 'reset statistics about NOTIFY wakeups',
+  proname => 'pg_reset_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_reset_async_wakeup_stats' },
 # shared memory usage
 { oid => '5052', descr => 'allocations from the main shared memory segment',
   proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',
-- 
2.50.1

0002-optimize_listen_notify-v24.patch
0001-optimize_listen_notify-v24.patch
pg_async_notify_test-v24.c
