On Sat, Oct 11, 2025, at 08:43, Joel Jacobson wrote:
> In addition to the previously suggested optimization, there is another
> major one that seems doable, and that would be a great improvement for
> workloads with large traffic differences between channels, i.e. some
> low-traffic and some high-traffic channels.
>
> I'm not entirely sure this approach is correct though; I might have
> misunderstood the guarantees of the heavyweight lock. My assumption is
> that only one backend at a time can be running the code in
> PreCommit_Notify after having acquired the heavyweight lock.
> If this is not true, then it doesn't work. What made me worried is the
> exclusive lock we also take inside the same function, I don't see the
> point of it since we're already holding the heavyweight lock, but maybe
> this is just to "allows deadlocks to be detected" like the comment says?
...
> * 0003-optimize_listen_notify-v14.patch:
>
> Optimize LISTEN/NOTIFY by advancing idle backends directly
>
> Building on the previous channel-specific listener tracking
> optimization, this patch further reduces context switching by detecting
> idle listening backends that don't listen to any of the channels being
> notified and advancing their queue positions directly without waking
> them up.
...
> 0003-optimize_listen_notify-v14.patch:
>
> idle_listeners  round_trips_per_sec     max_latency_usec
>              0              33236.8                 1090
>             10              34681.0                 1338
>             20              34530.4                 1372
>             30              34061.6                 1339
>             40              33084.5                  913
>             50              33847.5                  955
>             60              33675.8                 1239
>             70              28857.4                20443
>             80              33324.9                  786
>             90              33612.3                  758
>            100              31259.2                 7706

I noticed the strange data point at idle_listeners=70.
This made me think about the "wake tail only" trick,
and I realized it is now unnecessary with the new 0003 idea.

New version attached that removes that part from the 0003 patch.

This also of course improved the stability of max_latency_usec,
since in this specific benchmark all other listeners are always idle,
so they don't need to be woken up ever:

idle_listeners  round_trips_per_sec     max_latency_usec
             0              33631.8                  546
            10              34318.0                  586
            20              34813.0                  596
            30              35073.4                  574
            40              34646.1                  569
            50              33755.5                  634
            60              33991.6                  561
            70              34049.0                  550
            80              33886.0                  541
            90              33545.0                  540
           100              33163.1                  660

/Joel

Attachment: 0001-optimize_listen_notify-v15.patch
Description: Binary data

Attachment: 0002-optimize_listen_notify-v15.patch
Description: Binary data

Attachment: 0003-optimize_listen_notify-v15.patch
Description: Binary data

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1515ed405ba..b462dcc8348 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -35,6 +35,7 @@
 
 #include <ctype.h>
 #include <float.h>
+#include <inttypes.h>
 #include <limits.h>
 #include <math.h>
 #include <signal.h>
@@ -237,6 +238,11 @@ static const char *const PARTITION_METHOD[] = {"none", 
"range", "hash"};
 /* random seed used to initialize base_random_sequence */
 static int64 random_seed = -1;
 
+/* LISTEN/NOTIFY benchmark mode parameters (used by runListenNotifyBenchmark) */
+static bool listen_notify_mode = false;        /* set by --listen-notify-benchmark */
+static int     notify_round_trips = 100;       /* --notify-round-trips: round-trips per iteration */
+static int     notify_idle_step = 10;          /* --notify-idle-step: idle listeners to add per iteration */
+
 /*
  * end of configurable parameters
  *********************************************************************/
@@ -930,6 +936,10 @@ usage(void)
                   "                           (same as \"-b simple-update\")\n"
                   "  -S, --select-only        perform SELECT-only 
transactions\n"
                   "                           (same as \"-b select-only\")\n"
+                  "  --listen-notify-benchmark\n"
+                  "                           run LISTEN/NOTIFY round-trip 
benchmark\n"
+                  "  --notify-round-trips=NUM number of round-trips per 
iteration (default: 100)\n"
+                  "  --notify-idle-step=NUM   idle listeners to add per 
iteration (default: 10)\n"
                   "\nBenchmarking options:\n"
                   "  -c, --client=NUM         number of concurrent database 
clients (default: 1)\n"
                   "  -C, --connect            establish new connection for 
each transaction\n"
@@ -6689,6 +6699,243 @@ set_random_seed(const char *seed)
        return true;
 }
 
+/* Seconds to wait for a single notification before giving up */
+#define NOTIFY_WAIT_TIMEOUT_SEC 10
+
+/*
+ * Throw away notifications already queued on a benchmark connection, so a
+ * stale one cannot be mistaken for the reply that is about to be timed.
+ * A failed read is fatal because the benchmark cannot continue without it.
+ */
+static void
+discardPendingNotifies(PGconn *conn, const char *label)
+{
+       PGnotify   *notify;
+
+       if (!PQconsumeInput(conn))
+               pg_fatal("lost %s: %s", label, PQerrorMessage(conn));
+       while ((notify = PQnotifies(conn)) != NULL)
+               PQfreemem(notify);
+}
+
+/*
+ * Block until one notification arrives on conn, then consume and free it.
+ *
+ * Exits through pg_fatal() when the connection breaks, when select() fails
+ * for a reason other than EINTR, or when nothing arrives within
+ * NOTIFY_WAIT_TIMEOUT_SEC seconds.  Without these checks a dropped
+ * connection or a notification that is never delivered would leave the
+ * benchmark looping forever instead of reporting the problem.
+ */
+static void
+waitForNotify(PGconn *conn, const char *label)
+{
+       int         sock = PQsocket(conn);
+
+       if (sock < 0)
+               pg_fatal("no socket for %s: %s", label, PQerrorMessage(conn));
+
+       for (;;)
+       {
+               PGnotify   *notify;
+               fd_set      input_mask;
+               struct timeval tv;
+               int         rc;
+
+               /* Absorb whatever has arrived; failure means the link is gone */
+               if (!PQconsumeInput(conn))
+                       pg_fatal("lost %s: %s", label, PQerrorMessage(conn));
+
+               notify = PQnotifies(conn);
+               if (notify != NULL)
+               {
+                       PQfreemem(notify);
+                       return;
+               }
+
+               /* Nothing yet: sleep until the socket becomes readable */
+               FD_ZERO(&input_mask);
+               FD_SET(sock, &input_mask);
+               tv.tv_sec = NOTIFY_WAIT_TIMEOUT_SEC;
+               tv.tv_usec = 0;
+
+               rc = select(sock + 1, &input_mask, NULL, NULL, &tv);
+               if (rc < 0)
+               {
+                       if (errno == EINTR)
+                               continue;       /* interrupted by a signal; retry */
+                       pg_fatal("select() failed: %m");
+               }
+               if (rc == 0)
+                       pg_fatal("timed out after %d s waiting for notification on %s",
+                                        NOTIFY_WAIT_TIMEOUT_SEC, label);
+       }
+}
+
+/*
+ * Run LISTEN/NOTIFY round-trip benchmark
+ *
+ * Connection 1 listens on pgbench_channel_1 and connection 2 on
+ * pgbench_channel_2.  One round-trip is: connection 1 notifies channel 2,
+ * connection 2 receives that and notifies channel 1, connection 1 receives
+ * the reply.  After every notify_round_trips round-trips one result line is
+ * printed (idle listener count, round-trips per second, worst latency in
+ * microseconds) and up to notify_idle_step further connections are opened,
+ * each listening on its own "idle_N" channel that is never notified.
+ *
+ * The run stops once a new connection cannot be opened or set up (after one
+ * final measurement if part of the step succeeded), or when max_idle idle
+ * listeners exist.
+ */
+static void
+runListenNotifyBenchmark(void)
+{
+       static const char channel1[] = "pgbench_channel_1";
+       static const char channel2[] = "pgbench_channel_2";
+       const int   max_idle = 10000;       /* reasonable upper limit */
+       PGconn     *conn1;
+       PGconn     *conn2;
+       PGconn    **idle_conns;
+       PGresult   *res;
+       char        cmd[256];
+       char        notify_channel1[256];
+       char        notify_channel2[256];
+       int         num_idle = 0;
+       bool        first_failure = false;
+
+       pg_log_info("starting LISTEN/NOTIFY round-trip benchmark");
+       pg_log_info("round-trips per iteration: %d", notify_round_trips);
+       pg_log_info("idle listeners added per iteration: %d", notify_idle_step);
+       printf("\n%14s  %19s  %19s\n",
+                  "idle_listeners", "round_trips_per_sec", "max_latency_usec");
+
+       /* Slots for the idle connections, so they can be closed at the end */
+       idle_conns = (PGconn **) pg_malloc0(max_idle * sizeof(PGconn *));
+
+       /* Create two active connections for ping-pong */
+       conn1 = doConnect();
+       if (conn1 == NULL)
+               pg_fatal("failed to create connection 1");
+
+       conn2 = doConnect();
+       if (conn2 == NULL)
+               pg_fatal("failed to create connection 2");
+
+       /* Set up LISTEN on both connections */
+       snprintf(cmd, sizeof(cmd), "LISTEN %s", channel1);
+       res = PQexec(conn1, cmd);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("LISTEN failed on connection 1: %s", PQerrorMessage(conn1));
+       PQclear(res);
+
+       snprintf(cmd, sizeof(cmd), "LISTEN %s", channel2);
+       res = PQexec(conn2, cmd);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("LISTEN failed on connection 2: %s", PQerrorMessage(conn2));
+       PQclear(res);
+
+       /* The NOTIFY commands never change, so build them once up front */
+       snprintf(notify_channel1, sizeof(notify_channel1), "NOTIFY %s", channel1);
+       snprintf(notify_channel2, sizeof(notify_channel2), "NOTIFY %s", channel2);
+
+       /* Main benchmark loop: measure round-trips then add idle connections */
+       while (num_idle < max_idle)
+       {
+               int         i;
+               int64       total_latency = 0;
+               int64       max_latency = 0;
+
+               /* Perform round-trip measurements */
+               for (i = 0; i < notify_round_trips; i++)
+               {
+                       pg_time_usec_t start_time;
+                       int64       latency;
+
+                       /* Clear any pending notifications */
+                       discardPendingNotifies(conn1, "connection 1");
+                       discardPendingNotifies(conn2, "connection 2");
+
+                       /* Start timer and send notification from conn1 */
+                       start_time = pg_time_now();
+                       res = PQexec(conn1, notify_channel2);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                               pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn1));
+                       PQclear(res);
+
+                       waitForNotify(conn2, "connection 2");
+
+                       /* Send notification back from conn2 */
+                       res = PQexec(conn2, notify_channel1);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                               pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn2));
+                       PQclear(res);
+
+                       waitForNotify(conn1, "connection 1");
+
+                       /* Stop timer; accumulate the total and track the worst case */
+                       latency = pg_time_now() - start_time;
+                       total_latency += latency;
+                       if (latency > max_latency)
+                               max_latency = latency;
+               }
+
+               /* Calculate and report round-trips per second and max latency */
+               fprintf(stdout, "%14d  %19.1f  %19" PRId64 "\n",
+                               num_idle,
+                               1000000.0 * notify_round_trips / total_latency,
+                               max_latency);
+               fflush(stdout);
+
+               /* A failure in the previous step means this was the final pass */
+               if (first_failure)
+                       break;
+
+               /* Add idle listening connections */
+               for (i = 0; i < notify_idle_step && num_idle < max_idle; i++)
+               {
+                       PGconn     *idle_conn = doConnect();
+
+                       if (idle_conn == NULL)
+                       {
+                               pg_log_info("reached max_connections at %d idle listeners",
+                                                       num_idle);
+                               first_failure = true;
+                               break;
+                       }
+
+                       /* Each idle connection listens on a unique channel */
+                       snprintf(cmd, sizeof(cmd), "LISTEN idle_%d", num_idle);
+                       res = PQexec(idle_conn, cmd);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       {
+                               pg_log_warning("LISTEN failed on idle connection %d: %s",
+                                                          num_idle, PQerrorMessage(idle_conn));
+                               PQclear(res);
+                               PQfinish(idle_conn);
+                               first_failure = true;
+                               break;
+                       }
+                       PQclear(res);
+
+                       idle_conns[num_idle++] = idle_conn;
+               }
+
+               /* Stop if we couldn't add any connections */
+               if (first_failure && i == 0)
+                       break;
+       }
+
+       /* Clean up */
+       pg_log_info("cleaning up connections");
+       PQfinish(conn1);
+       PQfinish(conn2);
+       for (int i = 0; i < num_idle; i++)
+               PQfinish(idle_conns[i]);
+       pg_free(idle_conns);
+
+       pg_log_info("LISTEN/NOTIFY benchmark completed");
+}
+
 int
 main(int argc, char **argv)
 {
@@ -6739,6 +6959,9 @@ main(int argc, char **argv)
                {"verbose-errors", no_argument, NULL, 15},
                {"exit-on-abort", no_argument, NULL, 16},
                {"debug", no_argument, NULL, 17},
+               {"listen-notify-benchmark", no_argument, NULL, 18},
+               {"notify-round-trips", required_argument, NULL, 19},
+               {"notify-idle-step", required_argument, NULL, 20},
                {NULL, 0, NULL, 0}
        };
 
@@ -7092,6 +7315,22 @@ main(int argc, char **argv)
                        case 17:                        /* debug */
                                pg_logging_increase_verbosity();
                                break;
+                       case 18:                        /* 
listen-notify-benchmark */
+                               listen_notify_mode = true;
+                               benchmarking_option_set = true;
+                               break;
+                       case 19:                        /* notify-round-trips */
+                               benchmarking_option_set = true;
+                               if (!option_parse_int(optarg, 
"--notify-round-trips", 1, INT_MAX,
+                                                                         
&notify_round_trips))
+                                       exit(1);
+                               break;
+                       case 20:                        /* notify-idle-step */
+                               benchmarking_option_set = true;
+                               if (!option_parse_int(optarg, 
"--notify-idle-step", 1, INT_MAX,
+                                                                         
&notify_idle_step))
+                                       exit(1);
+                               break;
                        default:
                                /* getopt_long already emitted a complaint */
                                pg_log_error_hint("Try \"%s --help\" for more 
information.", progname);
@@ -7210,6 +7449,20 @@ main(int argc, char **argv)
                        pg_fatal("some of the specified options cannot be used 
in benchmarking mode");
        }
 
+       /* Handle LISTEN/NOTIFY benchmark mode */
+       if (listen_notify_mode)
+       {
+               /* Establish a database connection for setup */
+               if ((con = doConnect()) == NULL)
+                       pg_fatal("could not connect to database");
+
+               /* Run the LISTEN/NOTIFY benchmark */
+               runListenNotifyBenchmark();
+
+               PQfinish(con);
+               exit(0);
+       }
+
        if (nxacts > 0 && duration > 0)
                pg_fatal("specify either a number of transactions (-t) or a 
duration (-T), not both");
 

Reply via email to