On Wed, May 28, 2025 at 5:55 AM Dmitry Dolgov <9erthali...@gmail.com> wrote:
> I should probably have started by saying that I find the current approach
> reasonable, and I'm only curious whether there is more to get out of it. I
> haven't benchmarked the patch yet (I plan to get to it when I'm back), and
> can imagine practical considerations significantly impacting any potential
> solution.

Here's a rebase.
From fa7aac1bc9c0a47fbdbd9459424f08fa61b71ce2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 11 Apr 2025 21:17:26 +1200
Subject: [PATCH v2 1/2] aio: Try repeatedly to give batched IOs to workers.

Previously, when the submission queue was full we'd run all remaining
IOs in a batched submission synchronously.  Andres rightly pointed out
that we should really try again between synchronous IOs, since the
workers might have made progress in draining the queue.
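
To illustrate the approach outside the tree, here is a standalone toy
model (hypothetical names; a bounded ring stands in for the shared
submission queue, and drain_one() stands in for concurrent worker
progress), sketching the retry-between-synchronous-IOs loop:

    #include <stdio.h>

    #define QUEUE_SIZE 4            /* ring holds QUEUE_SIZE - 1 entries */
    #define NIOS 10

    static int queue[QUEUE_SIZE];
    static int head, tail;

    /* Insert one IO, returning 0 if the ring is full. */
    static int
    queue_insert(int io)
    {
        int new_head = (head + 1) % QUEUE_SIZE;

        if (new_head == tail)
            return 0;
        queue[head] = io;
        head = new_head;
        return 1;
    }

    /* Stand-in for a concurrent worker: drain at most one queued IO. */
    static void
    drain_one(void)
    {
        if (tail != head)
        {
            printf("worker ran IO %d\n", queue[tail]);
            tail = (tail + 1) % QUEUE_SIZE;
        }
    }

    int
    main(void)
    {
        int synchronous_ios[NIOS];
        int nsync = 0;

        /* First pass: enqueue as many as fit, collect the leftovers. */
        for (int io = 0; io < NIOS; ++io)
            if (!queue_insert(io))
                synchronous_ios[nsync++] = io;

        /* Run the leftovers synchronously, retrying the queue in between. */
        for (int i = 0; i < nsync; ++i)
        {
            if (i > 0)
            {
                /* Workers may have made room; try to hand off again. */
                while (i < nsync && queue_insert(synchronous_ios[i]))
                    i++;
                if (i == nsync)
                    break;
            }
            printf("ran IO %d synchronously\n", synchronous_ios[i]);
            drain_one();
        }
        return 0;
    }

The point is the inner while loop: any slots freed by workers while we
were busy with a synchronous IO are reused before falling back to the
next synchronous one.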

Suggested-by: Andres Freund <and...@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
 src/backend/storage/aio/method_worker.c | 30 ++++++++++++++++++++++---
 1 file changed, 27 insertions(+), 3 deletions(-)

diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index bf8f77e6ff6..9a82d5f847d 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -282,12 +282,36 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 		SetLatch(wakeup);
 
 	/* Run whatever is left synchronously. */
-	if (nsync > 0)
+	for (int i = 0; i < nsync; ++i)
 	{
-		for (int i = 0; i < nsync; ++i)
+		wakeup = NULL;
+
+		/*
+		 * Between synchronous IO operations, try again to enqueue as many as
+		 * we can.
+		 */
+		if (i > 0)
 		{
-			pgaio_io_perform_synchronously(synchronous_ios[i]);
+			wakeup = NULL;
+
+			LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+			while (i < nsync &&
+				   pgaio_worker_submission_queue_insert(synchronous_ios[i]))
+			{
+				if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
+					wakeup = io_worker_control->workers[worker].latch;
+				i++;
+			}
+			LWLockRelease(AioWorkerSubmissionQueueLock);
+
+			if (wakeup)
+				SetLatch(wakeup);
+
+			if (i == nsync)
+				break;
 		}
+
+		pgaio_io_perform_synchronously(synchronous_ios[i]);
 	}
 }
 
-- 
2.47.2

From a0a5fff1f1d21c002bf68d36de9aff21bdf61783 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 22 Mar 2025 00:36:49 +1300
Subject: [PATCH v2 2/2] aio: Adjust IO worker pool size automatically.

Replace the simple io_workers setting with:

  io_min_workers=1
  io_max_workers=8
  io_worker_idle_timeout=60s
  io_worker_launch_interval=500ms

The pool is automatically sized within the configured range according
to demand.
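
As an illustration only (the values here are invented for the example,
not recommendations), a pool that idles down to two workers but can
grow to sixteen under bursts could be configured with:

  io_min_workers=2
  io_max_workers=16
  io_worker_idle_timeout=30s
  io_worker_launch_interval=250ms

The resulting pool size can then be observed with something like
SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'io worker'.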

Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  70 ++-
 src/backend/postmaster/postmaster.c           |  64 ++-
 src/backend/storage/aio/method_worker.c       | 445 ++++++++++++++----
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |  46 +-
 src/backend/utils/misc/postgresql.conf.sample |   5 +-
 src/include/storage/io_worker.h               |   9 +-
 src/include/storage/lwlocklist.h              |   1 +
 src/include/storage/pmsignal.h                |   1 +
 src/test/modules/test_aio/t/002_io_workers.pl |  15 +-
 10 files changed, 535 insertions(+), 122 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c7acc0f182f..98532e55041 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2787,16 +2787,76 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
-      <varlistentry id="guc-io-workers" xreflabel="io_workers">
-       <term><varname>io_workers</varname> (<type>integer</type>)
+      <varlistentry id="guc-io-min-workers" xreflabel="io_min_workers">
+       <term><varname>io_min_workers</varname> (<type>integer</type>)
        <indexterm>
-        <primary><varname>io_workers</varname> configuration parameter</primary>
+        <primary><varname>io_min_workers</varname> configuration parameter</primary>
        </indexterm>
        </term>
        <listitem>
         <para>
-         Selects the number of I/O worker processes to use. The default is
-         3. This parameter can only be set in the
+         Sets the minimum number of I/O worker processes to use. The default is
+         1. This parameter can only be set in the
+         <filename>postgresql.conf</filename> file or on the server command
+         line.
+        </para>
+        <para>
+         Only has an effect if <xref linkend="guc-io-method"/> is set to
+         <literal>worker</literal>.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry id="guc-io-max-workers" xreflabel="io_max_workers">
+       <term><varname>io_max_workers</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>io_max_workers</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the maximum number of I/O worker processes to use. The default is
+         8. This parameter can only be set in the
+         <filename>postgresql.conf</filename> file or on the server command
+         line.
+        </para>
+        <para>
+         Only has an effect if <xref linkend="guc-io-method"/> is set to
+         <literal>worker</literal>.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout">
+       <term><varname>io_worker_idle_timeout</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the time after which an idle I/O worker process will exit,
+         shrinking the I/O worker pool towards the minimum.  The default
+         is 1 minute.
+         This parameter can only be set in the
+         <filename>postgresql.conf</filename> file or on the server command
+         line.
+        </para>
+        <para>
+         Only has an effect if <xref linkend="guc-io-method"/> is set to
+         <literal>worker</literal>.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval">
+       <term><varname>io_worker_launch_interval</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>io_worker_launch_interval</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the minimum time between launching new I/O workers.  This can be
+         used to avoid sudden bursts of new I/O workers.  The default is 500ms.
+         This parameter can only be set in the
          <filename>postgresql.conf</filename> file or on the server command
          line.
         </para>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index cca9b946e53..a5438fa079d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -408,6 +408,7 @@ static DNSServiceRef bonjour_sdref = NULL;
 #endif
 
 /* State for IO worker management. */
+static TimestampTz io_worker_launch_delay_until = 0;
 static int	io_worker_count = 0;
 static PMChild *io_worker_children[MAX_IO_WORKERS];
 
@@ -1569,6 +1570,15 @@ DetermineSleepTime(void)
 	if (StartWorkerNeeded)
 		return 0;
 
+	/* If we need a new IO worker, defer until launch delay expires. */
+	if (pgaio_worker_test_new_worker_needed() &&
+		io_worker_count < io_max_workers)
+	{
+		if (io_worker_launch_delay_until == 0)
+			return 0;
+		next_wakeup = io_worker_launch_delay_until;
+	}
+
 	if (HaveCrashedWorker)
 	{
 		dlist_mutable_iter iter;
@@ -3750,6 +3760,15 @@ process_pm_pmsignal(void)
 		StartWorkerNeeded = true;
 	}
 
+	/* Process IO worker start requests. */
+	if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE))
+	{
+		/*
+		 * No local flag, as the state is exposed through pgaio_worker_*()
+		 * functions.  This signal is received on potentially actionable level
+		 * changes, so that maybe_adjust_io_workers() will run.
+		 */
+	}
 	/* Process background worker state changes. */
 	if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
 	{
@@ -4355,8 +4374,9 @@ maybe_reap_io_worker(int pid)
 /*
  * Start or stop IO workers, to close the gap between the number of running
  * workers and the number of configured workers.  Used to respond to change of
- * the io_workers GUC (by increasing and decreasing the number of workers), as
- * well as workers terminating in response to errors (by starting
+ * the io_{min,max}_workers GUCs (by increasing and decreasing the number of
+ * workers) and requests to start a new one due to submission queue backlog,
+ * as well as workers terminating in response to errors (by starting
  * "replacement" workers).
  */
 static void
@@ -4385,8 +4405,16 @@ maybe_adjust_io_workers(void)
 
 	Assert(pmState < PM_WAIT_IO_WORKERS);
 
-	/* Not enough running? */
-	while (io_worker_count < io_workers)
+	/* Cancel the launch delay when it expires to minimize clock access. */
+	if (io_worker_launch_delay_until != 0 &&
+		io_worker_launch_delay_until <= GetCurrentTimestamp())
+		io_worker_launch_delay_until = 0;
+
+	/* Not enough workers running? */
+	while (io_worker_launch_delay_until == 0 &&
+		   io_worker_count < io_max_workers &&
+		   ((io_worker_count < io_min_workers ||
+			 pgaio_worker_clear_new_worker_needed())))
 	{
 		PMChild    *child;
 		int			i;
@@ -4400,6 +4428,16 @@ maybe_adjust_io_workers(void)
 		if (i == MAX_IO_WORKERS)
 			elog(ERROR, "could not find a free IO worker slot");
 
+		/*
+		 * Apply launch delay even for failures to avoid retrying too fast on
+		 * fork() failure, but not while we're still building the minimum pool
+		 * size.
+		 */
+		if (io_worker_count >= io_min_workers)
+			io_worker_launch_delay_until =
+				TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+											io_worker_launch_interval);
+
 		/* Try to launch one. */
 		child = StartChildProcess(B_IO_WORKER);
 		if (child != NULL)
@@ -4411,19 +4449,11 @@ maybe_adjust_io_workers(void)
 			break;				/* try again next time */
 	}
 
-	/* Too many running? */
-	if (io_worker_count > io_workers)
-	{
-		/* ask the IO worker in the highest slot to exit */
-		for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
-		{
-			if (io_worker_children[i] != NULL)
-			{
-				kill(io_worker_children[i]->pid, SIGUSR2);
-				break;
-			}
-		}
-	}
+	/*
+	 * If there are too many running because io_max_workers changed, that will
+	 * be handled by the IO workers themselves so they can shut down in
+	 * preferred order.
+	 */
 }
 
 
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 9a82d5f847d..6d3f5289e18 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -11,9 +11,10 @@
  * infrastructure for reopening the file, and must be processed synchronously by
  * the client code when submitted.
  *
- * So that the submitter can make just one system call when submitting a batch
- * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
- * could be improved by using futexes instead of latches to wake N waiters.
+ * When a batch of IOs is submitted, the lowest numbered idle worker is woken
+ * up.  If it sees more work in the queue it wakes a peer to help, and so on
+ * in a chain.  When a backlog is detected, the pool size is increased; the
+ * pool shrinks when the highest numbered worker times out while idle.
  *
  * This method of AIO is available in all builds on all operating systems, and
  * is the default.
@@ -40,6 +41,8 @@
 #include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/injection_point.h"
@@ -47,10 +50,8 @@
 #include "utils/ps_status.h"
 #include "utils/wait_event.h"
 
-
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
+/* Saturation for stats counters used to estimate wakeup:work ratio. */
+#define PGAIO_WORKER_STATS_MAX 64
 
 typedef struct PgAioWorkerSubmissionQueue
 {
@@ -63,17 +64,25 @@ typedef struct PgAioWorkerSubmissionQueue
 
 typedef struct PgAioWorkerSlot
 {
-	Latch	   *latch;
-	bool		in_use;
+	ProcNumber	proc_number;
 } PgAioWorkerSlot;
 
 typedef struct PgAioWorkerControl
 {
+	/* Seen by the postmaster. */
+	volatile bool new_worker_needed;
+
+	/* Protected by AioWorkerSubmissionQueueLock. */
 	uint64		idle_worker_mask;
+
+	/* Protected by AioWorkerControlLock. */
+	uint64		worker_set;
+	int			nworkers;
+
+	/* Protected by AioWorkerControlLock. */
 	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
 } PgAioWorkerControl;
 
-
 static size_t pgaio_worker_shmem_size(void);
 static void pgaio_worker_shmem_init(bool first_time);
 
@@ -91,11 +100,14 @@ const IoMethodOps pgaio_worker_ops = {
 
 
 /* GUCs */
-int			io_workers = 3;
+int			io_min_workers = 1;
+int			io_max_workers = 8;
+int			io_worker_idle_timeout = 60000;
+int			io_worker_launch_interval = 500;
 
 
 static int	io_worker_queue_size = 64;
-static int	MyIoWorkerId;
+static int	MyIoWorkerId = -1;
 static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
 static PgAioWorkerControl *io_worker_control;
 
@@ -152,37 +164,172 @@ pgaio_worker_shmem_init(bool first_time)
 						&found);
 	if (!found)
 	{
+		io_worker_control->new_worker_needed = false;
+		io_worker_control->worker_set = 0;
 		io_worker_control->idle_worker_mask = 0;
 		for (int i = 0; i < MAX_IO_WORKERS; ++i)
-		{
-			io_worker_control->workers[i].latch = NULL;
-			io_worker_control->workers[i].in_use = false;
-		}
+			io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
 	}
 }
 
+static void
+pgaio_worker_consider_new_worker(uint32 queue_depth)
+{
+	/*
+	 * This is called from sites that don't hold AioWorkerControlLock, but it
+	 * changes infrequently and an up-to-date value is not required for this
+	 * heuristic purpose.
+	 */
+	if (!io_worker_control->new_worker_needed &&
+		queue_depth >= io_worker_control->nworkers)
+	{
+		io_worker_control->new_worker_needed = true;
+		SendPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE);
+	}
+}
+
+/*
+ * Called by a worker when the queue is empty, to try to prevent a delayed
+ * reaction to a brief burst.  This races against the postmaster acting on the
+ * old value if it was recently set to true, but that's OK: the ordering would
+ * be indeterminate anyway even if we could use locks in the postmaster.
+ */
+static void
+pgaio_worker_cancel_new_worker(void)
+{
+	io_worker_control->new_worker_needed = false;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed.
+ */
+bool
+pgaio_worker_test_new_worker_needed(void)
+{
+	return io_worker_control->new_worker_needed;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed when it's ready
+ * to launch one, and clear the flag.
+ */
+bool
+pgaio_worker_clear_new_worker_needed(void)
+{
+	bool		result;
+
+	result = io_worker_control->new_worker_needed;
+	if (result)
+		io_worker_control->new_worker_needed = false;
+
+	return result;
+}
+
+static uint64
+pgaio_worker_mask(int worker)
+{
+	return UINT64_C(1) << worker;
+}
+
+static void
+pgaio_worker_add(uint64 *set, int worker)
+{
+	*set |= pgaio_worker_mask(worker);
+}
+
+static void
+pgaio_worker_remove(uint64 *set, int worker)
+{
+	*set &= ~pgaio_worker_mask(worker);
+}
+
+#ifdef USE_ASSERT_CHECKING
+static bool
+pgaio_worker_in(uint64 set, int worker)
+{
+	return (set & pgaio_worker_mask(worker)) != 0;
+}
+#endif
+
+static int
+pgaio_worker_highest(uint64 set)
+{
+	return pg_leftmost_one_pos64(set);
+}
+
+static int
+pgaio_worker_lowest(uint64 set)
+{
+	return pg_rightmost_one_pos64(set);
+}
+
+static int
+pgaio_worker_pop(uint64 *set)
+{
+	int			worker;
+
+	Assert(*set != 0);
+	worker = pgaio_worker_lowest(*set);
+	pgaio_worker_remove(set, worker);
+	return worker;
+}
+
 static int
 pgaio_worker_choose_idle(void)
 {
+	uint64		idle_worker_mask;
 	int			worker;
 
-	if (io_worker_control->idle_worker_mask == 0)
+	Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
+	/*
+	 * Workers only wake higher numbered workers.  This encourages wakeup:work
+	 * ratios to decrease with worker number, concentrating spurious wakeups
+	 * in higher numbered workers and away from lower numbered ones.
+	 */
+	idle_worker_mask = io_worker_control->idle_worker_mask;
+	if (MyIoWorkerId != -1)
+		idle_worker_mask &= ~(pgaio_worker_mask(MyIoWorkerId) - 1);
+
+	if (idle_worker_mask == 0)
 		return -1;
 
 	/* Find the lowest bit position, and clear it. */
-	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
-	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
-	Assert(io_worker_control->workers[worker].in_use);
+	worker = pgaio_worker_lowest(idle_worker_mask);
+	pgaio_worker_remove(&io_worker_control->idle_worker_mask, worker);
 
 	return worker;
 }
 
+/*
+ * Try to wake a worker by setting its latch, to tell it there are IOs to
+ * process in the submission queue.
+ */
+static void
+pgaio_worker_wake(int worker)
+{
+	ProcNumber	proc_number;
+
+	/*
+	 * If the selected worker is concurrently exiting, then pgaio_worker_die()
+	 * had not yet removed it as of when we saw it in idle_worker_mask. That's
+	 * OK, because it will wake all remaining workers to close wakeup-vs-exit
+	 * races: *someone* will see the queued IO.  If there are no workers
+	 * running, the postmaster will start a new one.
+	 */
+	proc_number = io_worker_control->workers[worker].proc_number;
+	if (proc_number != INVALID_PROC_NUMBER)
+		SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
+}
+
 static bool
 pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
 {
 	PgAioWorkerSubmissionQueue *queue;
 	uint32		new_head;
 
+	Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
 	queue = io_worker_submission_queue;
 	new_head = (queue->head + 1) & (queue->size - 1);
 	if (new_head == queue->tail)
@@ -204,6 +351,8 @@ pgaio_worker_submission_queue_consume(void)
 	PgAioWorkerSubmissionQueue *queue;
 	uint32		result;
 
+	Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
 	queue = io_worker_submission_queue;
 	if (queue->tail == queue->head)
 		return UINT32_MAX;		/* empty */
@@ -220,6 +369,8 @@ pgaio_worker_submission_queue_depth(void)
 	uint32		head;
 	uint32		tail;
 
+	Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
 	head = io_worker_submission_queue->head;
 	tail = io_worker_submission_queue->tail;
 
@@ -244,9 +395,9 @@ static void
 pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 {
 	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	uint32		queue_depth;
+	int			worker = -1;
 	int			nsync = 0;
-	Latch	   *wakeup = NULL;
-	int			worker;
 
 	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
@@ -261,51 +412,48 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 			 * we can to workers, to maximize concurrency.
 			 */
 			synchronous_ios[nsync++] = staged_ios[i];
-			continue;
 		}
-
-		if (wakeup == NULL)
+		else if (worker == -1)
 		{
 			/* Choose an idle worker to wake up if we haven't already. */
 			worker = pgaio_worker_choose_idle();
-			if (worker >= 0)
-				wakeup = io_worker_control->workers[worker].latch;
 
 			pgaio_debug_io(DEBUG4, staged_ios[i],
 						   "choosing worker %d",
 						   worker);
 		}
 	}
+	queue_depth = pgaio_worker_submission_queue_depth();
 	LWLockRelease(AioWorkerSubmissionQueueLock);
 
-	if (wakeup)
-		SetLatch(wakeup);
+	if (worker != -1)
+		pgaio_worker_wake(worker);
+	else
+		pgaio_worker_consider_new_worker(queue_depth);
 
 	/* Run whatever is left synchronously. */
 	for (int i = 0; i < nsync; ++i)
 	{
-		wakeup = NULL;
-
 		/*
 		 * Between synchronous IO operations, try again to enqueue as many as
 		 * we can.
 		 */
 		if (i > 0)
 		{
-			wakeup = NULL;
+			worker = -1;
 
 			LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
 			while (i < nsync &&
 				   pgaio_worker_submission_queue_insert(synchronous_ios[i]))
 			{
-				if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
-					wakeup = io_worker_control->workers[worker].latch;
+				if (worker == -1)
+					worker = pgaio_worker_choose_idle();
 				i++;
 			}
 			LWLockRelease(AioWorkerSubmissionQueueLock);
 
-			if (wakeup)
-				SetLatch(wakeup);
+			if (worker != -1)
+				pgaio_worker_wake(worker);
 
 			if (i == nsync)
 				break;
@@ -337,14 +485,27 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
 static void
 pgaio_worker_die(int code, Datum arg)
 {
-	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
-	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+	uint64		notify_set;
 
-	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
-	io_worker_control->workers[MyIoWorkerId].in_use = false;
-	io_worker_control->workers[MyIoWorkerId].latch = NULL;
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	pgaio_worker_remove(&io_worker_control->idle_worker_mask, MyIoWorkerId);
 	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+	Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
+	io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
+	Assert(pgaio_worker_in(io_worker_control->worker_set, MyIoWorkerId));
+	pgaio_worker_remove(&io_worker_control->worker_set, MyIoWorkerId);
+	notify_set = io_worker_control->worker_set;
+	Assert(io_worker_control->nworkers > 0);
+	io_worker_control->nworkers--;
+	Assert(pg_popcount64(io_worker_control->worker_set) ==
+		   io_worker_control->nworkers);
+	LWLockRelease(AioWorkerControlLock);
+
+	/* Notify other workers on pool change. */
+	while (notify_set != 0)
+		pgaio_worker_wake(pgaio_worker_pop(&notify_set));
 }
 
 /*
@@ -354,33 +515,37 @@ pgaio_worker_die(int code, Datum arg)
 static void
 pgaio_worker_register(void)
 {
-	MyIoWorkerId = -1;
+	uint64		worker_set_inverted;
+	uint64		old_worker_set;
 
-	/*
-	 * XXX: This could do with more fine-grained locking. But it's also not
-	 * very common for the number of workers to change at the moment...
-	 */
-	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	MyIoWorkerId = -1;
 
-	for (int i = 0; i < MAX_IO_WORKERS; ++i)
+	LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+	worker_set_inverted = ~io_worker_control->worker_set;
+	if (worker_set_inverted != 0)
 	{
-		if (!io_worker_control->workers[i].in_use)
-		{
-			Assert(io_worker_control->workers[i].latch == NULL);
-			io_worker_control->workers[i].in_use = true;
-			MyIoWorkerId = i;
-			break;
-		}
-		else
-			Assert(io_worker_control->workers[i].latch != NULL);
+		MyIoWorkerId = pgaio_worker_lowest(worker_set_inverted);
+		if (MyIoWorkerId >= MAX_IO_WORKERS)
+			MyIoWorkerId = -1;
 	}
-
 	if (MyIoWorkerId == -1)
 		elog(ERROR, "couldn't find a free worker slot");
 
-	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
-	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
-	LWLockRelease(AioWorkerSubmissionQueueLock);
+	Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
+		   INVALID_PROC_NUMBER);
+	io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
+
+	old_worker_set = io_worker_control->worker_set;
+	Assert(!pgaio_worker_in(old_worker_set, MyIoWorkerId));
+	pgaio_worker_add(&io_worker_control->worker_set, MyIoWorkerId);
+	io_worker_control->nworkers++;
+	Assert(pg_popcount64(io_worker_control->worker_set) ==
+		   io_worker_control->nworkers);
+	LWLockRelease(AioWorkerControlLock);
+
+	/* Notify other workers on pool change. */
+	while (old_worker_set != 0)
+		pgaio_worker_wake(pgaio_worker_pop(&old_worker_set));
 
 	on_shmem_exit(pgaio_worker_die, 0);
 }
@@ -406,14 +571,47 @@ pgaio_worker_error_callback(void *arg)
 	errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
 }
 
+/*
+ * Check if this backend is allowed to time out, and thus should use a
+ * non-infinite sleep time.  Only the highest-numbered worker is allowed to
+ * time out, and only if the pool is above io_min_workers.  Serializing
+ * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
+ * io_min_workers.
+ *
+ * The result is only instantaneously true and may be temporarily inconsistent
+ * across workers around transitions, but all workers are woken up on pool
+ * size or GUC changes, making the result eventually consistent.
+ */
+static bool
+pgaio_worker_can_timeout(void)
+{
+	uint64		worker_set;
+
+	/* Serialize against pool size changes. */
+	LWLockAcquire(AioWorkerControlLock, LW_SHARED);
+	worker_set = io_worker_control->worker_set;
+	LWLockRelease(AioWorkerControlLock);
+
+	if (MyIoWorkerId != pgaio_worker_highest(worker_set))
+		return false;
+	if (MyIoWorkerId < io_min_workers)
+		return false;
+
+	return true;
+}
+
 void
 IoWorkerMain(const void *startup_data, size_t startup_data_len)
 {
 	sigjmp_buf	local_sigjmp_buf;
+	TimestampTz idle_timeout_abs = 0;
+	int			timeout_guc_used = 0;
 	PgAioHandle *volatile error_ioh = NULL;
 	ErrorContextCallback errcallback = {0};
 	volatile int error_errno = 0;
 	char		cmd[128];
+	int			ios = 0;
+	int			wakeups = 0;
 
 	MyBackendType = B_IO_WORKER;
 	AuxiliaryProcessMainCommon();
@@ -482,10 +680,8 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 	while (!ShutdownRequestPending)
 	{
 		uint32		io_index;
-		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
-		int			nlatches = 0;
-		int			nwakeups = 0;
-		int			worker;
+		uint32		queue_depth;
+		int			worker = -1;
 
 		/*
 		 * Try to get a job to do.
@@ -494,40 +690,48 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 		 * to ensure that we don't see an outdated data in the handle.
 		 */
 		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-		if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+		io_index = pgaio_worker_submission_queue_consume();
+		queue_depth = pgaio_worker_submission_queue_depth();
+		if (io_index == UINT32_MAX)
 		{
-			/*
-			 * Nothing to do.  Mark self idle.
-			 *
-			 * XXX: Invent some kind of back pressure to reduce useless
-			 * wakeups?
-			 */
-			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+			/* Nothing to do.  Mark self idle. */
+			pgaio_worker_add(&io_worker_control->idle_worker_mask,
+							 MyIoWorkerId);
 		}
 		else
 		{
 			/* Got one.  Clear idle flag. */
-			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+			pgaio_worker_remove(&io_worker_control->idle_worker_mask,
+								MyIoWorkerId);
 
-			/* See if we can wake up some peers. */
-			nwakeups = Min(pgaio_worker_submission_queue_depth(),
-						   IO_WORKER_WAKEUP_FANOUT);
-			for (int i = 0; i < nwakeups; ++i)
-			{
-				if ((worker = pgaio_worker_choose_idle()) < 0)
-					break;
-				latches[nlatches++] = io_worker_control->workers[worker].latch;
-			}
+			/*
+			 * See if we should wake up a peer.  Only do this if this worker
+			 * is not experiencing spurious wakeups itself, to end a chain of
+			 * wasted scheduling.
+			 */
+			if (queue_depth > 0 && wakeups <= ios)
+				worker = pgaio_worker_choose_idle();
 		}
 		LWLockRelease(AioWorkerSubmissionQueueLock);
 
-		for (int i = 0; i < nlatches; ++i)
-			SetLatch(latches[i]);
+		/* Propagate wakeups. */
+		if (worker != -1)
+			pgaio_worker_wake(worker);
+		else if (wakeups <= ios)
+			pgaio_worker_consider_new_worker(queue_depth);
 
 		if (io_index != UINT32_MAX)
 		{
 			PgAioHandle *ioh = NULL;
 
+			/* Cancel timeout and update wakeup:work ratio. */
+			idle_timeout_abs = 0;
+			if (++ios == PGAIO_WORKER_STATS_MAX)
+			{
+				ios /= 2;
+				wakeups /= 2;
+			}
+
 			ioh = &pgaio_ctl->io_handles[io_index];
 			error_ioh = ioh;
 			errcallback.arg = ioh;
@@ -593,8 +797,69 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 		}
 		else
 		{
-			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
-					  WAIT_EVENT_IO_WORKER_MAIN);
+			int			timeout_ms;
+
+			/* Cancel new worker if pending. */
+			pgaio_worker_cancel_new_worker();
+
+			/* Compute the remaining allowed idle time. */
+			if (io_worker_idle_timeout == -1)
+			{
+				/* Never time out. */
+				timeout_ms = -1;
+			}
+			else
+			{
+				TimestampTz now = GetCurrentTimestamp();
+
+				/* If the GUC changes, reset timer. */
+				if (idle_timeout_abs != 0 &&
+					io_worker_idle_timeout != timeout_guc_used)
+					idle_timeout_abs = 0;
+
+				/* On first sleep, compute absolute timeout. */
+				if (idle_timeout_abs == 0)
+				{
+					idle_timeout_abs =
+						TimestampTzPlusMilliseconds(now,
+													io_worker_idle_timeout);
+					timeout_guc_used = io_worker_idle_timeout;
+				}
+
+				/*
+				 * All workers maintain the absolute timeout value, but only
+				 * the highest worker can actually time out and only if
+				 * io_min_workers is exceeded.  All others wait only for
+				 * explicit wakeups caused by queue insertion, wakeup
+				 * propagation, change of pool size (possibly making them
+				 * highest), or GUC reload.
+				 */
+				if (pgaio_worker_can_timeout())
+					timeout_ms =
+						TimestampDifferenceMilliseconds(now,
+														idle_timeout_abs);
+				else
+					timeout_ms = -1;
+			}
+
+			if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
+						  timeout_ms,
+						  WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
+			{
+				/* WL_TIMEOUT */
+				if (pgaio_worker_can_timeout())
+					if (GetCurrentTimestamp() >= idle_timeout_abs)
+						break;
+			}
+			else
+			{
+				/* WL_LATCH_SET */
+				if (++wakeups == PGAIO_WORKER_STATS_MAX)
+				{
+					ios /= 2;
+					wakeups /= 2;
+				}
+			}
 			ResetLatch(MyLatch);
 		}
 
@@ -604,6 +869,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+
+			/* If io_max_workers has been decreased, exit highest first. */
+			if (MyIoWorkerId >= io_max_workers)
+				break;
 		}
 	}
 
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..c6c8107fe33 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -352,6 +352,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
+AioWorkerControl	"Waiting to update AIO worker information."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..ecb16facb67 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3306,14 +3306,52 @@ struct config_int ConfigureNamesInt[] =
 	},
 
 	{
-		{"io_workers",
+		{"io_max_workers",
 			PGC_SIGHUP,
 			RESOURCES_IO,
-			gettext_noop("Number of IO worker processes, for io_method=worker."),
+			gettext_noop("Maximum number of IO worker processes, for io_method=worker."),
 			NULL,
 		},
-		&io_workers,
-		3, 1, MAX_IO_WORKERS,
+		&io_max_workers,
+		8, 1, MAX_IO_WORKERS,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"io_min_workers",
+			PGC_SIGHUP,
+			RESOURCES_IO,
+			gettext_noop("Minimum number of IO worker processes, for io_method=worker."),
+			NULL,
+		},
+		&io_min_workers,
+		1, 1, MAX_IO_WORKERS,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"io_worker_idle_timeout",
+			PGC_SIGHUP,
+			RESOURCES_IO,
+			gettext_noop("Maximum idle time before IO workers exit, for io_method=worker."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&io_worker_idle_timeout,
+		60 * 1000, -1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"io_worker_launch_interval",
+			PGC_SIGHUP,
+			RESOURCES_IO,
+			gettext_noop("Maximum idle time between launching IO workers, for io_method=worker."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&io_worker_launch_interval,
+		500, 0, INT_MAX,
 		NULL, NULL, NULL
 	},
 
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a9d8293474a..1da6345ad7a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -210,7 +210,10 @@
 					# can execute simultaneously
 					# -1 sets based on shared_buffers
 					# (change requires restart)
-#io_workers = 3				# 1-32;
+#io_min_workers = 1			# 1-32;
+#io_max_workers = 8			# 1-32;
+#io_worker_idle_timeout = 60s		# -1 disables idle exit
+#io_worker_launch_interval = 500ms	# min 0ms
 
 # - Worker Processes -
 
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index 7bde7e89c8a..de9c80109e0 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -17,6 +17,13 @@
 
 pg_noreturn extern void IoWorkerMain(const void *startup_data, size_t startup_data_len);
 
-extern PGDLLIMPORT int io_workers;
+extern PGDLLIMPORT int io_min_workers;
+extern PGDLLIMPORT int io_max_workers;
+extern PGDLLIMPORT int io_worker_idle_timeout;
+extern PGDLLIMPORT int io_worker_launch_interval;
+
+/* Interfaces visible to the postmaster. */
+extern bool pgaio_worker_test_new_worker_needed(void);
+extern bool pgaio_worker_clear_new_worker_needed(void);
 
 #endif							/* IO_WORKER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index a9681738146..c1801d08833 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, AioWorkerControl)
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index 428aa3fd68a..2859a636349 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -38,6 +38,7 @@ typedef enum
 	PMSIGNAL_ROTATE_LOGFILE,	/* send SIGUSR1 to syslogger to rotate logfile */
 	PMSIGNAL_START_AUTOVAC_LAUNCHER,	/* start an autovacuum launcher */
 	PMSIGNAL_START_AUTOVAC_WORKER,	/* start an autovacuum worker */
+	PMSIGNAL_IO_WORKER_CHANGE,	/* IO worker pool change */
 	PMSIGNAL_BACKGROUND_WORKER_CHANGE,	/* background worker state change */
 	PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
 	PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */
diff --git a/src/test/modules/test_aio/t/002_io_workers.pl b/src/test/modules/test_aio/t/002_io_workers.pl
index af5fae15ea7..a0252857798 100644
--- a/src/test/modules/test_aio/t/002_io_workers.pl
+++ b/src/test/modules/test_aio/t/002_io_workers.pl
@@ -14,6 +14,9 @@ $node->init();
 $node->append_conf(
 	'postgresql.conf', qq(
 io_method=worker
+io_worker_idle_timeout=0ms
+io_worker_launch_interval=0ms
+io_max_workers=32
 ));
 
 $node->start();
@@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic
 {
 	my $node = shift;
 
-	my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');
+	my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers');
 
 	# Verify that worker count can't be set to 0
 	change_number_of_io_workers($node, 0, $prev_worker_count, 1);
@@ -62,23 +65,23 @@ sub change_number_of_io_workers
 	my ($result, $stdout, $stderr);
 
 	($result, $stdout, $stderr) =
-	  $node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
+	  $node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count");
 	$node->safe_psql('postgres', 'SELECT pg_reload_conf()');
 
 	if ($expect_failure)
 	{
 		ok( $stderr =~
-			  /$worker_count is outside the valid range for parameter "io_workers"/,
-			"updating number of io_workers to $worker_count failed, as expected"
+			  /$worker_count is outside the valid range for parameter "io_min_workers"/,
+			"updating number of io_min_workers to $worker_count failed, as expected"
 		);
 
 		return $prev_worker_count;
 	}
 	else
 	{
-		is( $node->safe_psql('postgres', 'SHOW io_workers'),
+		is( $node->safe_psql('postgres', 'SHOW io_min_workers'),
 			$worker_count,
-			"updating number of io_workers from $prev_worker_count to $worker_count"
+			"updating number of io_min_workers from $prev_worker_count to $worker_count"
 		);
 
 		check_io_worker_count($node, $worker_count);
-- 
2.47.2
