On Fri, Oct 10, 2025, at 20:46, Joel Jacobson wrote:
> On Wed, Oct 8, 2025, at 20:46, Tom Lane wrote:
>> "Joel Jacobson" <j...@compiler.org> writes:
>>> On Tue, Oct 7, 2025, at 22:15, Tom Lane wrote:
>>>> 5. ChannelHashAddListener: "already registered" case is not reached,
>>>> which surprises me a bit, and neither is the "grow the array" stanza.
>>
>>> I've added a test for the "grow the array" stanza.
>>
>>> The "already registered" case seems impossible to reach, since the
>>> caller, Exec_ListenCommit, returns early if IsListeningOn.
>>
>> Maybe we should remove the check for "already registered" then,
>> or reduce it to an Assert? Seems pointless to check twice.
>>
>> Or thinking a little bigger: why are we maintaining the set of
>> channels-listened-to both as a list and a hash? Could we remove
>> the list form?
>
> Yes, it was indeed possible to remove the list form.
>
> Some functions got a bit more complex, but I think it's worth it since a
> single source of truth seems like an important design goal.
>
> This also made LISTEN faster when a backend is listening on plenty of
> channels, since we can now lookup the channel in the hash, instead of
> having to go through the list as before. The additional linear scan of
> the listenersArray didn't add any noticeable extra cost even with
> thousands of listening backends for the channel.
>
> I also tried to keep listenersArray sorted and binary-search it, but
> even with thousands of listening backends, I couldn't measure any
> overall latency difference of LISTEN, so I kept the linear scan to keep
> it simple.
>
> In Exec_ListenCommit, I've now inlined code that is similar to
> IsListeningOn. I didn't want to use IsListeningOn since it felt wasteful
> having to do dshash_find, when we instead can just use
> dshash_find_or_insert, to handle both cases.
>
> I also added a static int numChannelsListeningOn variable, to avoid the
> possibly expensive operation of going through the entire hash, to be
> able to check `numChannelsListeningOn == 0` instead of the now removed
> `listenChannels == NIL`. It's of course critical to keep
> numChannelsListeningOn in sync, but I think it should be safe? Would of
> course be better to avoid this variable. Maybe the extra cycles that
> would cost would be worth it?
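
To make the two ideas in the quoted text concrete, here is a minimal sketch in plain C. It is not the patch code: it only illustrates how a single find-or-insert probe can replace a lookup followed by an insert, and how a counter that is touched only when an entry is really added or removed lets a "listening on anything?" check avoid walking the table. ToyChannelSet and all toy_ functions are hypothetical names; the real code uses dshash in shared memory, which this sketch does not try to model.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define TOY_SLOTS	64		/* hypothetical fixed capacity */
#define TOY_NAMELEN	64		/* names are assumed to be shorter than this */

typedef enum {SLOT_EMPTY = 0, SLOT_USED, SLOT_DELETED} ToySlotState;

typedef struct ToySlot
{
	ToySlotState state;
	char		name[TOY_NAMELEN];
} ToySlot;

typedef struct ToyChannelSet
{
	ToySlot		slots[TOY_SLOTS];
	int			num_listening;	/* stands in for numChannelsListeningOn */
} ToyChannelSet;

unsigned
toy_hash(const char *s)
{
	unsigned	h = 5381;

	while (*s)
		h = h * 33u + (unsigned char) *s++;
	return h;
}

/*
 * One probe sequence handles both cases: return the existing entry with
 * *found = true, or claim a free slot with *found = false.  Returns NULL
 * only if the table is full.
 */
ToySlot *
toy_find_or_insert(ToyChannelSet *set, const char *name, bool *found)
{
	unsigned	start = toy_hash(name) % TOY_SLOTS;
	ToySlot    *reuse = NULL;

	for (unsigned i = 0; i < TOY_SLOTS; i++)
	{
		ToySlot    *slot = &set->slots[(start + i) % TOY_SLOTS];

		if (slot->state == SLOT_USED)
		{
			if (strcmp(slot->name, name) == 0)
			{
				*found = true;
				return slot;
			}
			continue;
		}
		if (reuse == NULL)
			reuse = slot;		/* first empty or deleted slot seen */
		if (slot->state == SLOT_EMPTY)
			break;				/* name cannot be stored further along */
	}

	*found = false;
	if (reuse == NULL)
		return NULL;
	reuse->state = SLOT_USED;
	strncpy(reuse->name, name, TOY_NAMELEN - 1);
	reuse->name[TOY_NAMELEN - 1] = '\0';
	return reuse;
}

/* LISTEN: the counter changes only when a new entry was inserted. */
bool
toy_listen(ToyChannelSet *set, const char *name)
{
	bool		found;

	if (toy_find_or_insert(set, name, &found) == NULL)
		return false;
	if (!found)
		set->num_listening++;
	return true;
}

/* UNLISTEN: the counter changes only when an entry was really removed. */
void
toy_unlisten(ToyChannelSet *set, const char *name)
{
	unsigned	start = toy_hash(name) % TOY_SLOTS;

	for (unsigned i = 0; i < TOY_SLOTS; i++)
	{
		ToySlot    *slot = &set->slots[(start + i) % TOY_SLOTS];

		if (slot->state == SLOT_EMPTY)
			return;				/* not listening on this channel */
		if (slot->state == SLOT_USED && strcmp(slot->name, name) == 0)
		{
			assert(set->num_listening > 0);
			slot->state = SLOT_DELETED;
			set->num_listening--;
			return;
		}
	}
}
```

With this shape, a repeated toy_listen() on the same name is a no-op for the counter, so `set->num_listening == 0` stays equivalent to "the set is empty" as long as every insert and delete goes through these two functions.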

In addition to the previously suggested optimization, there is another
major one that seems doable. It would be a great improvement for
workloads with large traffic differences between channels, i.e. some
low-traffic and some high-traffic channels.

I'm not entirely sure this approach is correct, though; I might have
misunderstood the guarantees of the heavyweight lock. My assumption is
that only one backend at a time can be running the code in
PreCommit_Notify after having acquired the heavyweight lock. If this is
not true, then the approach doesn't work. What worries me is the
exclusive lock we also take inside the same function. I don't see the
point of it since we're already holding the heavyweight lock, but maybe
it is just there because it "allows deadlocks to be detected", as the
comment says?

---

Patches:

* 0001-optimize_listen_notify-v14.patch:
Just adds additional test coverage of async.c

* 0002-optimize_listen_notify-v14.patch:
Adds the shared channel hash.
Unchanged since 0002-optimize_listen_notify-v13.patch.

* 0003-optimize_listen_notify-v14.patch:

Optimize LISTEN/NOTIFY by advancing idle backends directly

Building on the previous channel-specific listener tracking
optimization, this patch further reduces context switching by detecting
idle listening backends that don't listen to any of the channels being
notified and advancing their queue positions directly without waking
them up.

When a backend commits notifications, it now saves the queue head
position both before and after writing. In SignalBackends(), backends
that are at the old queue head and weren't marked for wakeup (meaning
they don't listen to any of the notified channels) are advanced directly
to the new queue head. This eliminates unnecessary wakeups for these
backends, which would otherwise wake up, scan through all the
notifications, skip each one, and advance to the same position anyway.

The implementation carefully handles the race condition where other
backends may write notifications after the heavyweight lock is released
but before SignalBackends() is called. By saving queueHeadAfterWrite
immediately after writing (before releasing the lock), we ensure
backends are only advanced over the exact notifications we wrote, not
notifications from other concurrent backends.

---

Benchmark:

% ./pgbench_patched --listen-notify-benchmark --notify-round-trips=10000 --notify-idle-step=10
pgbench_patched: starting LISTEN/NOTIFY round-trip benchmark
pgbench_patched: round-trips per iteration: 10000
pgbench_patched: idle listeners added per iteration: 10

master:

idle_listeners round_trips_per_sec    max_latency_usec
             0             33592.9                2278
            10             14251.1                1041
            20              9258.7                1367
            30              6144.2                2277
            40              4653.1                1690
            50              3780.7                2869
            60              3234.9                3215
            70              2818.9                3652
            80              2458.7                3219
            90              2203.1                3505
           100              1951.9                1739

0002-optimize_listen_notify-v14.patch:

idle_listeners round_trips_per_sec    max_latency_usec
             0             33936.2                 889
            10             30631.9                1233
            20             22404.7                7862
            30             19446.2                9539
            40             16013.3               13963
            50             14310.1               16983
            60             12827.0               21363
            70             11271.9               24775
            80             10764.4               28703
            90              9568.1               31693
           100              9241.3               32724

0003-optimize_listen_notify-v14.patch:

idle_listeners round_trips_per_sec    max_latency_usec
             0             33236.8                1090
            10             34681.0                1338
            20             34530.4                1372
            30             34061.6                1339
            40             33084.5                 913
            50             33847.5                 955
            60             33675.8                1239
            70             28857.4               20443
            80             33324.9                 786
            90             33612.3                 758
           100             31259.2                7706

As we can see, with 0002, the ping-pong round-trips per second degrade
much more slowly than on master. The idle listening backends still need
to be woken up at some point, though: there are far fewer wakeups, and
they are staggered over time, but with 100 idle listening backends the
rate still drops from 33k to 9k.

With 0003, the round-trips per second are sustained, essentially
unaffected by additional idle listening backends.
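
The direct-advance rule described for 0003 can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the code in the patch: queue positions are plain integers here (the real ones are page/offset pairs), ToyBackend and toy_signal_backends are hypothetical names, and backends that are behind the old head are simply left alone, since what the patch does with them is not covered by this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct ToyBackend
{
	long		pos;		/* simplified queue read position */
	bool		wakeup;		/* listens on at least one notified channel */
	bool		signaled;	/* output: this backend would be woken up */
} ToyBackend;

/*
 * head_before_write and head_after_write are the queue head saved just
 * before and just after this backend wrote its own notifications, while
 * it still held the lock.  Using the saved "after" value, rather than
 * the current head, means a backend is only skipped over entries that
 * the notifying backend wrote itself.
 *
 * Returns the number of backends advanced without a wakeup.
 */
int
toy_signal_backends(ToyBackend *backends, size_t n,
					long head_before_write, long head_after_write)
{
	int			advanced = 0;

	assert(head_before_write <= head_after_write);

	for (size_t i = 0; i < n; i++)
	{
		ToyBackend *b = &backends[i];

		if (b->wakeup)
		{
			/* Interested in a notified channel: must read the queue. */
			b->signaled = true;
			continue;
		}

		if (b->pos == head_before_write)
		{
			/*
			 * Fully caught up before our write and not interested in any
			 * of our channels: skip our entries on its behalf.
			 */
			b->pos = head_after_write;
			advanced++;
		}
		/* Otherwise the backend is lagging; left untouched in this sketch. */
	}

	return advanced;
}
```

For example, with three backends at positions 100, 100 and 90, where only the first is marked for wakeup and the write moved the head from 100 to 105, the first is signaled, the second is moved to 105 without being woken, and the third is not touched.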

I've also attached the pgbench patch as a .txt file,
pgbench-listen-notify-benchmark-patch.txt. It's not part of this patch
set; it's just provided to help others verify the results.

/Joel

0001-optimize_listen_notify-v14.patch
0002-optimize_listen_notify-v14.patch
0003-optimize_listen_notify-v14.patch
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1515ed405ba..b462dcc8348 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -35,6 +35,7 @@
 
 #include <ctype.h>
 #include <float.h>
+#include <inttypes.h>
 #include <limits.h>
 #include <math.h>
 #include <signal.h>
@@ -237,6 +238,11 @@ static const char *const PARTITION_METHOD[] = {"none", "range", "hash"};
 /* random seed used to initialize base_random_sequence */
 static int64 random_seed = -1;
 
+/* LISTEN/NOTIFY benchmark mode parameters */
+static bool listen_notify_mode = false; /* enable LISTEN/NOTIFY benchmark */
+static int notify_round_trips = 100; /* number of round-trips per iteration */
+static int notify_idle_step = 10; /* idle listeners to add per iteration */
+
 /*
  * end of configurable parameters
  *********************************************************************/
@@ -930,6 +936,10 @@ usage(void)
 		   "                           (same as \"-b simple-update\")\n"
 		   "  -S, --select-only        perform SELECT-only transactions\n"
 		   "                           (same as \"-b select-only\")\n"
+		   "  --listen-notify-benchmark\n"
+		   "                           run LISTEN/NOTIFY round-trip benchmark\n"
+		   "  --notify-round-trips=NUM number of round-trips per iteration (default: 100)\n"
+		   "  --notify-idle-step=NUM   idle listeners to add per iteration (default: 10)\n"
 		   "\nBenchmarking options:\n"
 		   "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
 		   "  -C, --connect            establish new connection for each transaction\n"
@@ -6689,6 +6699,216 @@ set_random_seed(const char *seed)
 	return true;
 }
 
+/*
+ * Run LISTEN/NOTIFY round-trip benchmark
+ *
+ * This benchmark measures the round-trip time between two processes that
+ * ping-pong NOTIFY messages while adding idle listening connections.
+ */
+static void
+runListenNotifyBenchmark(void)
+{
+	PGconn *conn1 = NULL;
+	PGconn *conn2 = NULL;
+	PGconn **idle_conns = NULL;
+	int num_idle = 0;
+	int max_idle = 10000; /* reasonable upper limit */
+	PGresult *res;
+	char channel1[] = "pgbench_channel_1";
+	char channel2[] = "pgbench_channel_2";
+	char notify_cmd[256];
+	bool first_failure = false;
+
+	pg_log_info("starting LISTEN/NOTIFY round-trip benchmark");
+	pg_log_info("round-trips per iteration: %d", notify_round_trips);
+	pg_log_info("idle listeners added per iteration: %d", notify_idle_step);
+	printf("\n%14s %19s %19s\n", "idle_listeners", "round_trips_per_sec", "max_latency_usec");
+
+	/* Allocate array for idle connections */
+	idle_conns = (PGconn **) pg_malloc0(max_idle * sizeof(PGconn *));
+
+	/* Create two active connections for ping-pong */
+	conn1 = doConnect();
+	if (conn1 == NULL)
+		pg_fatal("failed to create connection 1");
+
+	conn2 = doConnect();
+	if (conn2 == NULL)
+		pg_fatal("failed to create connection 2");
+
+	/* Set up LISTEN on both connections */
+	snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel1);
+	res = PQexec(conn1, notify_cmd);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("LISTEN failed on connection 1: %s", PQerrorMessage(conn1));
+	PQclear(res);
+
+	snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel2);
+	res = PQexec(conn2, notify_cmd);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("LISTEN failed on connection 2: %s", PQerrorMessage(conn2));
+	PQclear(res);
+
+	/* Main benchmark loop: measure round-trips then add idle connections */
+	while (num_idle < max_idle)
+	{
+		int i;
+		int64 total_latency = 0;
+		int64 max_latency = 0;
+
+		/* Perform round-trip measurements */
+		for (i = 0; i < notify_round_trips; i++)
+		{
+			pg_time_usec_t start_time,
+						end_time;
+			int64 latency;
+			PGnotify *notify;
+			int sock;
+			fd_set input_mask;
+			struct timeval tv;
+
+			/* Clear any pending notifications */
+			PQconsumeInput(conn1);
+			while ((notify = PQnotifies(conn1)) != NULL)
+				PQfreemem(notify);
+			PQconsumeInput(conn2);
+			while ((notify = PQnotifies(conn2)) != NULL)
+				PQfreemem(notify);
+
+			/* Start timer and send notification from conn1 */
+			start_time = pg_time_now();
+			snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel2);
+			res = PQexec(conn1, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+				pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn1));
+			PQclear(res);
+
+			/* Wait for notification on conn2 */
+			sock = PQsocket(conn2);
+			notify = NULL;
+			while (notify == NULL)
+			{
+				PQconsumeInput(conn2);
+				notify = PQnotifies(conn2);
+				if (notify == NULL)
+				{
+					/* Wait for data on socket */
+					FD_ZERO(&input_mask);
+					FD_SET(sock, &input_mask);
+					tv.tv_sec = 10; /* 10 second timeout */
+					tv.tv_usec = 0;
+					if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+						pg_fatal("select() failed: %m");
+				}
+			}
+			PQfreemem(notify);
+
+			/* Send notification back from conn2 */
+			snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel1);
+			res = PQexec(conn2, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+				pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn2));
+			PQclear(res);
+
+			/* Wait for notification on conn1 */
+			sock = PQsocket(conn1);
+			notify = NULL;
+			while (notify == NULL)
+			{
+				PQconsumeInput(conn1);
+				notify = PQnotifies(conn1);
+				if (notify == NULL)
+				{
+					/* Wait for data on socket */
+					FD_ZERO(&input_mask);
+					FD_SET(sock, &input_mask);
+					tv.tv_sec = 10; /* 10 second timeout */
+					tv.tv_usec = 0;
+					if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+						pg_fatal("select() failed: %m");
+				}
+			}
+			PQfreemem(notify);
+
+			/* End timer */
+			end_time = pg_time_now();
+
+			/* Calculate individual round-trip latency */
+			latency = end_time - start_time;
+
+			/* Accumulate total latency and track maximum */
+			total_latency += latency;
+			if (latency > max_latency)
+				max_latency = latency;
+		}
+
+		/* Calculate and report round-trips per second and max latency */
+		fprintf(stdout, "%14d %19.1f %19" PRId64 "\n",
+				num_idle,
+				1000000.0 * notify_round_trips / total_latency,
+				max_latency);
+		fflush(stdout);
+
+		/* Stop if we hit connection limit */
+		if (first_failure)
+			break;
+
+		/* Add idle listening connections */
+		for (i = 0; i < notify_idle_step && num_idle < max_idle; i++)
+		{
+			PGconn *idle_conn;
+			char idle_channel[256];
+
+			idle_conn = doConnect();
+			if (idle_conn == NULL)
+			{
+				if (!first_failure)
+				{
+					pg_log_info("reached max_connections at %d idle listeners", num_idle);
+					first_failure = true;
+				}
+				break;
+			}
+
+			/* Each idle connection listens on a unique channel */
+			snprintf(idle_channel, sizeof(idle_channel), "idle_%d", num_idle);
+			snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", idle_channel);
+
+			res = PQexec(idle_conn, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			{
+				pg_log_warning("LISTEN failed on idle connection %d: %s",
+							   num_idle, PQerrorMessage(idle_conn));
+				PQfinish(idle_conn);
+				PQclear(res);
+				first_failure = true;
+				break;
+			}
+			PQclear(res);
+
+			idle_conns[num_idle] = idle_conn;
+			num_idle++;
+		}
+
+		/* Stop if we couldn't add any connections */
+		if (first_failure && i == 0)
+			break;
+	}
+
+	/* Clean up */
+	pg_log_info("cleaning up connections");
+	PQfinish(conn1);
+	PQfinish(conn2);
+	for (int i = 0; i < num_idle; i++)
+	{
+		if (idle_conns[i])
+			PQfinish(idle_conns[i]);
+	}
+	pg_free(idle_conns);
+
+	pg_log_info("LISTEN/NOTIFY benchmark completed");
+}
+
 int
 main(int argc, char **argv)
 {
@@ -6739,6 +6959,9 @@ main(int argc, char **argv)
 		{"verbose-errors", no_argument, NULL, 15},
 		{"exit-on-abort", no_argument, NULL, 16},
 		{"debug", no_argument, NULL, 17},
+		{"listen-notify-benchmark", no_argument, NULL, 18},
+		{"notify-round-trips", required_argument, NULL, 19},
+		{"notify-idle-step", required_argument, NULL, 20},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -7092,6 +7315,22 @@ main(int argc, char **argv)
 			case 17:			/* debug */
 				pg_logging_increase_verbosity();
 				break;
+			case 18:			/* listen-notify-benchmark */
+				listen_notify_mode = true;
+				benchmarking_option_set = true;
+				break;
+			case 19:			/* notify-round-trips */
+				benchmarking_option_set = true;
+				if (!option_parse_int(optarg, "--notify-round-trips", 1, INT_MAX,
+									  &notify_round_trips))
+					exit(1);
+				break;
+			case 20:			/* notify-idle-step */
+				benchmarking_option_set = true;
+				if (!option_parse_int(optarg, "--notify-idle-step", 1, INT_MAX,
+									  &notify_idle_step))
+					exit(1);
+				break;
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -7210,6 +7449,20 @@ main(int argc, char **argv)
 			pg_fatal("some of the specified options cannot be used in benchmarking mode");
 	}
 
+	/* Handle LISTEN/NOTIFY benchmark mode */
+	if (listen_notify_mode)
+	{
+		/* Establish a database connection for setup */
+		if ((con = doConnect()) == NULL)
+			pg_fatal("could not connect to database");
+
+		/* Run the LISTEN/NOTIFY benchmark */
+		runListenNotifyBenchmark();
+
+		PQfinish(con);
+		exit(0);
+	}
+
 	if (nxacts > 0 && duration > 0)
 		pg_fatal("specify either a number of transactions (-t) or a duration (-T), not both");
 