On Thu, Nov 6, 2025, at 00:21, Chao Li wrote: > That’s what we don’t know. We now lack a performance test for > evaluating how “direct advancement” efficiently helps if it only > handles sleeping listeners. So what I was suggesting is that we should > first create some tests, maybe also add a few more statistics, so that > we can evaluate different solutions. If a simple implementation that > only handles sleeping listeners would have performed good enough, of > course we can take it; otherwise we may need to either pursue a better > solution.
Just for the sake of evaluating this patch, I've added instrumentation of async.c that increments counters for the different branches in asyncQueueReadAllNotifications and SignalBackends. (I'm just using atomics without any locking, but should be fine since this is just statistics.) pg_get_async_wakeup_stats-patch.txt adds the SQL-callable catalog functions pg_reset_async_wakeup_stats() and pg_get_async_wakeup_stats(), which should not be included in the patch, they are just for evaluating. It can be applied on top of the v23 patch. Below is just an example of how to compile and an arbitrary mix of command line options. I've tired a lot of combinations, and we seem to be holding up fine in all cases I've tried. async-notify-test-5.c will detect if the pg_*_async_wakeup_stats() functions exists, and only show the extra histograms if so. % gcc -Wall -Wextra -O2 -pthread -I/Users/joel/pg19/include/postgresql/server -I/Users/joel/pg19/include -o async-notify-test-5 async-notify-test-5.c -L/Users/joel/pg19/lib -lpq -pthread -lm % ./async-notify-test-5 --listeners 10 --notifiers 10 --channels 10 --sleep 0.1 --sleep-exp 2.0 --batch 10 10 s: 38100 sent (3690/s), 381000 received (36900/s) Notification Latency Distribution: 0.00-0.01ms 0 (0.0%) avg: 0.000ms 0.01-0.10ms # 23 (0.0%) avg: 0.092ms 0.10-1.00ms ####### 298002 (78.2%) avg: 0.563ms 1.00-10.00ms ## 82975 (21.8%) avg: 1.506ms 10.00-100.00ms 0 (0.0%) avg: 0.000ms >100.00ms 0 (0.0%) avg: 0.000ms asyncQueueReadAllNotifications Statistics: necessary_wakeups ######## 35469 (88.2%) unnecessary_wakeups # 4762 (11.8%) SignalBackends Statistics: signaled_needed # 34983 (9.5%) avoided_wakeups ######## 325874 (88.9%) already_advancing # 3 (0.0%) signaled_uncertain # 5347 (1.5%) already_ahead # 375 (0.1%) Thoughts on how to interpret results: - Is the notification latency distribution good enough, for the given workload? Naturally, if the workload is too high, we cannot expect to ever achieve sub millisecond latency anyway, so it's a judgement. - Even if the "unnecessary_wakeups" is high relative to "necessary_wakeups", it's not necessarily a problem, if the latency distribution still is good enough. We should also think about the ratio between "unnecessary_wakeups" and "avoided_wakeups", since even if "unnecessary_wakeups" is high in absolute numbers, if the "avoided_wakeups" is magnitudes larger, that means the cost of the context switching has been dramatically reduced already. I think there is always a risk when optimizing to forget what problem one was trying to solve initially, usually a bottleneck. When the bottleneck is gone and is somewhere else instead, then the efforts should IMO usually be spent elsewhere, especially if more optimizations would need a insignificant increase of code complexity. - It's the "signaled_uncertain" that primarily contribute to "unnecessary_wakeups". /Joel
async-notify-test-5.c
Description: Binary data
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8dac12f8124..7e8e0b14f42 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -137,6 +137,7 @@
#include <signal.h>
#include <string.h>
+#include "access/htup_details.h"
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
@@ -332,6 +333,13 @@ typedef struct AsyncQueueControl
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
dsa_handle channelHashDSA;
dshash_table_handle channelHashDSH;
+ pg_atomic_uint32 signaledNeeded; /* listening to some of the
channels; signal needed */
+ pg_atomic_uint32 avoidedWakeups; /* directly advanced */
+ pg_atomic_uint32 alreadyAdvancing; /* already advancing its
position */
+ pg_atomic_uint32 signaledUncertain; /* signaled due to uncertain
need */
+ pg_atomic_uint32 alreadyAhead; /* already ahead, no action needed */
+ pg_atomic_uint32 necessaryWakeups; /* wakeups where at
least one message was interesting */
+ pg_atomic_uint32 unnecessaryWakeups; /* wakeups where no messages
were interesting */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;
@@ -517,7 +525,8 @@ static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer,
-
Snapshot snapshot);
+
Snapshot snapshot,
+
bool *interested);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
static bool AsyncExistsPendingNotify(Notification *n);
@@ -683,6 +692,13 @@ AsyncShmemInit(void)
asyncQueueControl->lastQueueFillWarn = 0;
asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+ pg_atomic_init_u32(&asyncQueueControl->signaledNeeded, 0);
+ pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0);
+ pg_atomic_init_u32(&asyncQueueControl->alreadyAdvancing, 0);
+ pg_atomic_init_u32(&asyncQueueControl->signaledUncertain, 0);
+ pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0);
+ pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0);
+ pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0);
for (int i = 0; i < MaxBackends; i++)
{
@@ -997,6 +1013,81 @@ pg_listening_channels(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+/*
+ * SQL function: return statistics about NOTIFY wakeups
+ *
+ * This function returns a single row with:
+ * - necessary_wakeups: wakeups where at least one message was interesting
+ * - unnecessary_wakeups: wakeups where no messages were interesting
+ * - direct_advancements_success: directly advanced
+ * - already_advancing: already advancing its position
+ * - signaled_uncertain: signaled due to uncertain need
+ * - already_ahead: already ahead, no action needed
+ */
+Datum
+pg_get_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+ TupleDesc tupdesc;
+ Datum values[7];
+ bool nulls[7];
+ HeapTuple tuple;
+ uint32 signaled_needed;
+ uint32 direct_advancements_success;
+ uint32 already_advancing;
+ uint32 signaled_uncertain;
+ uint32 already_ahead;
+ uint32 necessary_wakeups;
+ uint32 unnecessary_wakeups;
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in
context that cannot accept type record")));
+
+ /* Read the atomic counters */
+ signaled_needed =
pg_atomic_read_u32(&asyncQueueControl->signaledNeeded);
+ direct_advancements_success =
pg_atomic_read_u32(&asyncQueueControl->avoidedWakeups);
+ already_advancing =
pg_atomic_read_u32(&asyncQueueControl->alreadyAdvancing);
+ signaled_uncertain =
pg_atomic_read_u32(&asyncQueueControl->signaledUncertain);
+ already_ahead = pg_atomic_read_u32(&asyncQueueControl->alreadyAhead);
+ necessary_wakeups =
pg_atomic_read_u32(&asyncQueueControl->necessaryWakeups);
+ unnecessary_wakeups =
pg_atomic_read_u32(&asyncQueueControl->unnecessaryWakeups);
+
+ /* Fill in the values */
+ memset(nulls, 0, sizeof(nulls));
+ values[0] = Int64GetDatum((int64) signaled_needed);
+ values[1] = Int64GetDatum((int64) direct_advancements_success);
+ values[2] = Int64GetDatum((int64) already_advancing);
+ values[3] = Int64GetDatum((int64) signaled_uncertain);
+ values[4] = Int64GetDatum((int64) already_ahead);
+ values[5] = Int64GetDatum((int64) necessary_wakeups);
+ values[6] = Int64GetDatum((int64) unnecessary_wakeups);
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * SQL function: reset NOTIFY wakeup statistics
+ *
+ * This function resets all the async wakeup counters to zero.
+ */
+Datum
+pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+ /* Reset all the atomic counters to zero */
+ pg_atomic_write_u32(&asyncQueueControl->signaledNeeded, 0);
+ pg_atomic_write_u32(&asyncQueueControl->avoidedWakeups, 0);
+ pg_atomic_write_u32(&asyncQueueControl->alreadyAdvancing, 0);
+ pg_atomic_write_u32(&asyncQueueControl->signaledUncertain, 0);
+ pg_atomic_write_u32(&asyncQueueControl->alreadyAhead, 0);
+ pg_atomic_write_u32(&asyncQueueControl->necessaryWakeups, 0);
+ pg_atomic_write_u32(&asyncQueueControl->unnecessaryWakeups, 0);
+
+ PG_RETURN_VOID();
+}
+
/*
* Async_UnlistenOnExit
*
@@ -2014,6 +2105,7 @@ SignalBackends(void)
Assert(pid != InvalidPid);
+
pg_atomic_fetch_add_u32(&asyncQueueControl->signaledNeeded, 1);
QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
pids[count] = pid;
procnos[count] = i;
@@ -2049,7 +2141,14 @@ SignalBackends(void)
* currently advancing its position.
*/
if (!QUEUE_BACKEND_ADVANCING_POS(i))
+ {
QUEUE_BACKEND_POS(i) =
queueHeadAfterWrite;
+
pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1);
+ }
+ else
+ {
+
pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAdvancing, 1);
+ }
}
else if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
{
@@ -2060,6 +2159,7 @@ SignalBackends(void)
*/
Assert(pid != InvalidPid);
+
pg_atomic_fetch_add_u32(&asyncQueueControl->signaledUncertain, 1);
QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
pids[count] = pid;
procnos[count] = i;
@@ -2071,6 +2171,7 @@ SignalBackends(void)
* The backend is already ahead of the
notifications we wrote.
* No need to do anything.
*/
+
pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1);
Assert(QUEUE_POS_PRECEDES(queueHeadBeforeWrite,
pos));
}
}
@@ -2301,6 +2402,7 @@ asyncQueueReadAllNotifications(void)
volatile QueuePosition pos;
QueuePosition head;
Snapshot snapshot;
+ bool interested = false;
/* page_buffer must be adequately aligned, so use a union */
union
@@ -2435,7 +2537,8 @@ asyncQueueReadAllNotifications(void)
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
page_buffer.buf,
-
snapshot);
+
snapshot,
+
&interested);
} while (!reachedStop);
}
PG_FINALLY();
@@ -2450,6 +2553,11 @@ asyncQueueReadAllNotifications(void)
}
PG_END_TRY();
+ if (interested)
+ pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups,
1);
+ else
+ pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups,
1);
+
/* Done with snapshot */
UnregisterSnapshot(snapshot);
}
@@ -2474,7 +2582,8 @@ static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer,
- Snapshot snapshot)
+ Snapshot snapshot,
+ bool *interested)
{
bool reachedStop = false;
bool reachedEndOfPage;
@@ -2535,6 +2644,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition
*current,
char *payload = qe->data +
strlen(channel) + 1;
NotifyMyFrontEnd(channel, payload,
qe->srcPid);
+
+ /* Mark were interested in at least one
message */
+ *interested = true;
}
}
else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..0bbd7db39c7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8571,7 +8571,18 @@
proname => 'pg_notification_queue_usage', provolatile => 'v',
proparallel => 'r', prorettype => 'float8', proargtypes => '',
prosrc => 'pg_notification_queue_usage' },
-
+{ oid => '9315',
+ descr => 'get statistics about NOTIFY wakeups',
+ proname => 'pg_get_async_wakeup_stats', provolatile => 'v',
+ proparallel => 'r', prorettype => 'record', proargtypes => '',
+ proallargtypes => '{int8,int8,int8,int8,int8,int8,int8}', proargmodes =>
'{o,o,o,o,o,o,o}',
+ proargnames =>
'{signaled_needed,avoided_wakeups,already_advancing,signaled_uncertain,already_ahead,necessary_wakeups,unnecessary_wakeups}',
+ prosrc => 'pg_get_async_wakeup_stats' },
+{ oid => '9316',
+ descr => 'reset statistics about NOTIFY wakeups',
+ proname => 'pg_reset_async_wakeup_stats', provolatile => 'v',
+ proparallel => 'r', prorettype => 'void', proargtypes => '',
+ prosrc => 'pg_reset_async_wakeup_stats' },
# shared memory usage
{ oid => '5052', descr => 'allocations from the main shared memory segment',
proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',
