Here is a rebase. I would like to push these shortly if there are no
objections. I propose 8 as the default upper limit.
From ea67807f7ba32f02943a78f2a15e20748df4dd14 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Wed, 18 Mar 2026 16:54:34 +1300
Subject: [PATCH v3 1/3] aio: Simplify pgaio_worker_submit().
Merge pgaio_worker_submit_internal() into its only caller,
pgaio_worker_submit(). The extra wrapper didn't serve any useful purpose.
---
src/backend/storage/aio/method_worker.c | 20 +++++---------------
1 file changed, 5 insertions(+), 15 deletions(-)
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index efe38e9f113..e24357a7a0a 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -239,8 +239,8 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
|| !pgaio_io_can_reopen(ioh);
}
-static void
-pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
+static int
+pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle **synchronous_ios = NULL;
int nsync = 0;
@@ -249,6 +249,9 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
+ for (int i = 0; i < num_staged_ios; i++)
+ pgaio_io_prepare_submit(staged_ios[i]);
+
if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
{
for (int i = 0; i < num_staged_ios; ++i)
@@ -299,19 +302,6 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
pgaio_io_perform_synchronously(synchronous_ios[i]);
}
}
-}
-
-static int
-pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
-{
- for (int i = 0; i < num_staged_ios; i++)
- {
- PgAioHandle *ioh = staged_ios[i];
-
- pgaio_io_prepare_submit(ioh);
- }
-
- pgaio_worker_submit_internal(num_staged_ios, staged_ios);
return num_staged_ios;
}
--
2.53.0
From 1ad41a1f07343fd676ee7bb9741dbf539889f50f Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Fri, 11 Apr 2025 21:17:26 +1200
Subject: [PATCH v3 2/3] aio: Improve I/O worker behavior on full queue.
Previously, when the submission queue was full we'd run all remaining
IOs in a batch synchronously. Now we'll try again between synchronous
operations, because the I/O workers might have drained some of the
queue.
Suggested-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
src/backend/storage/aio/method_worker.c | 24 +++++++++++++++++++++---
1 file changed, 21 insertions(+), 3 deletions(-)
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index e24357a7a0a..b1b0b6848a0 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -295,11 +295,29 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
SetLatch(wakeup);
/* Run whatever is left synchronously. */
- if (nsync > 0)
+ while (nsync > 0)
{
- for (int i = 0; i < nsync; ++i)
+ pgaio_io_perform_synchronously(*synchronous_ios++);
+ nsync--;
+
+ /* Between synchronous operations, try to enqueue again. */
+ if (nsync > 0)
{
- pgaio_io_perform_synchronously(synchronous_ios[i]);
+ wakeup = NULL;
+ if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
+ {
+ while (nsync > 0 &&
+ pgaio_worker_submission_queue_insert(*synchronous_ios))
+ {
+ synchronous_ios++;
+ nsync--;
+ if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
+ wakeup = io_worker_control->workers[worker].latch;
+ }
+ LWLockRelease(AioWorkerSubmissionQueueLock);
+ }
+ if (wakeup)
+ SetLatch(wakeup);
}
}
--
2.53.0
From 0e4619a7243c85a2db95a993cc49f6e70ff2258a Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 22 Mar 2025 00:36:49 +1300
Subject: [PATCH v3 3/3] aio: Adjust I/O worker pool size automatically.
Replace the simple io_workers setting with:
io_min_workers=1
io_max_workers=8 (can be up to 32)
io_worker_idle_timeout=60s
io_worker_launch_interval=100ms
The pool is automatically sized within the configured range according to
recent demand.
Reviewed-by: Dmitry Dolgov <[email protected]>
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
doc/src/sgml/config.sgml | 69 ++-
src/backend/postmaster/postmaster.c | 87 ++-
src/backend/storage/aio/method_worker.c | 515 ++++++++++++++----
.../utils/activity/wait_event_names.txt | 1 +
src/backend/utils/misc/guc_parameters.dat | 34 +-
src/backend/utils/misc/postgresql.conf.sample | 6 +-
src/include/storage/io_worker.h | 10 +-
src/include/storage/lwlocklist.h | 1 +
src/include/storage/pmsignal.h | 1 +
src/test/modules/test_aio/t/002_io_workers.pl | 15 +-
src/tools/pgindent/typedefs.list | 1 +
11 files changed, 608 insertions(+), 132 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 229f41353eb..4c8a133bb4d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2870,16 +2870,75 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
- <varlistentry id="guc-io-workers" xreflabel="io_workers">
- <term><varname>io_workers</varname> (<type>integer</type>)
+ <varlistentry id="guc-io-min-workers" xreflabel="io_min_workers">
+ <term><varname>io_min_workers</varname> (<type>integer</type>)
<indexterm>
- <primary><varname>io_workers</varname> configuration parameter</primary>
+ <primary><varname>io_min_workers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
- Selects the number of I/O worker processes to use. The default is
- 3. This parameter can only be set in the
+ Sets the minimum number of I/O worker processes to use. The default is
+ 1. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-max-workers" xreflabel="io_max_workers">
+ <term><varname>io_max_workers</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_max_workers</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the maximum number of I/O worker processes to use. The default is
+ 8. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout">
+ <term><varname>io_worker_idle_timeout</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the time after which idle I/O worker processes will exit, reducing the
+ size of the pool when demand decreases. The default is 1 minute. This
+ parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval">
+ <term><varname>io_worker_launch_interval</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_worker_launch_interval</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the minimum time between launching new I/O workers. This can be used to avoid
+ creating too many workers for a short-lived burst of demand. The default is 100ms.
+ This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 3fac46c402b..2ff2e43c504 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -408,6 +408,8 @@ static DNSServiceRef bonjour_sdref = NULL;
#endif
/* State for IO worker management. */
+static TimestampTz io_worker_launch_next_time = 0;
+static TimestampTz io_worker_launch_last_time = 0;
static int io_worker_count = 0;
static PMChild *io_worker_children[MAX_IO_WORKERS];
@@ -1550,10 +1552,9 @@ DetermineSleepTime(void)
/*
* Normal case: either there are no background workers at all, or we're in
- * a shutdown sequence (during which we ignore bgworkers altogether).
+ * a shutdown sequence.
*/
- if (Shutdown > NoShutdown ||
- (!StartWorkerNeeded && !HaveCrashedWorker))
+ if (Shutdown > NoShutdown)
{
if (AbortStartTime != 0)
{
@@ -1573,13 +1574,16 @@ DetermineSleepTime(void)
return seconds * 1000;
}
- else
- return 60 * 1000;
}
- if (StartWorkerNeeded)
+ /* Handle background workers, unless we're shutting down. */
+ if (StartWorkerNeeded && Shutdown == NoShutdown)
return 0;
+ /* If we need a new IO worker, defer until launch interval expires. */
+ if (pgaio_worker_test_grow() && io_worker_count < io_max_workers)
+ next_wakeup = io_worker_launch_next_time;
+
if (HaveCrashedWorker)
{
dlist_mutable_iter iter;
@@ -3776,6 +3780,15 @@ process_pm_pmsignal(void)
StartWorkerNeeded = true;
}
+ /* Process IO worker start requests. */
+ if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_GROW))
+ {
+ /*
+ * No local flag, as the state is exposed through pgaio_worker_*()
+ * functions. This signal is received on potentially actionable level
+ * changes, so that maybe_adjust_io_workers() will run.
+ */
+ }
/* Process background worker state changes. */
if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
{
@@ -4380,8 +4393,9 @@ maybe_reap_io_worker(int pid)
/*
* Start or stop IO workers, to close the gap between the number of running
* workers and the number of configured workers. Used to respond to change of
- * the io_workers GUC (by increasing and decreasing the number of workers), as
- * well as workers terminating in response to errors (by starting
+ * the io_{min,max}_workers GUCs (by increasing and decreasing the number of
+ * workers) and requests to start a new one due to submission queue backlog,
+ * as well as workers terminating in response to errors (by starting
* "replacement" workers).
*/
static void
@@ -4410,12 +4424,47 @@ maybe_adjust_io_workers(void)
Assert(pmState < PM_WAIT_IO_WORKERS);
- /* Not enough running? */
- while (io_worker_count < io_workers)
+ /* Not enough workers running? */
+ while (io_worker_count < io_max_workers)
{
PMChild *child;
int i;
+ /* Respect launch interval after minimum pool is reached. */
+ if (io_worker_count >= io_min_workers)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+
+ /*
+ * Still waiting for launch interval to expire, or no launch
+ * requested?
+ */
+ if (now < io_worker_launch_next_time ||
+ !pgaio_worker_test_and_clear_grow())
+ break;
+
+ /*
+ * Compute next launch time relative to the existing value, so
+ * that the postmaster's other duties and the advancing clock
+ * don't produce an inaccurate launch interval.
+ */
+ io_worker_launch_next_time =
+ TimestampTzPlusMilliseconds(io_worker_launch_next_time,
+ io_worker_launch_interval);
+
+ /*
+ * If that's already in the past, the interval is either
+ * impossibly short or we received no requests for new workers for
+ * a period. Compute a new future time relative to the last
+ * actual launch time instead, and proceed to launch a worker.
+ */
+ if (io_worker_launch_next_time <= now)
+ io_worker_launch_next_time =
+ TimestampTzPlusMilliseconds(io_worker_launch_last_time,
+ io_worker_launch_interval);
+ io_worker_launch_last_time = now;
+ }
+
/* find unused entry in io_worker_children array */
for (i = 0; i < MAX_IO_WORKERS; ++i)
{
@@ -4436,19 +4485,11 @@ maybe_adjust_io_workers(void)
break; /* try again next time */
}
- /* Too many running? */
- if (io_worker_count > io_workers)
- {
- /* ask the IO worker in the highest slot to exit */
- for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
- {
- if (io_worker_children[i] != NULL)
- {
- kill(io_worker_children[i]->pid, SIGUSR2);
- break;
- }
- }
- }
+ /*
+ * If there are too many running because io_max_workers changed, that will
+ * be handled by the IO workers themselves so they can shut down in
+ * preferred order.
+ */
}
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index b1b0b6848a0..e4d3348e98e 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -11,9 +11,8 @@
* infrastructure for reopening the file, and must processed synchronously by
* the client code when submitted.
*
- * So that the submitter can make just one system call when submitting a batch
- * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
- * could be improved by using futexes instead of latches to wake N waiters.
+ * The pool tries to stabilize at a size that can handle recently seen
+ * variation in demand, within the configured limits.
*
* This method of AIO is available in all builds on all operating systems, and
* is the default.
@@ -29,6 +28,8 @@
#include "postgres.h"
+#include <limits.h>
+
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
@@ -40,6 +41,8 @@
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/injection_point.h"
@@ -47,10 +50,11 @@
#include "utils/ps_status.h"
#include "utils/wait_event.h"
+/* Saturation for stats counters used to estimate wakeup:work ratio. */
+#define PGAIO_WORKER_STATS_MAX 4
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
+/* Debugging only: show activity and statistics in ps command line. */
+/* #define PGAIO_WORKER_SHOW_PS_INFO */
typedef struct PgAioWorkerSubmissionQueue
{
@@ -62,17 +66,37 @@ typedef struct PgAioWorkerSubmissionQueue
typedef struct PgAioWorkerSlot
{
- Latch *latch;
- bool in_use;
+ ProcNumber proc_number;
} PgAioWorkerSlot;
+/*
+ * Sets of worker IDs are held in a simple bitmap, accessed through functions
+ * that provide a more readable abstraction. If we wanted to support more
+ * workers than fit in one 64-bit word, contention on the single queue would
+ * surely get too high, so we might consider multiple pools instead of widening.
+ */
+typedef uint64 PgAioWorkerSet;
+
+#define PGAIO_WORKER_SET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
+
+static_assert(PGAIO_WORKER_SET_BITS >= MAX_IO_WORKERS, "too small");
+
typedef struct PgAioWorkerControl
{
- uint64 idle_worker_mask;
+ /* Seen by postmaster */
+ volatile bool grow;
+
+ /* Protected by AioWorkerSubmissionQueueLock. */
+ PgAioWorkerSet idle_worker_set;
+
+ /* Protected by AioWorkerControlLock. */
+ PgAioWorkerSet worker_set;
+ int nworkers;
+
+ /* Protected by AioWorkerControlLock. */
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
-
static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);
@@ -90,15 +114,103 @@ const IoMethodOps pgaio_worker_ops = {
/* GUCs */
-int io_workers = 3;
+int io_min_workers = 1;
+int io_max_workers = 8;
+int io_worker_idle_timeout = 60000;
+int io_worker_launch_interval = 100;
static int io_worker_queue_size = 64;
-static int MyIoWorkerId;
+static int MyIoWorkerId = -1;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
+static void
+pgaio_worker_set_initialize(PgAioWorkerSet *set)
+{
+ *set = 0;
+}
+
+static bool
+pgaio_worker_set_is_empty(PgAioWorkerSet *set)
+{
+ return *set == 0;
+}
+
+static PgAioWorkerSet
+pgaio_worker_set_singleton(int worker)
+{
+ return UINT64_C(1) << worker;
+}
+
+static void
+pgaio_worker_set_fill(PgAioWorkerSet *set)
+{
+ *set = UINT64_MAX >> (PGAIO_WORKER_SET_BITS - MAX_IO_WORKERS);
+}
+
+static void
+pgaio_worker_set_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
+{
+ *set1 &= ~*set2;
+}
+
+static void
+pgaio_worker_set_insert(PgAioWorkerSet *set, int worker)
+{
+ *set |= pgaio_worker_set_singleton(worker);
+}
+
+static void
+pgaio_worker_set_remove(PgAioWorkerSet *set, int worker)
+{
+ *set &= ~pgaio_worker_set_singleton(worker);
+}
+
+static void
+pgaio_worker_set_remove_less_than(PgAioWorkerSet *set, int worker)
+{
+ *set &= ~(pgaio_worker_set_singleton(worker) - 1);
+}
+
+static int
+pgaio_worker_set_get_highest(PgAioWorkerSet *set)
+{
+ Assert(!pgaio_worker_set_is_empty(set));
+ return pg_leftmost_one_pos64(*set);
+}
+
+static int
+pgaio_worker_set_get_lowest(PgAioWorkerSet *set)
+{
+ Assert(!pgaio_worker_set_is_empty(set));
+ return pg_rightmost_one_pos64(*set);
+}
+
+static int
+pgaio_worker_set_pop_lowest(PgAioWorkerSet *set)
+{
+ int worker = pgaio_worker_set_get_lowest(set);
+
+ pgaio_worker_set_remove(set, worker);
+ return worker;
+}
+
+#ifdef USE_ASSERT_CHECKING
+static bool
+pgaio_worker_set_contains(PgAioWorkerSet *set, int worker)
+{
+ return (*set & pgaio_worker_set_singleton(worker)) != 0;
+}
+
+static int
+pgaio_worker_set_count(PgAioWorkerSet *set)
+{
+ return pg_popcount64(*set);
+}
+#endif
+
static size_t
pgaio_worker_queue_shmem_size(int *queue_size)
{
@@ -151,37 +263,113 @@ pgaio_worker_shmem_init(bool first_time)
&found);
if (!found)
{
- io_worker_control->idle_worker_mask = 0;
+ io_worker_control->grow = false;
+ pgaio_worker_set_initialize(&io_worker_control->worker_set);
+ pgaio_worker_set_initialize(&io_worker_control->idle_worker_set);
for (int i = 0; i < MAX_IO_WORKERS; ++i)
+ io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
+ }
+}
+
+static void
+pgaio_worker_grow(bool grow)
+{
+ /*
+ * This is called from sites that don't hold AioWorkerControlLock, but
+ * these values change infrequently and an up-to-date value is not
+ * required for this heuristic purpose.
+ */
+ if (!grow)
+ {
+ /* Avoid dirtying memory if not already set. */
+ if (io_worker_control->grow)
+ io_worker_control->grow = false;
+ }
+ else
+ {
+ /* Do nothing if request already pending. */
+ if (!io_worker_control->grow)
{
- io_worker_control->workers[i].latch = NULL;
- io_worker_control->workers[i].in_use = false;
+ io_worker_control->grow = true;
+ SendPostmasterSignal(PMSIGNAL_IO_WORKER_GROW);
}
}
}
+/*
+ * Called by the postmaster to check if a new worker is needed.
+ */
+bool
+pgaio_worker_test_grow(void)
+{
+ return io_worker_control && io_worker_control->grow;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed when it's ready
+ * to launch one, and clear the flag.
+ */
+bool
+pgaio_worker_test_and_clear_grow(void)
+{
+ bool result;
+
+ result = io_worker_control->grow;
+ if (result)
+ io_worker_control->grow = false;
+
+ return result;
+}
+
static int
-pgaio_worker_choose_idle(void)
+pgaio_worker_choose_idle(int minimum_worker)
{
+ PgAioWorkerSet worker_set;
int worker;
- if (io_worker_control->idle_worker_mask == 0)
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
+ worker_set = io_worker_control->idle_worker_set;
+ pgaio_worker_set_remove_less_than(&worker_set, minimum_worker);
+ if (pgaio_worker_set_is_empty(&worker_set))
return -1;
- /* Find the lowest bit position, and clear it. */
- worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
- Assert(io_worker_control->workers[worker].in_use);
+ /* Find the lowest numbered idle worker and mark it not idle. */
+ worker = pgaio_worker_set_get_lowest(&worker_set);
+ pgaio_worker_set_remove(&io_worker_control->idle_worker_set, worker);
return worker;
}
+/*
+ * Try to wake a worker by setting its latch, to tell it there are IOs to
+ * process in the submission queue.
+ */
+static void
+pgaio_worker_wake(int worker)
+{
+ ProcNumber proc_number;
+
+ /*
+ * If the selected worker is concurrently exiting, then pgaio_worker_die()
+ * had not yet removed it as of when we saw it in idle_worker_set. That's
+ * OK, because it will wake all remaining workers to close wakeup-vs-exit
+ * races: *someone* will see the queued IO. If there are no workers
+ * running, the postmaster will start a new one.
+ */
+ proc_number = io_worker_control->workers[worker].proc_number;
+ if (proc_number != INVALID_PROC_NUMBER)
+ SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
+}
+
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
new_head = (queue->head + 1) & (queue->size - 1);
if (new_head == queue->tail)
@@ -203,6 +391,8 @@ pgaio_worker_submission_queue_consume(void)
PgAioWorkerSubmissionQueue *queue;
int result;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return -1; /* empty */
@@ -219,6 +409,8 @@ pgaio_worker_submission_queue_depth(void)
uint32 head;
uint32 tail;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
head = io_worker_submission_queue->head;
tail = io_worker_submission_queue->tail;
@@ -244,8 +436,7 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle **synchronous_ios = NULL;
int nsync = 0;
- Latch *wakeup = NULL;
- int worker;
+ int worker = -1;
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
@@ -269,18 +460,12 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
break;
}
+ }
- if (wakeup == NULL)
- {
- /* Choose an idle worker to wake up if we haven't already. */
- worker = pgaio_worker_choose_idle();
- if (worker >= 0)
- wakeup = io_worker_control->workers[worker].latch;
-
- pgaio_debug_io(DEBUG4, staged_ios[i],
- "choosing worker %d",
- worker);
- }
+ if (worker == -1)
+ {
+ /* Choose an idle worker to wake up if we haven't already. */
+ worker = pgaio_worker_choose_idle(0);
}
LWLockRelease(AioWorkerSubmissionQueueLock);
}
@@ -291,8 +476,12 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
nsync = num_staged_ios;
}
- if (wakeup)
- SetLatch(wakeup);
+ /*
+ * If we didn't find a worker to wake up, the existing workers will
+ * determine whether the pool is too small.
+ */
+ if (worker != -1)
+ pgaio_worker_wake(worker);
/* Run whatever is left synchronously. */
while (nsync > 0)
@@ -303,7 +492,7 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
/* Between synchronous operations, try to enqueue again. */
if (nsync > 0)
{
- wakeup = NULL;
+ worker = -1;
if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
{
while (nsync > 0 &&
@@ -311,13 +500,13 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
synchronous_ios++;
nsync--;
- if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
- wakeup = io_worker_control->workers[worker].latch;
+ if (worker == -1)
+ worker = pgaio_worker_choose_idle(0);
}
LWLockRelease(AioWorkerSubmissionQueueLock);
}
- if (wakeup)
- SetLatch(wakeup);
+ if (worker != -1)
+ pgaio_worker_wake(worker);
}
}
@@ -331,14 +520,27 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static void
pgaio_worker_die(int code, Datum arg)
{
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- Assert(io_worker_control->workers[MyIoWorkerId].in_use);
- Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+ PgAioWorkerSet notify_set;
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].in_use = false;
- io_worker_control->workers[MyIoWorkerId].latch = NULL;
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ pgaio_worker_set_remove(&io_worker_control->idle_worker_set, MyIoWorkerId);
LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
+ io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
+ Assert(pgaio_worker_set_contains(&io_worker_control->worker_set, MyIoWorkerId));
+ pgaio_worker_set_remove(&io_worker_control->worker_set, MyIoWorkerId);
+ notify_set = io_worker_control->worker_set;
+ Assert(io_worker_control->nworkers > 0);
+ io_worker_control->nworkers--;
+ Assert(pgaio_worker_set_count(&io_worker_control->worker_set) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /* Notify other workers on pool change. */
+ while (!pgaio_worker_set_is_empty(&notify_set))
+ pgaio_worker_wake(pgaio_worker_set_pop_lowest(&notify_set));
}
/*
@@ -348,33 +550,34 @@ pgaio_worker_die(int code, Datum arg)
static void
pgaio_worker_register(void)
{
+ PgAioWorkerSet free_worker_set;
+ PgAioWorkerSet old_worker_set;
+
MyIoWorkerId = -1;
- /*
- * XXX: This could do with more fine-grained locking. But it's also not
- * very common for the number of workers to change at the moment...
- */
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ pgaio_worker_set_fill(&free_worker_set);
+ pgaio_worker_set_subtract(&free_worker_set, &io_worker_control->worker_set);
+ if (!pgaio_worker_set_is_empty(&free_worker_set))
+ MyIoWorkerId = pgaio_worker_set_get_lowest(&free_worker_set);
+ if (MyIoWorkerId == -1)
+ elog(ERROR, "couldn't find a free worker ID");
- for (int i = 0; i < MAX_IO_WORKERS; ++i)
- {
- if (!io_worker_control->workers[i].in_use)
- {
- Assert(io_worker_control->workers[i].latch == NULL);
- io_worker_control->workers[i].in_use = true;
- MyIoWorkerId = i;
- break;
- }
- else
- Assert(io_worker_control->workers[i].latch != NULL);
- }
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
+ INVALID_PROC_NUMBER);
+ io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
- if (MyIoWorkerId == -1)
- elog(ERROR, "couldn't find a free worker slot");
+ old_worker_set = io_worker_control->worker_set;
+ Assert(!pgaio_worker_set_contains(&old_worker_set, MyIoWorkerId));
+ pgaio_worker_set_insert(&io_worker_control->worker_set, MyIoWorkerId);
+ io_worker_control->nworkers++;
+ Assert(pgaio_worker_set_count(&io_worker_control->worker_set) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
- LWLockRelease(AioWorkerSubmissionQueueLock);
+ /* Notify other workers on pool change. */
+ while (!pgaio_worker_set_is_empty(&old_worker_set))
+ pgaio_worker_wake(pgaio_worker_set_pop_lowest(&old_worker_set));
on_shmem_exit(pgaio_worker_die, 0);
}
@@ -400,14 +603,47 @@ pgaio_worker_error_callback(void *arg)
errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}
+/*
+ * Check if this backend is allowed to time out, and thus should use a
+ * non-infinite sleep time. Only the highest-numbered worker is allowed to
+ * time out, and only if the pool is above io_min_workers. Serializing
+ * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
+ * io_min_workers.
+ *
+ * The result is only instantaneously true and may be temporarily inconsistent
+ * in different workers around transitions, but all workers are woken up on
+ * pool size or GUC changes making the result eventually consistent.
+ */
+static bool
+pgaio_worker_can_timeout(void)
+{
+ PgAioWorkerSet worker_set;
+
+ /* Serialize against pool size changes. */
+ LWLockAcquire(AioWorkerControlLock, LW_SHARED);
+ worker_set = io_worker_control->worker_set;
+ LWLockRelease(AioWorkerControlLock);
+
+ if (MyIoWorkerId != pgaio_worker_set_get_highest(&worker_set))
+ return false;
+ if (MyIoWorkerId < io_min_workers)
+ return false;
+
+ return true;
+}
+
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
+ TimestampTz idle_timeout_abs = 0;
+ int timeout_guc_used = 0;
PgAioHandle *volatile error_ioh = NULL;
ErrorContextCallback errcallback = {0};
volatile int error_errno = 0;
char cmd[128];
+ int ios = 0;
+ int wakeups = 0;
AuxiliaryProcessMainCommon();
@@ -475,10 +711,9 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
while (!ShutdownRequestPending)
{
uint32 io_index;
- Latch *latches[IO_WORKER_WAKEUP_FANOUT];
- int nlatches = 0;
- int nwakeups = 0;
- int worker;
+ int worker = -1;
+ int queue_depth = 0;
+ bool grow = false;
/*
* Try to get a job to do.
@@ -489,38 +724,55 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
{
- /*
- * Nothing to do. Mark self idle.
- *
- * XXX: Invent some kind of back pressure to reduce useless
- * wakeups?
- */
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+ /* Nothing to do. Mark self idle. */
+ pgaio_worker_set_insert(&io_worker_control->idle_worker_set,
+ MyIoWorkerId);
}
else
{
/* Got one. Clear idle flag. */
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+ pgaio_worker_set_remove(&io_worker_control->idle_worker_set,
+ MyIoWorkerId);
- /* See if we can wake up some peers. */
- nwakeups = Min(pgaio_worker_submission_queue_depth(),
- IO_WORKER_WAKEUP_FANOUT);
- for (int i = 0; i < nwakeups; ++i)
+ /*
+ * See if we should wake up a higher numbered peer. Only do this
+ * if this worker is itself not receiving spurious wakeups. This
+ * heuristic discovers the useful wakeup propagation chain length.
+ */
+ if (wakeups <= ios)
{
- if ((worker = pgaio_worker_choose_idle()) < 0)
- break;
- latches[nlatches++] = io_worker_control->workers[worker].latch;
+ queue_depth = pgaio_worker_submission_queue_depth();
+ worker = pgaio_worker_choose_idle(MyIoWorkerId + 1);
+
+ /*
+ * If there were no idle higher numbered peers and there are
+ * more than enough IOs queued for me and all lower numbered
+ * peers, then try to start a new worker.
+ */
+ if (worker == -1 && queue_depth > MyIoWorkerId)
+ grow = true;
}
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- for (int i = 0; i < nlatches; ++i)
- SetLatch(latches[i]);
+ /* Propagate wakeups. */
+ if (worker != -1)
+ pgaio_worker_wake(worker);
+ else if (grow)
+ pgaio_worker_grow(true);
if (io_index != -1)
{
PgAioHandle *ioh = NULL;
+ /* Cancel timeout and update wakeup:work ratio. */
+ idle_timeout_abs = 0;
+ if (++ios == PGAIO_WORKER_STATS_MAX)
+ {
+ ios /= 2;
+ wakeups /= 2;
+ }
+
ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh;
errcallback.arg = ioh;
@@ -573,6 +825,14 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
#endif
+#ifdef PGAIO_WORKER_SHOW_PS_INFO
+ sprintf(cmd, "%d: [%s] %s",
+ MyIoWorkerId,
+ pgaio_io_get_op_name(ioh),
+ pgaio_io_get_target_description(ioh));
+ set_ps_display(cmd);
+#endif
+
/*
* We don't expect this to ever fail with ERROR or FATAL, no need
* to keep error_ioh set to the IO.
@@ -586,8 +846,75 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
else
{
- WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
- WAIT_EVENT_IO_WORKER_MAIN);
+ int timeout_ms;
+
+ /* Cancel new worker if pending. */
+ pgaio_worker_grow(false);
+
+ /* Compute the remaining allowed idle time. */
+ if (io_worker_idle_timeout == -1)
+ {
+ /* Never time out. */
+ timeout_ms = -1;
+ }
+ else
+ {
+ TimestampTz now = GetCurrentTimestamp();
+
+ /* If the GUC changes, reset timer. */
+ if (idle_timeout_abs != 0 &&
+ io_worker_idle_timeout != timeout_guc_used)
+ idle_timeout_abs = 0;
+
+ /* On first sleep, compute absolute timeout. */
+ if (idle_timeout_abs == 0)
+ {
+ idle_timeout_abs =
+ TimestampTzPlusMilliseconds(now,
+ io_worker_idle_timeout);
+ timeout_guc_used = io_worker_idle_timeout;
+ }
+
+ /*
+ * All workers maintain the absolute timeout value, but only
+ * the highest worker can actually time out and only if
+ * io_min_workers is satisfied. All others wait only for
+ * explicit wakeups caused by queue insertion, wakeup
+ * propagation, change of pool size (possibly promoting one to
+ * new highest) or GUC reload.
+ */
+ if (pgaio_worker_can_timeout())
+ timeout_ms =
+ TimestampDifferenceMilliseconds(now,
+ idle_timeout_abs);
+ else
+ timeout_ms = -1;
+ }
+
+#ifdef PGAIO_WORKER_SHOW_PS_INFO
+ sprintf(cmd, "%d: idle, ios:wakeups = %d:%d",
+ MyIoWorkerId, ios, wakeups);
+ set_ps_display(cmd);
+#endif
+
+ if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
+ timeout_ms,
+ WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
+ {
+ /* WL_TIMEOUT */
+ if (pgaio_worker_can_timeout())
+ if (GetCurrentTimestamp() >= idle_timeout_abs)
+ break;
+ }
+ else
+ {
+ /* WL_LATCH_SET */
+ if (++wakeups == PGAIO_WORKER_STATS_MAX)
+ {
+ ios /= 2;
+ wakeups /= 2;
+ }
+ }
ResetLatch(MyLatch);
}
@@ -597,6 +924,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ /* If io_max_workers has been decreased, workers with IDs at or above it exit. */
+ if (MyIoWorkerId >= io_max_workers)
+ break;
}
}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 6be80d2daad..5b58f45e18b 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -365,6 +365,7 @@ SerialControl "Waiting to read or update shared <filename>pg_serial</filename> s
AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue."
WaitLSN "Waiting to read or update shared Wait-for-LSN state."
LogicalDecodingControl "Waiting to read or update logical decoding status information."
+AioWorkerControl "Waiting to update AIO worker information."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 0a862693fcd..9301f7f806f 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1381,6 +1381,14 @@
check_hook => 'check_io_max_concurrency',
},
+{ name => 'io_max_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
+ short_desc => 'Maximum number of I/O worker processes, for io_method=worker.',
+ variable => 'io_max_workers',
+ boot_val => '8',
+ min => '1',
+ max => 'MAX_IO_WORKERS',
+},
+
{ name => 'io_method', type => 'enum', context => 'PGC_POSTMASTER', group => 'RESOURCES_IO',
short_desc => 'Selects the method for executing asynchronous I/O.',
variable => 'io_method',
@@ -1389,14 +1397,32 @@
assign_hook => 'assign_io_method',
},
-{ name => 'io_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
- short_desc => 'Number of IO worker processes, for io_method=worker.',
- variable => 'io_workers',
- boot_val => '3',
+{ name => 'io_min_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
+ short_desc => 'Minimum number of I/O worker processes, for io_method=worker.',
+ variable => 'io_min_workers',
+ boot_val => '1',
min => '1',
max => 'MAX_IO_WORKERS',
},
+{ name => 'io_worker_idle_timeout', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
+ short_desc => 'Maximum time before idle I/O worker processes time out, for io_method=worker.',
+ variable => 'io_worker_idle_timeout',
+ flags => 'GUC_UNIT_MS',
+ boot_val => '60000',
+ min => '-1', # -1 = never time out; IoWorkerMain() tests for this value
+ max => 'INT_MAX',
+},
+
+{ name => 'io_worker_launch_interval', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
+ short_desc => 'Minimum time before launching a new I/O worker process, for io_method=worker.',
+ variable => 'io_worker_launch_interval',
+ flags => 'GUC_UNIT_MS',
+ boot_val => '100',
+ min => '0',
+ max => 'INT_MAX',
+},
+
# Not for general use --- used by SET SESSION AUTHORIZATION and SET
# ROLE
{ name => 'is_superuser', type => 'bool', context => 'PGC_INTERNAL', group => 'UNGROUPED',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index cf15597385b..643dd2866e0 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -218,7 +218,11 @@
# can execute simultaneously
# -1 sets based on shared_buffers
# (change requires restart)
-#io_workers = 3 # 1-32;
+
+#io_min_workers = 1 # 1-32 (change requires pg_reload_conf())
+#io_max_workers = 8 # 1-32
+#io_worker_idle_timeout = 60s # maximum time before idle workers time out
+#io_worker_launch_interval = 100ms # minimum time before launching a new worker
# - Worker Processes -
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index f7d5998a138..40559fb2428 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -17,6 +17,14 @@
pg_noreturn extern void IoWorkerMain(const void *startup_data, size_t startup_data_len);
-extern PGDLLIMPORT int io_workers;
+/* Public GUCs. */
+extern PGDLLIMPORT int io_min_workers;
+extern PGDLLIMPORT int io_max_workers;
+extern PGDLLIMPORT int io_worker_idle_timeout;
+extern PGDLLIMPORT int io_worker_launch_interval;
+
+/* Interfaces visible to the postmaster. */
+extern bool pgaio_worker_test_grow(void);
+extern bool pgaio_worker_test_and_clear_grow(void);
#endif /* IO_WORKER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 59ee097977d..e39d5a947fa 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -87,6 +87,7 @@ PG_LWLOCK(52, SerialControl)
PG_LWLOCK(53, AioWorkerSubmissionQueue)
PG_LWLOCK(54, WaitLSN)
PG_LWLOCK(55, LogicalDecodingControl)
+PG_LWLOCK(56, AioWorkerControl)
/*
* There also exist several built-in LWLock tranches. As with the predefined
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index 206fb78f8a5..00e1b426d69 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -38,6 +38,7 @@ typedef enum
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
+ PMSIGNAL_IO_WORKER_GROW, /* I/O worker pool wants to grow */
PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */
diff --git a/src/test/modules/test_aio/t/002_io_workers.pl b/src/test/modules/test_aio/t/002_io_workers.pl
index 34bc132ea08..b9775811d4d 100644
--- a/src/test/modules/test_aio/t/002_io_workers.pl
+++ b/src/test/modules/test_aio/t/002_io_workers.pl
@@ -14,6 +14,9 @@ $node->init();
$node->append_conf(
'postgresql.conf', qq(
io_method=worker
+io_worker_idle_timeout=0ms
+io_worker_launch_interval=0ms
+io_max_workers=32
));
$node->start();
@@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic
{
my $node = shift;
- my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');
+ my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers');
# Verify that worker count can't be set to 0
change_number_of_io_workers($node, 0, $prev_worker_count, 1);
@@ -62,24 +65,24 @@ sub change_number_of_io_workers
my ($result, $stdout, $stderr);
($result, $stdout, $stderr) =
- $node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
+ $node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count");
$node->safe_psql('postgres', 'SELECT pg_reload_conf()');
if ($expect_failure)
{
like(
$stderr,
- qr/$worker_count is outside the valid range for parameter "io_workers"/,
- "updating number of io_workers to $worker_count failed, as expected"
+ qr/$worker_count is outside the valid range for parameter "io_min_workers"/,
+ "updating io_min_workers to $worker_count failed, as expected"
);
return $prev_worker_count;
}
else
{
- is( $node->safe_psql('postgres', 'SHOW io_workers'),
+ is( $node->safe_psql('postgres', 'SHOW io_min_workers'),
$worker_count,
- "updating number of io_workers from $prev_worker_count to $worker_count"
+ "updating number of io_min_workers from $prev_worker_count to $worker_count"
);
check_io_worker_count($node, $worker_count);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e3c1007abdf..7e340a9e791 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2250,6 +2250,7 @@ PgAioUringCaps
PgAioUringContext
PgAioWaitRef
PgAioWorkerControl
+PgAioWorkerSet
PgAioWorkerSlot
PgAioWorkerSubmissionQueue
PgArchData
--
2.53.0