On Fri, Oct 10, 2025, at 20:46, Joel Jacobson wrote:
> On Wed, Oct 8, 2025, at 20:46, Tom Lane wrote:
>> "Joel Jacobson" <j...@compiler.org> writes:
>>> On Tue, Oct 7, 2025, at 22:15, Tom Lane wrote:
>>>> 5. ChannelHashAddListener: "already registered" case is not reached,
>>>> which surprises me a bit, and neither is the "grow the array" stanza.
>>
>>> I've added a test for the "grow the array" stanza.
>>
>>> The "already registered" case seems impossible to reach, since the
>>> caller, Exec_ListenCommit, returns early if IsListeningOn.
>>
>> Maybe we should remove the check for "already registered" then,
>> or reduce it to an Assert?  Seems pointless to check twice.
>>
>> Or thinking a little bigger: why are we maintaining the set of
>> channels-listened-to both as a list and a hash?  Could we remove
>> the list form?
>
> Yes, it was indeed possible to remove the list form.
>
> Some functions got a bit more complex, but I think it's worth it since a
> single source of truth seems like an important design goal.
>
> This also made LISTEN faster when a backend is listening on plenty of
> channels, since we can now lookup the channel in the hash, instead of
> having to go through the list as before. The additional linear scan of
> the listenersArray didn't add any noticeable extra cost even with
> thousands of listening backends for the channel.
>
> I also tried to keep listenersArray sorted and binary-search it, but
> even with thousands of listening backends, I couldn't measure any
> overall latency difference of LISTEN, so I kept the linear scan to keep
> it simple.
>
> In Exec_ListenCommit, I've now inlined code that is similar to
> IsListeningOn. I didn't want to use IsListeningOn since it felt wasteful
> having to do dshash_find, when we instead can just use
> dshash_find_or_insert, to handle both cases.
>
> I also added a static int numChannelsListeningOn variable, to avoid the
> possibly expensive operation of going through the entire hash, to be
> able to check `numChannelsListeningOn == 0` instead of the now removed
> `listenChannels == NIL`. It's of course critical to keep
> numChannelsListeningOn in sync, but I think it should be safe? Would of
> course be better to avoid this variable. Maybe the extra cycles that
> would cost would be worth it?

In addition to previously suggested optimization, there is another major
one that seems doable, that would mean a great improvement for workload
having large traffic differences between channels, i.e. some low traffic
and some high traffic.

I'm not entirely sure this approach is correct though, I've might
misunderstood the guarantees of the heavyweight lock. My assumption is
based on that there can only be one backend that is currently running
the code in PreCommit_Notify after having aquired the heavyweight lock.
If this is not true, then it doesn't work. What made me worried is the
exclusive lock we also take inside the same function, I don't see the
point of it since we're already holding the heavyweight lock, but maybe
this is just to "allows deadlocks to be detected" like the comment says?

---

Patches:

* 0001-optimize_listen_notify-v14.patch:
Just adds additional test coverage of async.c

* 0002-optimize_listen_notify-v14.patch:
Adds the shared channel hash.
Unchanged since 0002-optimize_listen_notify-v13.patch.

* 0003-optimize_listen_notify-v14.patch:

Optimize LISTEN/NOTIFY by advancing idle backends directly

Building on the previous channel-specific listener tracking
optimization, this patch further reduces context switching by detecting
idle listening backends that don't listen to any of the channels being
notified and advancing their queue positions directly without waking
them up.

When a backend commits notifications, it now saves both the queue head
position before and after writing. In SignalBackends(), backends that
are at the old queue head and weren't marked for wakeup (meaning they
don't listen to any of the notified channels) are advanced directly to
the new queue head. This eliminates unnecessary wakeups for these
backends, which would otherwise wake up, scan through all the
notifications, skip each one, and advance to the same position anyway.

The implementation carefully handles the race condition where other
backends may write notifications after the heavyweight lock is released
but before SignalBackends() is called. By saving queueHeadAfterWrite
immediately after writing (before releasing the lock), we ensure
backends are only advanced over the exact notifications we wrote, not
notifications from other concurrent backends.

---

Benchmark:

% ./pgbench_patched --listen-notify-benchmark --notify-round-trips=10000 
--notify-idle-step=10
pgbench_patched: starting LISTEN/NOTIFY round-trip benchmark
pgbench_patched: round-trips per iteration: 10000
pgbench_patched: idle listeners added per iteration: 10

master:

idle_listeners  round_trips_per_sec     max_latency_usec
             0              33592.9                 2278
            10              14251.1                 1041
            20               9258.7                 1367
            30               6144.2                 2277
            40               4653.1                 1690
            50               3780.7                 2869
            60               3234.9                 3215
            70               2818.9                 3652
            80               2458.7                 3219
            90               2203.1                 3505
           100               1951.9                 1739

0002-optimize_listen_notify-v14.patch:

idle_listeners  round_trips_per_sec     max_latency_usec
             0              33936.2                  889
            10              30631.9                 1233
            20              22404.7                 7862
            30              19446.2                 9539
            40              16013.3                13963
            50              14310.1                16983
            60              12827.0                21363
            70              11271.9                24775
            80              10764.4                28703
            90               9568.1                31693
           100               9241.3                32724

0003-optimize_listen_notify-v14.patch:

idle_listeners  round_trips_per_sec     max_latency_usec
             0              33236.8                 1090
            10              34681.0                 1338
            20              34530.4                 1372
            30              34061.6                 1339
            40              33084.5                  913
            50              33847.5                  955
            60              33675.8                 1239
            70              28857.4                20443
            80              33324.9                  786
            90              33612.3                  758
           100              31259.2                 7706

As we can see, with 0002, the ping-pong round-trips per second degrades
much slower than master, but the wakeup of idle listening backends still
needs to happen at some point, much fewer wakeups, and staggered over
time, but still makes it go down from 33k to 9k due to 100 idle
listening backends. With 0003, the round-trips per second is sustained,
unaffected by additional idle listening backends.

I've also attached the pgbench patch as a .txt in
pgbench-listen-notify-benchmark-patch.txt, since it's not part of this
patch, it's just provided to help others verify the results.

/Joel

Attachment: 0001-optimize_listen_notify-v14.patch
Description: Binary data

Attachment: 0002-optimize_listen_notify-v14.patch
Description: Binary data

Attachment: 0003-optimize_listen_notify-v14.patch
Description: Binary data

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1515ed405ba..b462dcc8348 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -35,6 +35,7 @@
 
 #include <ctype.h>
 #include <float.h>
+#include <inttypes.h>
 #include <limits.h>
 #include <math.h>
 #include <signal.h>
@@ -237,6 +238,11 @@ static const char *const PARTITION_METHOD[] = {"none", 
"range", "hash"};
 /* random seed used to initialize base_random_sequence */
 static int64 random_seed = -1;
 
+/* LISTEN/NOTIFY benchmark mode parameters */
+static bool listen_notify_mode = false;        /* enable LISTEN/NOTIFY 
benchmark */
+static int     notify_round_trips = 100;       /* number of round-trips per 
iteration */
+static int     notify_idle_step = 10;          /* idle listeners to add per 
iteration */
+
 /*
  * end of configurable parameters
  *********************************************************************/
@@ -930,6 +936,10 @@ usage(void)
                   "                           (same as \"-b simple-update\")\n"
                   "  -S, --select-only        perform SELECT-only 
transactions\n"
                   "                           (same as \"-b select-only\")\n"
+                  "  --listen-notify-benchmark\n"
+                  "                           run LISTEN/NOTIFY round-trip 
benchmark\n"
+                  "  --notify-round-trips=NUM number of round-trips per 
iteration (default: 100)\n"
+                  "  --notify-idle-step=NUM   idle listeners to add per 
iteration (default: 10)\n"
                   "\nBenchmarking options:\n"
                   "  -c, --client=NUM         number of concurrent database 
clients (default: 1)\n"
                   "  -C, --connect            establish new connection for 
each transaction\n"
@@ -6689,6 +6699,216 @@ set_random_seed(const char *seed)
        return true;
 }
 
+/*
+ * Run LISTEN/NOTIFY round-trip benchmark
+ *
+ * This benchmark measures the round-trip time between two processes that
+ * ping-pong NOTIFY messages while adding idle listening connections.
+ */
+static void
+runListenNotifyBenchmark(void)
+{
+       PGconn     *conn1 = NULL;
+       PGconn     *conn2 = NULL;
+       PGconn    **idle_conns = NULL;
+       int                     num_idle = 0;
+       int                     max_idle = 10000;       /* reasonable upper 
limit */
+       PGresult   *res;
+       char            channel1[] = "pgbench_channel_1";
+       char            channel2[] = "pgbench_channel_2";
+       char            notify_cmd[256];
+       bool            first_failure = false;
+
+       pg_log_info("starting LISTEN/NOTIFY round-trip benchmark");
+       pg_log_info("round-trips per iteration: %d", notify_round_trips);
+       pg_log_info("idle listeners added per iteration: %d", notify_idle_step);
+       printf("\n%14s  %19s  %19s\n", "idle_listeners", "round_trips_per_sec", 
"max_latency_usec");
+
+       /* Allocate array for idle connections */
+       idle_conns = (PGconn **) pg_malloc0(max_idle * sizeof(PGconn *));
+
+       /* Create two active connections for ping-pong */
+       conn1 = doConnect();
+       if (conn1 == NULL)
+               pg_fatal("failed to create connection 1");
+
+       conn2 = doConnect();
+       if (conn2 == NULL)
+               pg_fatal("failed to create connection 2");
+
+       /* Set up LISTEN on both connections */
+       snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel1);
+       res = PQexec(conn1, notify_cmd);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("LISTEN failed on connection 1: %s", 
PQerrorMessage(conn1));
+       PQclear(res);
+
+       snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel2);
+       res = PQexec(conn2, notify_cmd);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("LISTEN failed on connection 2: %s", 
PQerrorMessage(conn2));
+       PQclear(res);
+
+       /* Main benchmark loop: measure round-trips then add idle connections */
+       while (num_idle < max_idle)
+       {
+               int                     i;
+               int64           total_latency = 0;
+               int64           max_latency = 0;
+
+               /* Perform round-trip measurements */
+               for (i = 0; i < notify_round_trips; i++)
+               {
+                       pg_time_usec_t start_time,
+                                               end_time;
+                       int64           latency;
+                       PGnotify   *notify;
+                       int                     sock;
+                       fd_set          input_mask;
+                       struct timeval tv;
+
+                       /* Clear any pending notifications */
+                       PQconsumeInput(conn1);
+                       while ((notify = PQnotifies(conn1)) != NULL)
+                               PQfreemem(notify);
+                       PQconsumeInput(conn2);
+                       while ((notify = PQnotifies(conn2)) != NULL)
+                               PQfreemem(notify);
+
+                       /* Start timer and send notification from conn1 */
+                       start_time = pg_time_now();
+                       snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", 
channel2);
+                       res = PQexec(conn1, notify_cmd);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                               pg_fatal("NOTIFY failed: %s", 
PQerrorMessage(conn1));
+                       PQclear(res);
+
+                       /* Wait for notification on conn2 */
+                       sock = PQsocket(conn2);
+                       notify = NULL;
+                       while (notify == NULL)
+                       {
+                               PQconsumeInput(conn2);
+                               notify = PQnotifies(conn2);
+                               if (notify == NULL)
+                               {
+                                       /* Wait for data on socket */
+                                       FD_ZERO(&input_mask);
+                                       FD_SET(sock, &input_mask);
+                                       tv.tv_sec = 10; /* 10 second timeout */
+                                       tv.tv_usec = 0;
+                                       if (select(sock + 1, &input_mask, NULL, 
NULL, &tv) < 0)
+                                               pg_fatal("select() failed: %m");
+                               }
+                       }
+                       PQfreemem(notify);
+
+                       /* Send notification back from conn2 */
+                       snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", 
channel1);
+                       res = PQexec(conn2, notify_cmd);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                               pg_fatal("NOTIFY failed: %s", 
PQerrorMessage(conn2));
+                       PQclear(res);
+
+                       /* Wait for notification on conn1 */
+                       sock = PQsocket(conn1);
+                       notify = NULL;
+                       while (notify == NULL)
+                       {
+                               PQconsumeInput(conn1);
+                               notify = PQnotifies(conn1);
+                               if (notify == NULL)
+                               {
+                                       /* Wait for data on socket */
+                                       FD_ZERO(&input_mask);
+                                       FD_SET(sock, &input_mask);
+                                       tv.tv_sec = 10; /* 10 second timeout */
+                                       tv.tv_usec = 0;
+                                       if (select(sock + 1, &input_mask, NULL, 
NULL, &tv) < 0)
+                                               pg_fatal("select() failed: %m");
+                               }
+                       }
+                       PQfreemem(notify);
+
+                       /* End timer */
+                       end_time = pg_time_now();
+
+                       /* Calculate individual round-trip latency */
+                       latency = end_time - start_time;
+
+                       /* Accumulate total latency and track maximum */
+                       total_latency += latency;
+                       if (latency > max_latency)
+                               max_latency = latency;
+               }
+
+               /* Calculate and report round-trips per second and max latency 
*/
+               fprintf(stdout, "%14d  %19.1f  %19" PRId64 "\n",
+                               num_idle,
+                               1000000.0 * notify_round_trips / total_latency,
+                               max_latency);
+               fflush(stdout);
+
+               /* Stop if we hit connection limit */
+               if (first_failure)
+                       break;
+
+               /* Add idle listening connections */
+               for (i = 0; i < notify_idle_step && num_idle < max_idle; i++)
+               {
+                       PGconn     *idle_conn;
+                       char            idle_channel[256];
+
+                       idle_conn = doConnect();
+                       if (idle_conn == NULL)
+                       {
+                               if (!first_failure)
+                               {
+                                       pg_log_info("reached max_connections at 
%d idle listeners", num_idle);
+                                       first_failure = true;
+                               }
+                               break;
+                       }
+
+                       /* Each idle connection listens on a unique channel */
+                       snprintf(idle_channel, sizeof(idle_channel), "idle_%d", 
num_idle);
+                       snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", 
idle_channel);
+
+                       res = PQexec(idle_conn, notify_cmd);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       {
+                               pg_log_warning("LISTEN failed on idle 
connection %d: %s",
+                                                          num_idle, 
PQerrorMessage(idle_conn));
+                               PQfinish(idle_conn);
+                               PQclear(res);
+                               first_failure = true;
+                               break;
+                       }
+                       PQclear(res);
+
+                       idle_conns[num_idle] = idle_conn;
+                       num_idle++;
+               }
+
+               /* Stop if we couldn't add any connections */
+               if (first_failure && i == 0)
+                       break;
+       }
+
+       /* Clean up */
+       pg_log_info("cleaning up connections");
+       PQfinish(conn1);
+       PQfinish(conn2);
+       for (int i = 0; i < num_idle; i++)
+       {
+               if (idle_conns[i])
+                       PQfinish(idle_conns[i]);
+       }
+       pg_free(idle_conns);
+
+       pg_log_info("LISTEN/NOTIFY benchmark completed");
+}
+
 int
 main(int argc, char **argv)
 {
@@ -6739,6 +6959,9 @@ main(int argc, char **argv)
                {"verbose-errors", no_argument, NULL, 15},
                {"exit-on-abort", no_argument, NULL, 16},
                {"debug", no_argument, NULL, 17},
+               {"listen-notify-benchmark", no_argument, NULL, 18},
+               {"notify-round-trips", required_argument, NULL, 19},
+               {"notify-idle-step", required_argument, NULL, 20},
                {NULL, 0, NULL, 0}
        };
 
@@ -7092,6 +7315,22 @@ main(int argc, char **argv)
                        case 17:                        /* debug */
                                pg_logging_increase_verbosity();
                                break;
+                       case 18:                        /* 
listen-notify-benchmark */
+                               listen_notify_mode = true;
+                               benchmarking_option_set = true;
+                               break;
+                       case 19:                        /* notify-round-trips */
+                               benchmarking_option_set = true;
+                               if (!option_parse_int(optarg, 
"--notify-round-trips", 1, INT_MAX,
+                                                                         
&notify_round_trips))
+                                       exit(1);
+                               break;
+                       case 20:                        /* notify-idle-step */
+                               benchmarking_option_set = true;
+                               if (!option_parse_int(optarg, 
"--notify-idle-step", 1, INT_MAX,
+                                                                         
&notify_idle_step))
+                                       exit(1);
+                               break;
                        default:
                                /* getopt_long already emitted a complaint */
                                pg_log_error_hint("Try \"%s --help\" for more 
information.", progname);
@@ -7210,6 +7449,20 @@ main(int argc, char **argv)
                        pg_fatal("some of the specified options cannot be used 
in benchmarking mode");
        }
 
+       /* Handle LISTEN/NOTIFY benchmark mode */
+       if (listen_notify_mode)
+       {
+               /* Establish a database connection for setup */
+               if ((con = doConnect()) == NULL)
+                       pg_fatal("could not connect to database");
+
+               /* Run the LISTEN/NOTIFY benchmark */
+               runListenNotifyBenchmark();
+
+               PQfinish(con);
+               exit(0);
+       }
+
        if (nxacts > 0 && duration > 0)
                pg_fatal("specify either a number of transactions (-t) or a 
duration (-T), not both");
 

Reply via email to