On Wed, May 28, 2025 at 5:55 AM Dmitry Dolgov <9erthali...@gmail.com> wrote:
> I probably had to start with a statement that I find the current approach
> reasonable, and I'm only curious if there is more to get out of it. I haven't
> benchmarked the patch yet (plan getting to it when I'll get back), and can
> imagine practical considerations significantly impacting any potential
> solution.
Here's a rebase.
From fa7aac1bc9c0a47fbdbd9459424f08fa61b71ce2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 11 Apr 2025 21:17:26 +1200
Subject: [PATCH v2 1/2] aio: Try repeatedly to give batched IOs to workers.

Previously, when the submission queue was full we'd run all remaining IOs
in a batched submission synchronously.  Andres rightly pointed out that we
should really try again between synchronous IOs, since the workers might
have made progress in draining the queue.

Suggested-by: Andres Freund <and...@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
 src/backend/storage/aio/method_worker.c | 30 ++++++++++++++++++++++---
 1 file changed, 27 insertions(+), 3 deletions(-)

diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index bf8f77e6ff6..9a82d5f847d 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -282,12 +282,36 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 		SetLatch(wakeup);
 
 	/* Run whatever is left synchronously. */
-	if (nsync > 0)
+	for (int i = 0; i < nsync; ++i)
 	{
-		for (int i = 0; i < nsync; ++i)
+		wakeup = NULL;
+
+		/*
+		 * Between synchronous IO operations, try again to enqueue as many as
+		 * we can.
+		 */
+		if (i > 0)
 		{
-			pgaio_io_perform_synchronously(synchronous_ios[i]);
+			wakeup = NULL;
+
+			LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+			while (i < nsync &&
+				   pgaio_worker_submission_queue_insert(synchronous_ios[i]))
+			{
+				if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
+					wakeup = io_worker_control->workers[worker].latch;
+				i++;
+			}
+			LWLockRelease(AioWorkerSubmissionQueueLock);
+
+			if (wakeup)
+				SetLatch(wakeup);
+
+			if (i == nsync)
+				break;
 		}
+
+		pgaio_io_perform_synchronously(synchronous_ios[i]);
 	}
 }
-- 
2.47.2
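For anyone who wants to play with the retry idea without applying the patch,
here is a minimal standalone sketch of the same pattern.  All names are
invented for illustration: a pthread mutex and a plain ring buffer stand in
for the shared-memory queue and AioWorkerSubmissionQueueLock, and a printf
stands in for pgaio_io_perform_synchronously().  It compiles with
cc -pthread:

/*
 * Sketch only, not PostgreSQL code: when the bounded queue is full, run one
 * item synchronously, but between synchronous items retry handing the
 * remainder to the consumer, which may have drained the queue meanwhile.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

#define QUEUE_SIZE 4			/* tiny, to force the fallback path */

static int	queue[QUEUE_SIZE];
static int	q_head, q_tail;
static pthread_mutex_t q_lock = PTHREAD_MUTEX_INITIALIZER;

static bool
queue_insert(int io)			/* caller must hold q_lock */
{
	int			new_head = (q_head + 1) % QUEUE_SIZE;

	if (new_head == q_tail)
		return false;			/* full */
	queue[q_head] = io;
	q_head = new_head;
	return true;
}

static void *
worker_main(void *arg)
{
	(void) arg;
	for (;;)					/* drain whatever shows up */
	{
		int			io = -1;

		pthread_mutex_lock(&q_lock);
		if (q_tail != q_head)
		{
			io = queue[q_tail];
			q_tail = (q_tail + 1) % QUEUE_SIZE;
		}
		pthread_mutex_unlock(&q_lock);
		if (io >= 0)
			printf("worker: IO %d\n", io);
		else
			usleep(1000);		/* a latch wait in the real thing */
	}
	return NULL;
}

int
main(void)
{
	int			sync_ios[16];
	int			nsync = 0;
	pthread_t	worker;

	pthread_create(&worker, NULL, worker_main, NULL);

	/* Stage a batch; overflow goes to sync_ios[], as in the patch. */
	pthread_mutex_lock(&q_lock);
	for (int io = 0; io < 16; io++)
		if (!queue_insert(io))
			sync_ios[nsync++] = io;
	pthread_mutex_unlock(&q_lock);

	/* Run the leftovers, retrying the queue between synchronous IOs. */
	for (int i = 0; i < nsync; i++)
	{
		if (i > 0)
		{
			pthread_mutex_lock(&q_lock);
			while (i < nsync && queue_insert(sync_ios[i]))
				i++;
			pthread_mutex_unlock(&q_lock);
			if (i == nsync)
				break;
		}
		printf("submitter: IO %d run synchronously\n", sync_ios[i]);
	}
	usleep(100000);				/* let the worker drain before exiting */
	return 0;
}

The point to notice is that the inner while loop re-checks the queue under
the lock before every synchronous fallback, so progress made by the worker
in the meantime is picked up immediately.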
From a0a5fff1f1d21c002bf68d36de9aff21bdf61783 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 22 Mar 2025 00:36:49 +1300
Subject: [PATCH v2 2/2] aio: Adjust IO worker pool size automatically.

Replace the simple io_workers setting with:

  io_min_workers=1
  io_max_workers=8
  io_worker_idle_timeout=60s
  io_worker_launch_interval=500ms

The pool is automatically sized within the configured range according to
demand.

Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  70 ++-
 src/backend/postmaster/postmaster.c           |  64 ++-
 src/backend/storage/aio/method_worker.c       | 445 ++++++++++++++----
 .../utils/activity/wait_event_names.txt      |   1 +
 src/backend/utils/misc/guc_tables.c           |  46 +-
 src/backend/utils/misc/postgresql.conf.sample |   5 +-
 src/include/storage/io_worker.h               |   9 +-
 src/include/storage/lwlocklist.h              |   1 +
 src/include/storage/pmsignal.h                |   1 +
 src/test/modules/test_aio/t/002_io_workers.pl |  15 +-
 10 files changed, 535 insertions(+), 122 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c7acc0f182f..98532e55041 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2787,16 +2787,76 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
-     <varlistentry id="guc-io-workers" xreflabel="io_workers">
-      <term><varname>io_workers</varname> (<type>integer</type>)
+     <varlistentry id="guc-io-min-workers" xreflabel="io_min_workers">
+      <term><varname>io_min_workers</varname> (<type>integer</type>)
       <indexterm>
-       <primary><varname>io_workers</varname> configuration parameter</primary>
+       <primary><varname>io_min_workers</varname> configuration parameter</primary>
       </indexterm>
      </term>
      <listitem>
       <para>
-        Selects the number of I/O worker processes to use. The default is
-        3. This parameter can only be set in the
+        Sets the minimum number of I/O worker processes to use. The default is
+        1. This parameter can only be set in the
        <filename>postgresql.conf</filename> file or on the server command
        line.
       </para>
+      <para>
+        Only has an effect if <xref linkend="guc-io-method"/> is set to
+        <literal>worker</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+    <varlistentry id="guc-io-max-workers" xreflabel="io_max_workers">
+     <term><varname>io_max_workers</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>io_max_workers</varname> configuration parameter</primary>
+      </indexterm>
+     </term>
+     <listitem>
+      <para>
+        Sets the maximum number of I/O worker processes to use. The default is
+        8. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+      </para>
+      <para>
+        Only has an effect if <xref linkend="guc-io-method"/> is set to
+        <literal>worker</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+    <varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout">
+     <term><varname>io_worker_idle_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary>
+      </indexterm>
+     </term>
+     <listitem>
+      <para>
+        Sets the time after which idle I/O worker processes will exit,
+        reducing the size of the I/O worker pool towards the minimum. The
+        default is 1 minute.  The value -1 disables the timeout.
+        This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+      </para>
+      <para>
+        Only has an effect if <xref linkend="guc-io-method"/> is set to
+        <literal>worker</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+    <varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval">
+     <term><varname>io_worker_launch_interval</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>io_worker_launch_interval</varname> configuration parameter</primary>
+      </indexterm>
+     </term>
+     <listitem>
+      <para>
+        Sets the minimum time between launching new I/O workers. This can be
+        used to avoid sudden bursts of new I/O workers. The default is 500ms.
+        This parameter can only be set in the
         <filename>postgresql.conf</filename> file or on the server command
         line.
        </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index cca9b946e53..a5438fa079d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -408,6 +408,7 @@ static DNSServiceRef bonjour_sdref = NULL;
 #endif
 
 /* State for IO worker management. */
+static TimestampTz io_worker_launch_delay_until = 0;
 static int	io_worker_count = 0;
 static PMChild *io_worker_children[MAX_IO_WORKERS];
 
@@ -1569,6 +1570,15 @@ DetermineSleepTime(void)
 	if (StartWorkerNeeded)
 		return 0;
 
+	/* If we need a new IO worker, defer until the launch delay expires. */
+	if (pgaio_worker_test_new_worker_needed() &&
+		io_worker_count < io_max_workers)
+	{
+		if (io_worker_launch_delay_until == 0)
+			return 0;
+		next_wakeup = io_worker_launch_delay_until;
+	}
+
 	if (HaveCrashedWorker)
 	{
 		dlist_mutable_iter iter;
@@ -3750,6 +3760,15 @@ process_pm_pmsignal(void)
 		StartWorkerNeeded = true;
 	}
 
+	/* Process IO worker start requests. */
+	if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE))
+	{
+		/*
+		 * No local flag, as the state is exposed through pgaio_worker_*()
+		 * functions.  This signal is received on potentially actionable
+		 * level changes, so that maybe_adjust_io_workers() will run.
+		 */
+	}
+
 	/* Process background worker state changes. */
 	if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
 	{
@@ -4355,8 +4374,9 @@ maybe_reap_io_worker(int pid)
 /*
  * Start or stop IO workers, to close the gap between the number of running
  * workers and the number of configured workers. Used to respond to change of
- * the io_workers GUC (by increasing and decreasing the number of workers), as
- * well as workers terminating in response to errors (by starting
+ * the io_{min,max}_workers GUCs (by increasing and decreasing the number of
+ * workers) and requests to start a new one due to submission queue backlog,
+ * as well as workers terminating in response to errors (by starting
  * "replacement" workers).
  */
 static void
@@ -4385,8 +4405,16 @@ maybe_adjust_io_workers(void)
 
 	Assert(pmState < PM_WAIT_IO_WORKERS);
 
-	/* Not enough running? */
-	while (io_worker_count < io_workers)
+	/* Cancel the launch delay when it expires, to minimize clock access. */
+	if (io_worker_launch_delay_until != 0 &&
+		io_worker_launch_delay_until <= GetCurrentTimestamp())
+		io_worker_launch_delay_until = 0;
+
+	/* Not enough workers running? */
+	while (io_worker_launch_delay_until == 0 &&
+		   io_worker_count < io_max_workers &&
+		   (io_worker_count < io_min_workers ||
+			pgaio_worker_clear_new_worker_needed()))
 	{
 		PMChild    *child;
 		int			i;
@@ -4400,6 +4428,16 @@ maybe_adjust_io_workers(void)
 		if (i == MAX_IO_WORKERS)
 			elog(ERROR, "could not find a free IO worker slot");
 
+		/*
+		 * Apply the launch delay even for failures, to avoid retrying too
+		 * fast on fork() failure, but not while we're still building the
+		 * minimum pool size.
+		 */
+		if (io_worker_count >= io_min_workers)
+			io_worker_launch_delay_until =
+				TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+											io_worker_launch_interval);
+
 		/* Try to launch one. */
 		child = StartChildProcess(B_IO_WORKER);
 		if (child != NULL)
@@ -4411,19 +4449,11 @@ maybe_adjust_io_workers(void)
 			break;				/* try again next time */
 	}
 
-	/* Too many running? */
-	if (io_worker_count > io_workers)
-	{
-		/* ask the IO worker in the highest slot to exit */
-		for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
-		{
-			if (io_worker_children[i] != NULL)
-			{
-				kill(io_worker_children[i]->pid, SIGUSR2);
-				break;
-			}
-		}
-	}
+	/*
+	 * If there are too many running because io_max_workers changed, that will
+	 * be handled by the IO workers themselves, so that they can shut down in
+	 * the preferred order.
+	 */
 }
 
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 9a82d5f847d..6d3f5289e18 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -11,9 +11,10 @@
  * infrastructure for reopening the file, and must processed synchronously by
  * the client code when submitted.
  *
- * So that the submitter can make just one system call when submitting a batch
- * of IOs, wakeups "fan out"; each woken IO worker can wake two more.  XXX This
- * could be improved by using futexes instead of latches to wake N waiters.
+ * When a batch of IOs is submitted, the lowest numbered idle worker is woken
+ * up.  If it sees more work in the queue it wakes a peer to help, and so on
+ * in a chain.  When a backlog is detected, the pool size is increased, and
+ * the highest numbered worker times out after a period of inactivity.
  *
  * This method of AIO is available in all builds on all operating systems, and
  * is the default.
@@ -40,6 +41,8 @@
 #include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/injection_point.h"
@@ -47,10 +50,8 @@
 #include "utils/ps_status.h"
 #include "utils/wait_event.h"
 
-
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
+/* Saturation for stats counters used to estimate the wakeup:work ratio. */
+#define PGAIO_WORKER_STATS_MAX 64
 
 typedef struct PgAioWorkerSubmissionQueue
 {
@@ -63,17 +64,25 @@ typedef struct PgAioWorkerSubmissionQueue
 
 typedef struct PgAioWorkerSlot
 {
-	Latch	   *latch;
-	bool		in_use;
+	ProcNumber	proc_number;
 } PgAioWorkerSlot;
 
 typedef struct PgAioWorkerControl
 {
+	/* Seen by postmaster */
+	volatile bool new_worker_needed;
+
+	/* Protected by AioWorkerSubmissionQueueLock. */
 	uint64		idle_worker_mask;
+
+	/* Protected by AioWorkerControlLock. */
+	uint64		worker_set;
+	int			nworkers;
+
+	/* Protected by AioWorkerControlLock. */
 	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
 } PgAioWorkerControl;
 
-
 static size_t pgaio_worker_shmem_size(void);
 static void pgaio_worker_shmem_init(bool first_time);
 
@@ -91,11 +100,14 @@ const IoMethodOps pgaio_worker_ops = {
 
 /* GUCs */
-int			io_workers = 3;
+int			io_min_workers = 1;
+int			io_max_workers = 8;
+int			io_worker_idle_timeout = 60000;
+int			io_worker_launch_interval = 500;
 
 static int	io_worker_queue_size = 64;
-static int	MyIoWorkerId;
+static int	MyIoWorkerId = -1;
 static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
 static PgAioWorkerControl *io_worker_control;
 
@@ -152,37 +164,172 @@ pgaio_worker_shmem_init(bool first_time)
 									&found);
 	if (!found)
 	{
+		io_worker_control->new_worker_needed = false;
+		io_worker_control->worker_set = 0;
 		io_worker_control->idle_worker_mask = 0;
 		for (int i = 0; i < MAX_IO_WORKERS; ++i)
-		{
-			io_worker_control->workers[i].latch = NULL;
-			io_worker_control->workers[i].in_use = false;
-		}
+			io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
 	}
 }
 
+static void
+pgaio_worker_consider_new_worker(uint32 queue_depth)
+{
+	/*
+	 * This is called from sites that don't hold AioWorkerControlLock, but it
+	 * changes infrequently and an up-to-date value is not required for this
+	 * heuristic purpose.
+	 */
+	if (!io_worker_control->new_worker_needed &&
+		queue_depth >= io_worker_control->nworkers)
+	{
+		io_worker_control->new_worker_needed = true;
+		SendPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE);
+	}
+}
+
+/*
+ * Called by a worker when the queue is empty, to try to prevent a delayed
+ * reaction to a brief burst.  This races against the postmaster acting on the
+ * old value if it was recently set to true, but that's OK: the ordering would
+ * be indeterminate anyway, even if we could use locks in the postmaster.
+ */
+static void
+pgaio_worker_cancel_new_worker(void)
+{
+	io_worker_control->new_worker_needed = false;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed.
+ */
+bool
+pgaio_worker_test_new_worker_needed(void)
+{
+	return io_worker_control->new_worker_needed;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed when it's ready
+ * to launch one, and to clear the flag.
+ */
+bool
+pgaio_worker_clear_new_worker_needed(void)
+{
+	bool		result;
+
+	result = io_worker_control->new_worker_needed;
+	if (result)
+		io_worker_control->new_worker_needed = false;
+
+	return result;
+}
+
+static uint64
+pgaio_worker_mask(int worker)
+{
+	return UINT64_C(1) << worker;
+}
+
+static void
+pgaio_worker_add(uint64 *set, int worker)
+{
+	*set |= pgaio_worker_mask(worker);
+}
+
+static void
+pgaio_worker_remove(uint64 *set, int worker)
+{
+	*set &= ~pgaio_worker_mask(worker);
+}
+
+#ifdef USE_ASSERT_CHECKING
+static bool
+pgaio_worker_in(uint64 set, int worker)
+{
+	return (set & pgaio_worker_mask(worker)) != 0;
+}
+#endif
+
+static uint64
+pgaio_worker_highest(uint64 set)
+{
+	return pg_leftmost_one_pos64(set);
+}
+
+static uint64
+pgaio_worker_lowest(uint64 set)
+{
+	return pg_rightmost_one_pos64(set);
+}
+
+static int
+pgaio_worker_pop(uint64 *set)
+{
+	int			worker;
+
+	Assert(*set != 0);
+	worker = pgaio_worker_lowest(*set);
+	pgaio_worker_remove(set, worker);
+	return worker;
+}
+
 static int
 pgaio_worker_choose_idle(void)
 {
+	uint64		idle_worker_mask;
 	int			worker;
 
-	if (io_worker_control->idle_worker_mask == 0)
+	Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
+	/*
+	 * Workers only wake higher numbered workers, to try to encourage an
+	 * ordering of wakeup:work ratios, reducing spurious wakeups in lower
+	 * numbered workers.
+	 */
+	idle_worker_mask = io_worker_control->idle_worker_mask;
+	if (MyIoWorkerId != -1)
+		idle_worker_mask &= ~(pgaio_worker_mask(MyIoWorkerId) - 1);
+
+	if (idle_worker_mask == 0)
 		return -1;
 
 	/* Find the lowest bit position, and clear it. */
-	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
-	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
-	Assert(io_worker_control->workers[worker].in_use);
+	worker = pgaio_worker_lowest(idle_worker_mask);
+	pgaio_worker_remove(&io_worker_control->idle_worker_mask, worker);
 
 	return worker;
 }
 
+/*
+ * Try to wake a worker by setting its latch, to tell it there are IOs to
+ * process in the submission queue.
+ */
+static void
+pgaio_worker_wake(int worker)
+{
+	ProcNumber	proc_number;
+
+	/*
+	 * If the selected worker is concurrently exiting, then pgaio_worker_die()
+	 * had not yet removed it as of when we saw it in idle_worker_mask.
+	 * That's OK, because it will wake all remaining workers to close
+	 * wakeup-vs-exit races: *someone* will see the queued IO.  If there are
+	 * no workers running, the postmaster will start a new one.
+	 */
+	proc_number = io_worker_control->workers[worker].proc_number;
+	if (proc_number != INVALID_PROC_NUMBER)
+		SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
+}
+
 static bool
 pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
 {
 	PgAioWorkerSubmissionQueue *queue;
 	uint32		new_head;
 
+	Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
 	queue = io_worker_submission_queue;
 	new_head = (queue->head + 1) & (queue->size - 1);
 	if (new_head == queue->tail)
@@ -204,6 +351,8 @@ pgaio_worker_submission_queue_consume(void)
 	PgAioWorkerSubmissionQueue *queue;
 	uint32		result;
 
+	Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
 	queue = io_worker_submission_queue;
 	if (queue->tail == queue->head)
 		return UINT32_MAX;		/* empty */
@@ -220,6 +369,8 @@ pgaio_worker_submission_queue_depth(void)
 	uint32		head;
 	uint32		tail;
 
+	Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
 	head = io_worker_submission_queue->head;
 	tail = io_worker_submission_queue->tail;
 
@@ -244,9 +395,9 @@ static void
 pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 {
 	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	uint32		queue_depth;
+	int			worker = -1;
 	int			nsync = 0;
-	Latch	   *wakeup = NULL;
-	int			worker;
 
 	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
@@ -261,51 +412,48 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 			 * we can to workers, to maximize concurrency.
 			 */
 			synchronous_ios[nsync++] = staged_ios[i];
-			continue;
 		}
-
-		if (wakeup == NULL)
+		else if (worker == -1)
 		{
 			/* Choose an idle worker to wake up if we haven't already. */
 			worker = pgaio_worker_choose_idle();
-			if (worker >= 0)
-				wakeup = io_worker_control->workers[worker].latch;
 
 			pgaio_debug_io(DEBUG4, staged_ios[i],
 						   "choosing worker %d", worker);
 		}
 	}
+	queue_depth = pgaio_worker_submission_queue_depth();
 	LWLockRelease(AioWorkerSubmissionQueueLock);
 
-	if (wakeup)
-		SetLatch(wakeup);
+	if (worker != -1)
+		pgaio_worker_wake(worker);
+	else
+		pgaio_worker_consider_new_worker(queue_depth);
 
 	/* Run whatever is left synchronously. */
 	for (int i = 0; i < nsync; ++i)
 	{
-		wakeup = NULL;
-
 		/*
 		 * Between synchronous IO operations, try again to enqueue as many as
 		 * we can.
 		 */
 		if (i > 0)
 		{
-			wakeup = NULL;
+			worker = -1;
 
 			LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
 			while (i < nsync &&
 				   pgaio_worker_submission_queue_insert(synchronous_ios[i]))
 			{
-				if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
-					wakeup = io_worker_control->workers[worker].latch;
+				if (worker == -1)
+					worker = pgaio_worker_choose_idle();
 				i++;
 			}
 			LWLockRelease(AioWorkerSubmissionQueueLock);
 
-			if (wakeup)
-				SetLatch(wakeup);
+			if (worker != -1)
+				pgaio_worker_wake(worker);
 
 			if (i == nsync)
 				break;
@@ -337,14 +485,27 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
 static void
 pgaio_worker_die(int code, Datum arg)
 {
-	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
-	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+	uint64		notify_set;
 
-	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
-	io_worker_control->workers[MyIoWorkerId].in_use = false;
-	io_worker_control->workers[MyIoWorkerId].latch = NULL;
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	pgaio_worker_remove(&io_worker_control->idle_worker_mask, MyIoWorkerId);
 	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+	Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
+	io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
+	Assert(pgaio_worker_in(io_worker_control->worker_set, MyIoWorkerId));
+	pgaio_worker_remove(&io_worker_control->worker_set, MyIoWorkerId);
+	notify_set = io_worker_control->worker_set;
+	Assert(io_worker_control->nworkers > 0);
+	io_worker_control->nworkers--;
+	Assert(pg_popcount64(io_worker_control->worker_set) ==
+		   io_worker_control->nworkers);
+	LWLockRelease(AioWorkerControlLock);
+
+	/* Notify other workers on pool change. */
+	while (notify_set != 0)
+		pgaio_worker_wake(pgaio_worker_pop(&notify_set));
 }
 
 /*
@@ -354,33 +515,37 @@ pgaio_worker_die(int code, Datum arg)
 static void
 pgaio_worker_register(void)
 {
-	MyIoWorkerId = -1;
+	uint64		worker_set_inverted;
+	uint64		old_worker_set;
 
-	/*
-	 * XXX: This could do with more fine-grained locking. But it's also not
-	 * very common for the number of workers to change at the moment...
-	 */
-	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	MyIoWorkerId = -1;
 
-	for (int i = 0; i < MAX_IO_WORKERS; ++i)
+	LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+	worker_set_inverted = ~io_worker_control->worker_set;
+	if (worker_set_inverted != 0)
 	{
-		if (!io_worker_control->workers[i].in_use)
-		{
-			Assert(io_worker_control->workers[i].latch == NULL);
-			io_worker_control->workers[i].in_use = true;
-			MyIoWorkerId = i;
-			break;
-		}
-		else
-			Assert(io_worker_control->workers[i].latch != NULL);
+		MyIoWorkerId = pgaio_worker_lowest(worker_set_inverted);
+		if (MyIoWorkerId >= MAX_IO_WORKERS)
+			MyIoWorkerId = -1;
 	}
-
 	if (MyIoWorkerId == -1)
 		elog(ERROR, "couldn't find a free worker slot");
 
-	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
-	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
-	LWLockRelease(AioWorkerSubmissionQueueLock);
+	Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
+		   INVALID_PROC_NUMBER);
+	io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
+
+	old_worker_set = io_worker_control->worker_set;
+	Assert(!pgaio_worker_in(old_worker_set, MyIoWorkerId));
+	pgaio_worker_add(&io_worker_control->worker_set, MyIoWorkerId);
+	io_worker_control->nworkers++;
+	Assert(pg_popcount64(io_worker_control->worker_set) ==
+		   io_worker_control->nworkers);
+	LWLockRelease(AioWorkerControlLock);
+
+	/* Notify other workers on pool change. */
+	while (old_worker_set != 0)
+		pgaio_worker_wake(pgaio_worker_pop(&old_worker_set));
 
 	on_shmem_exit(pgaio_worker_die, 0);
 }
 
@@ -406,14 +571,47 @@ pgaio_worker_error_callback(void *arg)
 	errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
 }
 
+/*
+ * Check if this backend is allowed to time out, and thus should use a
+ * non-infinite sleep time.  Only the highest-numbered worker is allowed to
+ * time out, and only if the pool is above io_min_workers.  Serializing
+ * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
+ * io_min_workers.
+ *
+ * The result is only instantaneously true and may be temporarily inconsistent
+ * in different workers around transitions, but all workers are woken up on
+ * pool size or GUC changes, making the result eventually consistent.
+ */
+static bool
+pgaio_worker_can_timeout(void)
+{
+	uint64		worker_set;
+
+	/* Serialize against pool size changes. */
+	LWLockAcquire(AioWorkerControlLock, LW_SHARED);
+	worker_set = io_worker_control->worker_set;
+	LWLockRelease(AioWorkerControlLock);
+
+	if (MyIoWorkerId != pgaio_worker_highest(worker_set))
+		return false;
+	if (MyIoWorkerId < io_min_workers)
+		return false;
+
+	return true;
+}
+
 void
 IoWorkerMain(const void *startup_data, size_t startup_data_len)
 {
 	sigjmp_buf	local_sigjmp_buf;
+	TimestampTz idle_timeout_abs = 0;
+	int			timeout_guc_used = 0;
 	PgAioHandle *volatile error_ioh = NULL;
 	ErrorContextCallback errcallback = {0};
 	volatile int error_errno = 0;
 	char		cmd[128];
+	int			ios = 0;
+	int			wakeups = 0;
 
 	MyBackendType = B_IO_WORKER;
 	AuxiliaryProcessMainCommon();
 
@@ -482,10 +680,8 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 	while (!ShutdownRequestPending)
 	{
 		uint32		io_index;
-		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
-		int			nlatches = 0;
-		int			nwakeups = 0;
-		int			worker;
+		uint32		queue_depth;
+		int			worker = -1;
 
 		/*
 		 * Try to get a job to do.
 		 *
 		 * The lwlock acquisition also provides the necessary memory barrier
 		 * to ensure that we don't see an outdated data in the handle.
 		 */
 		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-		if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+		io_index = pgaio_worker_submission_queue_consume();
+		queue_depth = pgaio_worker_submission_queue_depth();
+		if (io_index == UINT32_MAX)
 		{
-			/*
-			 * Nothing to do. Mark self idle.
-			 *
-			 * XXX: Invent some kind of back pressure to reduce useless
-			 * wakeups?
-			 */
-			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+			/* Nothing to do.  Mark self idle. */
+			pgaio_worker_add(&io_worker_control->idle_worker_mask,
+							 MyIoWorkerId);
 		}
 		else
 		{
 			/* Got one.  Clear idle flag. */
-			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+			pgaio_worker_remove(&io_worker_control->idle_worker_mask,
+								MyIoWorkerId);
 
-			/* See if we can wake up some peers. */
-			nwakeups = Min(pgaio_worker_submission_queue_depth(),
-						   IO_WORKER_WAKEUP_FANOUT);
-			for (int i = 0; i < nwakeups; ++i)
-			{
-				if ((worker = pgaio_worker_choose_idle()) < 0)
-					break;
-				latches[nlatches++] = io_worker_control->workers[worker].latch;
-			}
+			/*
+			 * See if we should wake up a peer.  Only do this if this worker
+			 * is not experiencing spurious wakeups itself, to end a chain of
+			 * wasted scheduling.
+			 */
+			if (queue_depth > 0 && wakeups <= ios)
+				worker = pgaio_worker_choose_idle();
 		}
 		LWLockRelease(AioWorkerSubmissionQueueLock);
 
-		for (int i = 0; i < nlatches; ++i)
-			SetLatch(latches[i]);
+		/* Propagate wakeups. */
+		if (worker != -1)
+			pgaio_worker_wake(worker);
+		else if (wakeups <= ios)
+			pgaio_worker_consider_new_worker(queue_depth);
 
 		if (io_index != UINT32_MAX)
 		{
 			PgAioHandle *ioh = NULL;
 
+			/* Cancel timeout and update the wakeup:work ratio. */
+			idle_timeout_abs = 0;
+			if (++ios == PGAIO_WORKER_STATS_MAX)
+			{
+				ios /= 2;
+				wakeups /= 2;
+			}
+
 			ioh = &pgaio_ctl->io_handles[io_index];
 			error_ioh = ioh;
 			errcallback.arg = ioh;
@@ -593,8 +797,69 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 		}
 		else
 		{
-			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
-					  WAIT_EVENT_IO_WORKER_MAIN);
+			int			timeout_ms;
+
+			/* Cancel the new worker request, if one is pending. */
+			pgaio_worker_cancel_new_worker();
+
+			/* Compute the remaining allowed idle time. */
+			if (io_worker_idle_timeout == -1)
+			{
+				/* Never time out. */
+				timeout_ms = -1;
+			}
+			else
+			{
+				TimestampTz now = GetCurrentTimestamp();
+
+				/* If the GUC changes, reset the timer. */
+				if (idle_timeout_abs != 0 &&
+					io_worker_idle_timeout != timeout_guc_used)
+					idle_timeout_abs = 0;
+
+				/* On first sleep, compute the absolute timeout. */
+				if (idle_timeout_abs == 0)
+				{
+					idle_timeout_abs =
+						TimestampTzPlusMilliseconds(now,
+													io_worker_idle_timeout);
+					timeout_guc_used = io_worker_idle_timeout;
+				}
+
+				/*
+				 * All workers maintain the absolute timeout value, but only
+				 * the highest worker can actually time out, and only if
+				 * io_min_workers is exceeded.  All others wait only for
+				 * explicit wakeups caused by queue insertion, wakeup
+				 * propagation, change of pool size (possibly making them
+				 * highest), or GUC reload.
+				 */
+				if (pgaio_worker_can_timeout())
+					timeout_ms =
+						TimestampDifferenceMilliseconds(now,
+														idle_timeout_abs);
+				else
+					timeout_ms = -1;
+			}
+
+			if (WaitLatch(MyLatch,
+						  WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
+						  timeout_ms,
+						  WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
+			{
+				/* WL_TIMEOUT */
+				if (pgaio_worker_can_timeout() &&
+					GetCurrentTimestamp() >= idle_timeout_abs)
+					break;
+			}
+			else
+			{
+				/* WL_LATCH_SET */
+				if (++wakeups == PGAIO_WORKER_STATS_MAX)
+				{
+					ios /= 2;
+					wakeups /= 2;
+				}
+			}
 
 			ResetLatch(MyLatch);
 		}
@@ -604,6 +869,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+
+			/* If io_max_workers has been decreased, exit highest first. */
+			if (MyIoWorkerId >= io_max_workers)
+				break;
 		}
 	}
 
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..c6c8107fe33 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -352,6 +352,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
+AioWorkerControl	"Waiting to update AIO worker information."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..ecb16facb67 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3306,14 +3306,52 @@ struct config_int ConfigureNamesInt[] =
 	},
 
 	{
-		{"io_workers",
+		{"io_max_workers",
 			PGC_SIGHUP,
 			RESOURCES_IO,
-			gettext_noop("Number of IO worker processes, for io_method=worker."),
+			gettext_noop("Maximum number of IO worker processes, for io_method=worker."),
 			NULL,
 		},
-		&io_workers,
-		3, 1, MAX_IO_WORKERS,
+		&io_max_workers,
+		8, 1, MAX_IO_WORKERS,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"io_min_workers",
+			PGC_SIGHUP,
+			RESOURCES_IO,
+			gettext_noop("Minimum number of IO worker processes, for io_method=worker."),
+			NULL,
+		},
+		&io_min_workers,
+		1, 1, MAX_IO_WORKERS,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"io_worker_idle_timeout",
+			PGC_SIGHUP,
+			RESOURCES_IO,
+			gettext_noop("Maximum idle time before IO workers exit, for io_method=worker."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&io_worker_idle_timeout,
+		60 * 1000, -1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"io_worker_launch_interval",
+			PGC_SIGHUP,
+			RESOURCES_IO,
+			gettext_noop("Minimum time between launching IO workers, for io_method=worker."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&io_worker_launch_interval,
+		500, 0, INT_MAX,
 		NULL, NULL, NULL
 	},
 
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a9d8293474a..1da6345ad7a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -210,7 +210,10 @@
 					# can execute simultaneously
 					# -1 sets based on shared_buffers
 					# (change requires restart)
-#io_workers = 3				# 1-32;
+#io_min_workers = 1			# 1-32;
+#io_max_workers = 8			# 1-32;
+#io_worker_idle_timeout = 60s		# -1 disables timeout
+#io_worker_launch_interval = 500ms	# min 0ms
 
 # - Worker Processes -
 
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index 7bde7e89c8a..de9c80109e0 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -17,6 +17,13 @@
 pg_noreturn extern void IoWorkerMain(const void *startup_data,
 									 size_t startup_data_len);
 
-extern PGDLLIMPORT int io_workers;
+extern PGDLLIMPORT int io_min_workers;
+extern PGDLLIMPORT int io_max_workers;
+extern PGDLLIMPORT int io_worker_idle_timeout;
+extern PGDLLIMPORT int io_worker_launch_interval;
+
+/* Interfaces visible to the postmaster. */
+extern bool pgaio_worker_test_new_worker_needed(void);
+extern bool pgaio_worker_clear_new_worker_needed(void);
 
 #endif							/* IO_WORKER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index a9681738146..c1801d08833 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, AioWorkerControl)
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index 428aa3fd68a..2859a636349 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -38,6 +38,7 @@ typedef enum
 	PMSIGNAL_ROTATE_LOGFILE,	/* send SIGUSR1 to syslogger to rotate logfile */
 	PMSIGNAL_START_AUTOVAC_LAUNCHER,	/* start an autovacuum launcher */
 	PMSIGNAL_START_AUTOVAC_WORKER,	/* start an autovacuum worker */
+	PMSIGNAL_IO_WORKER_CHANGE,	/* IO worker pool change */
 	PMSIGNAL_BACKGROUND_WORKER_CHANGE,	/* background worker state change */
 	PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
 	PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */
diff --git a/src/test/modules/test_aio/t/002_io_workers.pl b/src/test/modules/test_aio/t/002_io_workers.pl
index af5fae15ea7..a0252857798 100644
--- a/src/test/modules/test_aio/t/002_io_workers.pl
+++ b/src/test/modules/test_aio/t/002_io_workers.pl
@@ -14,6 +14,9 @@ $node->init();
 $node->append_conf(
 	'postgresql.conf', qq(
 io_method=worker
+io_worker_idle_timeout=0ms
+io_worker_launch_interval=0ms
+io_max_workers=32
 ));
 
 $node->start();
@@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic
 {
 	my $node = shift;
 
-	my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');
+	my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers');
 
 	# Verify that worker count can't be set to 0
 	change_number_of_io_workers($node, 0, $prev_worker_count, 1);
@@ -62,23 +65,23 @@ sub change_number_of_io_workers
 	my ($result, $stdout, $stderr);
 
 	($result, $stdout, $stderr) =
-		$node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
+		$node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count");
 	$node->safe_psql('postgres', 'SELECT pg_reload_conf()');
 
 	if ($expect_failure)
 	{
 		ok( $stderr =~
-			  /$worker_count is outside the valid range for parameter "io_workers"/,
-			"updating number of io_workers to $worker_count failed, as expected"
+			  /$worker_count is outside the valid range for parameter "io_min_workers"/,
+			"updating number of io_min_workers to $worker_count failed, as expected"
 		);
 		return $prev_worker_count;
 	}
 	else
 	{
-		is( $node->safe_psql('postgres', 'SHOW io_workers'),
+		is( $node->safe_psql('postgres', 'SHOW io_min_workers'),
 			$worker_count,
-			"updating number of io_workers from $prev_worker_count to $worker_count"
+			"updating number of io_min_workers from $prev_worker_count to $worker_count"
 		);
 
 		check_io_worker_count($node, $worker_count);
-- 
2.47.2
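Two bits of the bitmap bookkeeping in this patch are easy to get backwards
in review: wakeups go to the lowest idle worker at or above the waker's ID,
and only the highest numbered member of the pool, at or above
io_min_workers, is allowed to idle out, which keeps worker IDs gap-free in
0..N-1.  Here is a small self-contained sketch of just those two rules (not
PostgreSQL code; the GCC builtins stand in for the pg_bitutils.h helpers,
and all other names are invented):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static int rightmost_one_pos64(uint64_t x) { return __builtin_ctzll(x); }
static int leftmost_one_pos64(uint64_t x) { return 63 - __builtin_clzll(x); }

static uint64_t worker_mask(int worker) { return UINT64_C(1) << worker; }

/* Lowest idle worker with ID >= my_id, or -1: the choose_idle rule. */
static int
choose_idle(uint64_t idle_mask, int my_id)
{
	if (my_id != -1)
		idle_mask &= ~(worker_mask(my_id) - 1);	/* drop IDs below my own */
	return idle_mask == 0 ? -1 : rightmost_one_pos64(idle_mask);
}

/* Highest pool member only, and only above the minimum: the timeout rule. */
static bool
can_timeout(uint64_t worker_set, int my_id, int io_min_workers)
{
	return my_id == leftmost_one_pos64(worker_set) && my_id >= io_min_workers;
}

int
main(void)
{
	uint64_t	worker_set = worker_mask(0) | worker_mask(1) | worker_mask(2);
	uint64_t	idle_mask = worker_mask(0) | worker_mask(2);
	int			io_min_workers = 1;

	/* Worker 1 wakes a peer: only worker 2 qualifies (IDs below 1 skipped). */
	printf("worker 1 wakes: %d\n", choose_idle(idle_mask, 1));

	/* Only worker 2, the highest member, may exit when idle. */
	for (int id = 0; id < 3; id++)
		printf("worker %d can time out: %s\n", id,
			   can_timeout(worker_set, id, io_min_workers) ? "yes" : "no");
	return 0;
}

The combination is what makes the pool shrink from the top and keep its
wakeup:work accounting ordered: spurious wakeups concentrate in the lowest
IDs, and the most weakly loaded worker is always the one eligible to exit.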