Hi,

On Mon, Jul 14, 2025 at 2:10 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
> ---
> -   shared->maintenance_work_mem_worker =
> -       (nindexes_mwm > 0) ?
> -       maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
> -       maintenance_work_mem;
> +
> +   if (AmAutoVacuumWorkerProcess())
> +       shared->maintenance_work_mem_worker =
> +           (nindexes_mwm > 0) ?
> +           autovacuum_work_mem / Min(parallel_workers, nindexes_mwm) :
> +           autovacuum_work_mem;
> +   else
> +       shared->maintenance_work_mem_worker =
> +           (nindexes_mwm > 0) ?
> +           maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
> +           maintenance_work_mem;
>
> Since we have a similar code in dead_items_alloc() I think it's better
> to follow it:
>
>     int         vac_work_mem = AmAutoVacuumWorkerProcess() &&
>         autovacuum_work_mem != -1 ?
>         autovacuum_work_mem : maintenance_work_mem;
>
> That is, we calculate vac_work_mem first and then calculate
> shared->maintenance_work_mem_worker. I think it's more straightforward
> as the formula of maintenance_work_mem_worker is the same whereas the
> amount of memory used for vacuum and autovacuum varies.
>

I was confused by the fact that maintenance_work_mem was originally used
for the calculation instead of vac_work_mem. I agree that it is better to
use the already calculated vac_work_mem value.

> ---
> +   nlaunched_workers = pvs->pcxt->nworkers_launched; /* remember this value */
>     DestroyParallelContext(pvs->pcxt);
> +
> +   /* Release all launched (i.e. reserved) parallel autovacuum workers. */
> +   if (AmAutoVacuumWorkerProcess())
> +       ParallelAutoVacuumReleaseWorkers(nlaunched_workers);
> +
>
> Why don't we release workers before destroying the parallel context?
>

Destroying the parallel context includes waiting for all workers to exit
(after which other operations can use them). If we call
ParallelAutoVacuumReleaseWorkers first, another operation may reasonably
request all of the released workers, but that request can fail because
there is no guarantee that the workers have already finished. There is
nothing actually wrong with that, but I think releasing workers only after
they have finished their work is the more logical approach.

> ---
> @@ -558,7 +576,9 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
>      * We don't allow performing parallel operation in standalone backend or
>      * when parallelism is disabled.
>      */
> -   if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
> +   if (!IsUnderPostmaster ||
> +       (autovacuum_max_parallel_workers == 0 && AmAutoVacuumWorkerProcess()) ||
> +       (max_parallel_maintenance_workers == 0 && !AmAutoVacuumWorkerProcess()))
>         return 0;
>
>     /*
> @@ -597,15 +617,17 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
>     parallel_workers = (nrequested > 0) ?
>         Min(nrequested, nindexes_parallel) : nindexes_parallel;
>
> -   /* Cap by max_parallel_maintenance_workers */
> -   parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
> +   /* Cap by GUC variable */
> +   parallel_workers = AmAutoVacuumWorkerProcess() ?
> +       Min(parallel_workers, autovacuum_max_parallel_workers) :
> +       Min(parallel_workers, max_parallel_maintenance_workers);
>
>     return parallel_workers;
>
> How about calculating the maximum number of workers once and using it
> in the above both places?
>

Agreed, good idea.

> ---
> +   /* Check how many workers can provide autovacuum. */
> +   if (AmAutoVacuumWorkerProcess() && nworkers > 0)
> +       nworkers = ParallelAutoVacuumReserveWorkers(nworkers);
> +
>
> I think it's better to move this code to right after setting "nworkers
> = Min(nworkers, pvs->pcxt->nworkers);" as it's a more related code.
>
> The comment needs to be updated as it doesn't match what the function
> actually does (i.e. reserving the workers).
>

You are right, I'll fix it.

> ---
>     /* Reinitialize parallel context to relaunch parallel workers */
>     if (num_index_scans > 0)
> +   {
>         ReinitializeParallelDSM(pvs->pcxt);
>
> +       /*
> +        * Release all launched (i.e. reserved) parallel autovacuum
> +        * workers.
> +        */
> +       if (AmAutoVacuumWorkerProcess())
> +           ParallelAutoVacuumReleaseWorkers(pvs->pcxt->nworkers_launched);
> +   }
>
> Why do we need to release all workers here? If there is a reason, we
> should mention it as a comment.
>

Hm, I guess it was left over from previous patch versions. Actually, we
don't need to release workers here, since we will try to launch them again
immediately. It is a bug, thank you for noticing it.

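For illustration, the suggestion above (calculate the maximum number of
workers once and use it in both places) could look roughly like the
standalone sketch below. This is not the patch code itself: the GUCs are
plain ints with made-up values, and compute_parallel_workers() with its two
boolean parameters is a hypothetical stand-in for
parallel_vacuum_compute_workers(), AmAutoVacuumWorkerProcess() and
IsUnderPostmaster.

```c
#include <assert.h>
#include <stdbool.h>

#define Min(x, y) ((x) < (y) ? (x) : (y))

/* Stand-ins for the two GUCs; the values are arbitrary examples. */
int autovacuum_max_parallel_workers = 10;
int max_parallel_maintenance_workers = 2;

/*
 * Hypothetical, simplified version of the worker computation.
 * 'am_autovacuum' and 'under_postmaster' replace the real process checks.
 */
int
compute_parallel_workers(bool am_autovacuum, bool under_postmaster,
                         int nrequested, int nindexes_parallel)
{
    /* Pick the applicable limit exactly once. */
    int max_workers = am_autovacuum ?
        autovacuum_max_parallel_workers :
        max_parallel_maintenance_workers;
    int parallel_workers;

    /* First use: parallelism is disabled or unavailable. */
    if (!under_postmaster || max_workers == 0)
        return 0;

    parallel_workers = (nrequested > 0) ?
        Min(nrequested, nindexes_parallel) : nindexes_parallel;

    /* Second use: cap by the limit chosen above. */
    return Min(parallel_workers, max_workers);
}
```

With these example values, an autovacuum leader that has 8 parallel-capable
indexes and no explicit request would get 8 workers, while a manual VACUUM
would be capped at 2.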
> ---
> @@ -706,16 +751,16 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
>
>         if (vacuum)
>             ereport(pvs->shared->elevel,
> -                   (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
> -                                    "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
> +                   (errmsg(ngettext("launched %d parallel %svacuum worker for index vacuuming (planned: %d)",
> +                                    "launched %d parallel %svacuum workers for index vacuuming (planned: %d)",
>                                      pvs->pcxt->nworkers_launched),
> -                           pvs->pcxt->nworkers_launched, nworkers)));
> +                           pvs->pcxt->nworkers_launched, AmAutoVacuumWorkerProcess() ? "auto" : "", nworkers)));
>
> The "%svacuum" part doesn't work in terms of translation. We need to
> construct the whole sentence instead.
> But do we need this log message
> change in the first place? IIUC autovacuums write logs only when the
> execution time exceed the log_autovacuum_min_duration (or its
> reloption). The patch unconditionally sets LOG level for autovacuums
> but I'm not sure it's consistent with other autovacuum logging
> behavior:
>
> +   int         elevel = AmAutoVacuumWorkerProcess() ||
> +       vacrel->verbose ?
> +       INFO : DEBUG2;
>
>

This log level is used only "for messages about parallel workers
launched". I think that such logs relate more to the parallel workers
module than to autovacuum itself. Moreover, if we emit the "planned vs.
launched" log each time, it will simplify the task of selecting the optimal
value of the 'autovacuum_max_parallel_workers' parameter. What do you
think?

About "%svacuum": I guess we need to clarify what exactly the workers were
launched for. I'll add an errhint to this log message, but I don't know
whether such an approach is acceptable.

> - * Support routines for parallel vacuum execution.
> + * Support routines for parallel [auto]vacuum execution.
>
> The patch includes the change of "vacuum" -> "[auto]vacuum" in many
> places.
> While I think we need to mention that vacuumparallel.c
> supports autovacuums I'm not sure we really need all of them. If we
> accept this style, we would require for all subsequent changes to
> follow it, which could increase maintenance costs.
>

Agreed. I'll leave a comment which says that vacuumparallel.c also supports
parallel autovacuum. All other changes like "[auto]vacuum" will be removed.

> ---
> @@ -299,6 +301,7 @@ typedef struct
>     WorkerInfo  av_startingWorker;
>     AutoVacuumWorkItem av_workItems[NUM_WORKITEMS];
>     pg_atomic_uint32 av_nworkersForBalance;
> +   uint32 av_available_parallel_workers;
>
> Other field names seem to have consistent naming rules; 'av_' prefix
> followed by name in camel case. So how about renaming it to
> av_freeParallelWorkers or something along those lines?
>
> ---
> +int
> +ParallelAutoVacuumReserveWorkers(int nworkers)
> +{
>
> Other exposed functions have "AutoVacuum" prefix, so how about
> renaming it to AutoVacuumReserveParallelWorkers() or something along
> those lines?
>

I agree with both comments; I'll rename the structure field and the
functions.

> ---
> +   if (AutoVacuumShmem->av_available_parallel_workers < nworkers)
> +   {
> +       /* Provide as many workers as we can. */
> +       can_launch = AutoVacuumShmem->av_available_parallel_workers;
> +       AutoVacuumShmem->av_available_parallel_workers = 0;
> +   }
> +   else
> +   {
> +       /* OK, we can provide all requested workers. */
> +       can_launch = nworkers;
> +       AutoVacuumShmem->av_available_parallel_workers -= nworkers;
> +   }
>
> Can we simplify this logic as follows?
>
>     can_launch = Min(AutoVacuumShmem->av_available_parallel_workers, nworkers);
>     AutoVacuumShmem->av_available_parallel_workers -= can_launch;
>

Sure, I'll simplify it.

---

Thank you very much for your comments!

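For reference, the simplified reserve/release accounting discussed above
can be sketched in isolation as follows. This is only a minimal sketch
under stated assumptions: the shared-memory counter and the GUC are
replaced by plain globals (free_parallel_workers and max_parallel_workers
are hypothetical names), and the locking around AutovacuumLock is omitted.

```c
#include <assert.h>
#include <stdint.h>

#define Min(x, y) ((x) < (y) ? (x) : (y))

/* Stand-in for autovacuum_max_parallel_workers (example value). */
uint32_t max_parallel_workers = 4;

/* Stand-in for the free-worker counter kept in shared memory. */
uint32_t free_parallel_workers = 4;

/* Reserve up to 'nworkers' workers; returns how many were granted. */
int
reserve_workers(int nworkers)
{
    uint32_t can_launch;

    assert(nworkers >= 0);

    /* Grant as many workers as are free, but no more than requested. */
    can_launch = Min(free_parallel_workers, (uint32_t) nworkers);

    /*
     * Subtract what was granted, not what was requested: the counter is
     * unsigned, so subtracting a larger request would wrap around.
     */
    free_parallel_workers -= can_launch;

    return (int) can_launch;
}

/* Give back 'nworkers' workers, never exceeding the configured limit. */
void
release_workers(int nworkers)
{
    assert(nworkers >= 0);

    free_parallel_workers += (uint32_t) nworkers;
    if (free_parallel_workers > max_parallel_workers)
        free_parallel_workers = max_parallel_workers;
}
```

The cap in release_workers() mirrors the idea that the limit may have been
lowered while workers were still running; in this sketch the limit is a
constant, so the cap only guards against releasing more than was reserved.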
Please see the v7 patch:
1) Renamed a few functions and variables, and got rid of comments like
   "[auto]vacuum" in vacuumparallel.c
2) Simplified the logic in the 'parallel_vacuum_init' and
   'AutoVacuumReserveParallelWorkers' functions
3) Refactoring and a bug fix in the 'parallel_vacuum_process_all_indexes'
   function
4) Changed the "planned vs. launched" logging so that it can be translated
5) Rebased on the newest commit in the master branch

--
Best regards,
Daniil Davydov

From 7af255b4d0a5e7927f6a1c212c4b2342d6b044a7 Mon Sep 17 00:00:00 2001 From: Daniil Davidov <d.davy...@postgrespro.ru> Date: Fri, 16 May 2025 11:59:03 +0700 Subject: [PATCH v7 2/2] Sandbox for parallel index autovacuum --- src/test/modules/Makefile | 1 + src/test/modules/autovacuum/.gitignore | 1 + src/test/modules/autovacuum/Makefile | 14 ++ src/test/modules/autovacuum/meson.build | 12 ++ .../autovacuum/t/001_autovac_parallel.pl | 131 ++++++++++++++++++ src/test/modules/meson.build | 1 + 6 files changed, 160 insertions(+) create mode 100644 src/test/modules/autovacuum/.gitignore create mode 100644 src/test/modules/autovacuum/Makefile create mode 100644 src/test/modules/autovacuum/meson.build create mode 100644 src/test/modules/autovacuum/t/001_autovac_parallel.pl diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index aa1d27bbed3..b7f3e342e82 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -5,6 +5,7 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global SUBDIRS = \ + autovacuum \ brin \ commit_ts \ delay_execution \ diff --git a/src/test/modules/autovacuum/.gitignore b/src/test/modules/autovacuum/.gitignore new file mode 100644 index 00000000000..0b54641bceb --- /dev/null +++ b/src/test/modules/autovacuum/.gitignore @@ -0,0 +1 @@ +/tmp_check/ \ No newline at end of file diff --git a/src/test/modules/autovacuum/Makefile b/src/test/modules/autovacuum/Makefile new file mode 100644 index 00000000000..90c00ff350b --- /dev/null +++ b/src/test/modules/autovacuum/Makefile @@ -0,0 +1,14 @@ +# src/test/modules/autovacuum/Makefile + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/autovacuum +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif \ No newline at end of file diff --git a/src/test/modules/autovacuum/meson.build b/src/test/modules/autovacuum/meson.build new file mode 100644 index 00000000000..f91c1a14d2b --- /dev/null +++ b/src/test/modules/autovacuum/meson.build @@ -0,0 +1,12 @@ +# Copyright (c) 2022-2025, PostgreSQL Global Development Group + +tests += { + 'name': 'autovacuum', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'tap': { + 'tests': [ + 't/001_autovac_parallel.pl', + ], + }, +} diff --git a/src/test/modules/autovacuum/t/001_autovac_parallel.pl b/src/test/modules/autovacuum/t/001_autovac_parallel.pl new file mode 100644 index 00000000000..ae892e5b4de --- /dev/null +++ b/src/test/modules/autovacuum/t/001_autovac_parallel.pl @@ -0,0 +1,131 @@ +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $psql_out; + +my $node = PostgreSQL::Test::Cluster->new('node1'); +$node->init; +$node->append_conf('postgresql.conf', qq{ + autovacuum = off + max_wal_size = 4096 + max_worker_processes = 20 + max_parallel_workers = 20 + max_parallel_maintenance_workers = 20 + autovacuum_max_parallel_workers = 10 + log_min_messages = info +}); +$node->start; + +my $indexes_num = 80; +my $initial_rows_num = 100_000; +my $parallel_autovacuum_workers = 5; + +# Create big table and create specified number of b-tree indexes on it +$node->safe_psql('postgres', qq{ + CREATE TABLE test_autovac ( + id SERIAL PRIMARY KEY, + col_1 INTEGER, col_2 INTEGER, col_3 INTEGER, col_4 INTEGER, col_5 INTEGER, + col_6 INTEGER, col_7 INTEGER, col_8 INTEGER, col_9 INTEGER, col_10 INTEGER, + col_11 INTEGER, col_12 INTEGER, col_13 INTEGER, col_14 INTEGER, col_15 INTEGER, + col_16 INTEGER, col_17 INTEGER, col_18 INTEGER, col_19 INTEGER, col_20 INTEGER, + col_21 INTEGER, col_22 INTEGER, col_23 INTEGER, col_24 INTEGER, col_25 INTEGER, + col_26 INTEGER, 
col_27 INTEGER, col_28 INTEGER, col_29 INTEGER, col_30 INTEGER, + col_31 INTEGER, col_32 INTEGER, col_33 INTEGER, col_34 INTEGER, col_35 INTEGER, + col_36 INTEGER, col_37 INTEGER, col_38 INTEGER, col_39 INTEGER, col_40 INTEGER, + col_41 INTEGER, col_42 INTEGER, col_43 INTEGER, col_44 INTEGER, col_45 INTEGER, + col_46 INTEGER, col_47 INTEGER, col_48 INTEGER, col_49 INTEGER, col_50 INTEGER, + col_51 INTEGER, col_52 INTEGER, col_53 INTEGER, col_54 INTEGER, col_55 INTEGER, + col_56 INTEGER, col_57 INTEGER, col_58 INTEGER, col_59 INTEGER, col_60 INTEGER, + col_61 INTEGER, col_62 INTEGER, col_63 INTEGER, col_64 INTEGER, col_65 INTEGER, + col_66 INTEGER, col_67 INTEGER, col_68 INTEGER, col_69 INTEGER, col_70 INTEGER, + col_71 INTEGER, col_72 INTEGER, col_73 INTEGER, col_74 INTEGER, col_75 INTEGER, + col_76 INTEGER, col_77 INTEGER, col_78 INTEGER, col_79 INTEGER, col_80 INTEGER, + col_81 INTEGER, col_82 INTEGER, col_83 INTEGER, col_84 INTEGER, col_85 INTEGER, + col_86 INTEGER, col_87 INTEGER, col_88 INTEGER, col_89 INTEGER, col_90 INTEGER, + col_91 INTEGER, col_92 INTEGER, col_93 INTEGER, col_94 INTEGER, col_95 INTEGER, + col_96 INTEGER, col_97 INTEGER, col_98 INTEGER, col_99 INTEGER, col_100 INTEGER + ) WITH (parallel_autovacuum_workers = $parallel_autovacuum_workers); + + DO \$\$ + DECLARE + i INTEGER; + BEGIN + FOR i IN 1..$indexes_num LOOP + EXECUTE format('CREATE INDEX idx_col_\%s ON test_autovac (col_\%s);', i, i); + END LOOP; + END \$\$; +}); + +$node->psql('postgres', + "SELECT COUNT(*) FROM pg_index i + JOIN pg_class c ON c.oid = i.indrelid + WHERE c.relname = 'test_autovac';", + stdout => \$psql_out +); +is($psql_out, $indexes_num + 1, "All indexes created successfully"); + +$node->safe_psql('postgres', qq{ + DO \$\$ + DECLARE + i INTEGER; + BEGIN + FOR i IN 1..$initial_rows_num LOOP + INSERT INTO test_autovac ( + col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, + col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, 
col_20,
+                col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30,
+                col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40,
+                col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49, col_50,
+                col_51, col_52, col_53, col_54, col_55, col_56, col_57, col_58, col_59, col_60,
+                col_61, col_62, col_63, col_64, col_65, col_66, col_67, col_68, col_69, col_70,
+                col_71, col_72, col_73, col_74, col_75, col_76, col_77, col_78, col_79, col_80,
+                col_81, col_82, col_83, col_84, col_85, col_86, col_87, col_88, col_89, col_90,
+                col_91, col_92, col_93, col_94, col_95, col_96, col_97, col_98, col_99, col_100
+            ) VALUES (
+                i, i + 1, i + 2, i + 3, i + 4, i + 5, i + 6, i + 7, i + 8, i + 9,
+                i + 10, i + 11, i + 12, i + 13, i + 14, i + 15, i + 16, i + 17, i + 18, i + 19,
+                i + 20, i + 21, i + 22, i + 23, i + 24, i + 25, i + 26, i + 27, i + 28, i + 29,
+                i + 30, i + 31, i + 32, i + 33, i + 34, i + 35, i + 36, i + 37, i + 38, i + 39,
+                i + 40, i + 41, i + 42, i + 43, i + 44, i + 45, i + 46, i + 47, i + 48, i + 49,
+                i + 50, i + 51, i + 52, i + 53, i + 54, i + 55, i + 56, i + 57, i + 58, i + 59,
+                i + 60, i + 61, i + 62, i + 63, i + 64, i + 65, i + 66, i + 67, i + 68, i + 69,
+                i + 70, i + 71, i + 72, i + 73, i + 74, i + 75, i + 76, i + 77, i + 78, i + 79,
+                i + 80, i + 81, i + 82, i + 83, i + 84, i + 85, i + 86, i + 87, i + 88, i + 89,
+                i + 90, i + 91, i + 92, i + 93, i + 94, i + 95, i + 96, i + 97, i + 98, i + 99
+            );
+        END LOOP;
+    END \$\$;
+});
+
+$node->psql('postgres',
+    "SELECT COUNT(*) FROM test_autovac;",
+    stdout => \$psql_out
+);
+is($psql_out, $initial_rows_num, "All data inserted into table successfully");
+
+$node->safe_psql('postgres', qq{
+    UPDATE test_autovac SET col_1 = 0 WHERE (col_1 % 3) = 0;
+    ANALYZE test_autovac;
+});
+
+# Reduce autovacuum_work_mem, so leader process will perform parallel index
+# vacuum phase several times
+$node->append_conf('postgresql.conf', qq{
+    autovacuum_naptime = '1s'
+
autovacuum_vacuum_threshold = 1 + autovacuum_analyze_threshold = 1 + autovacuum_vacuum_scale_factor = 0.1 + autovacuum_analyze_scale_factor = 0.1 + autovacuum = on +}); + +$node->restart; + +# sleep(3600); + +ok(1, "There are no segfaults"); + +$node->stop; +done_testing(); diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index 9de0057bd1d..7f2ad810ca0 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -1,5 +1,6 @@ # Copyright (c) 2022-2025, PostgreSQL Global Development Group +subdir('autovacuum') subdir('brin') subdir('commit_ts') subdir('delay_execution') -- 2.43.0
From 55b76f15bbc3991b7457de6c1d6998d39b16292c Mon Sep 17 00:00:00 2001 From: Daniil Davidov <d.davy...@postgrespro.ru> Date: Fri, 16 May 2025 11:58:40 +0700 Subject: [PATCH v7 1/2] Parallel index autovacuum with bgworkers --- src/backend/access/common/reloptions.c | 12 ++ src/backend/access/heap/vacuumlazy.c | 6 +- src/backend/commands/vacuumparallel.c | 57 ++++++-- src/backend/postmaster/autovacuum.c | 135 +++++++++++++++++- src/backend/utils/init/globals.c | 1 + src/backend/utils/misc/guc_tables.c | 10 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/miscadmin.h | 1 + src/include/postmaster/autovacuum.h | 4 + src/include/utils/guc_hooks.h | 2 + src/include/utils/rel.h | 2 + 11 files changed, 220 insertions(+), 11 deletions(-) diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 50747c16396..e36d59f632b 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -222,6 +222,16 @@ static relopt_int intRelOpts[] = }, SPGIST_DEFAULT_FILLFACTOR, SPGIST_MIN_FILLFACTOR, 100 }, + { + { + "parallel_autovacuum_workers", + "Maximum number of parallel autovacuum workers that can be taken from bgworkers pool for processing this table. 
" + "If value is 0 then parallel degree will computed based on number of indexes.", + RELOPT_KIND_HEAP, + ShareUpdateExclusiveLock + }, + -1, -1, 1024 + }, { { "autovacuum_vacuum_threshold", @@ -1872,6 +1882,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind) {"fillfactor", RELOPT_TYPE_INT, offsetof(StdRdOptions, fillfactor)}, {"autovacuum_enabled", RELOPT_TYPE_BOOL, offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, enabled)}, + {"parallel_autovacuum_workers", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, parallel_autovacuum_workers)}, {"autovacuum_vacuum_threshold", RELOPT_TYPE_INT, offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_threshold)}, {"autovacuum_vacuum_max_threshold", RELOPT_TYPE_INT, diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 14036c27e87..7e0ae0184aa 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -3477,6 +3477,10 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) autovacuum_work_mem != -1 ? autovacuum_work_mem : maintenance_work_mem; + int elevel = AmAutoVacuumWorkerProcess() || + vacrel->verbose ? + INFO : DEBUG2; + /* * Initialize state for a parallel vacuum. As of now, only one worker can * be used for an index, so we invoke parallelism only if there are at @@ -3503,7 +3507,7 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels, vacrel->nindexes, nworkers, vac_work_mem, - vacrel->verbose ? 
INFO : DEBUG2, + elevel, vacrel->bstrategy); /* diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 0feea1d30ec..6ec610e29e4 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -1,7 +1,9 @@ /*------------------------------------------------------------------------- * * vacuumparallel.c - * Support routines for parallel vacuum execution. + * Support routines for parallel vacuum and autovacuum execution. In the + * future comments, the word "vacuum" will refer to both vacuum and + * autovacuum. * * This file contains routines that are intended to support setting up, using, * and tearing down a ParallelVacuumState. @@ -34,6 +36,7 @@ #include "executor/instrument.h" #include "optimizer/paths.h" #include "pgstat.h" +#include "postmaster/autovacuum.h" #include "storage/bufmgr.h" #include "tcop/tcopprot.h" #include "utils/lsyscache.h" @@ -371,10 +374,12 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shared->relid = RelationGetRelid(rel); shared->elevel = elevel; shared->queryid = pgstat_get_my_query_id(); + shared->maintenance_work_mem_worker = (nindexes_mwm > 0) ? - maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : - maintenance_work_mem; + vac_work_mem / Min(parallel_workers, nindexes_mwm) : + vac_work_mem; + shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024; /* Prepare DSA space for dead items */ @@ -435,6 +440,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) { + int nlaunched_workers; + Assert(!IsParallelWorker()); /* Copy the updated statistics */ @@ -453,7 +460,13 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) TidStoreDestroy(pvs->dead_items); + nlaunched_workers = pvs->pcxt->nworkers_launched; /* remember this value */ DestroyParallelContext(pvs->pcxt); + + /* Release all launched (i.e. 
reserved) parallel autovacuum workers. */ + if (AmAutoVacuumWorkerProcess()) + AutoVacuumReleaseParallelWorkers(nlaunched_workers); + ExitParallelMode(); pfree(pvs->will_parallel_vacuum); @@ -553,12 +566,17 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, int nindexes_parallel_bulkdel = 0; int nindexes_parallel_cleanup = 0; int parallel_workers; + int max_parallel_workers; + + max_parallel_workers = AmAutoVacuumWorkerProcess() ? + autovacuum_max_parallel_workers : + max_parallel_maintenance_workers; /* * We don't allow performing parallel operation in standalone backend or * when parallelism is disabled. */ - if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) + if (!IsUnderPostmaster || max_parallel_workers == 0) return 0; /* @@ -597,8 +615,8 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, parallel_workers = (nrequested > 0) ? Min(nrequested, nindexes_parallel) : nindexes_parallel; - /* Cap by max_parallel_maintenance_workers */ - parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); + /* Cap by GUC variable */ + parallel_workers = Min(parallel_workers, max_parallel_workers); return parallel_workers; } @@ -646,6 +664,13 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan */ nworkers = Min(nworkers, pvs->pcxt->nworkers); + /* + * Also reserve workers in autovacuum global state. Note, that we may be + * given fewer workers than we requested. + */ + if (AmAutoVacuumWorkerProcess() && nworkers > 0) + nworkers = AutoVacuumReserveParallelWorkers(nworkers); + /* * Set index vacuum status and mark whether parallel vacuum worker can * process it. 
@@ -690,6 +715,16 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan LaunchParallelWorkers(pvs->pcxt); + if (AmAutoVacuumWorkerProcess() && + pvs->pcxt->nworkers_launched < nworkers) + { + /* + * Tell autovacuum that we could not launch all the previously + * reserved workers. + */ + AutoVacuumReleaseParallelWorkers(nworkers - pvs->pcxt->nworkers_launched); + } + if (pvs->pcxt->nworkers_launched > 0) { /* @@ -709,13 +744,19 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", "launched %d parallel vacuum workers for index vacuuming (planned: %d)", pvs->pcxt->nworkers_launched), - pvs->pcxt->nworkers_launched, nworkers))); + pvs->pcxt->nworkers_launched, nworkers), + AmAutoVacuumWorkerProcess() ? + errhint("workers were launched for parallel autovacuum") : + errhint("workers were launched for parallel vacuum"))); else ereport(pvs->shared->elevel, (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", "launched %d parallel vacuum workers for index cleanup (planned: %d)", pvs->pcxt->nworkers_launched), - pvs->pcxt->nworkers_launched, nworkers))); + pvs->pcxt->nworkers_launched, nworkers), + AmAutoVacuumWorkerProcess() ? 
+ errhint("workers were launched for parallel autovacuum") : + errhint("workers were launched for parallel vacuum"))); } /* Vacuum the indexes that can be processed by only leader process */ diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 9474095f271..98609ac8f8f 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -285,6 +285,7 @@ typedef struct AutoVacuumWorkItem * av_workItems work item array * av_nworkersForBalance the number of autovacuum workers to use when * calculating the per worker cost limit + * av_freeParallelWorkers the number of free parallel autovacuum workers * * This struct is protected by AutovacuumLock, except for av_signal and parts * of the worker list (see above). @@ -299,6 +300,7 @@ typedef struct WorkerInfo av_startingWorker; AutoVacuumWorkItem av_workItems[NUM_WORKITEMS]; pg_atomic_uint32 av_nworkersForBalance; + uint32 av_freeParallelWorkers; } AutoVacuumShmemStruct; static AutoVacuumShmemStruct *AutoVacuumShmem; @@ -354,6 +356,7 @@ static void autovac_report_workitem(AutoVacuumWorkItem *workitem, static void avl_sigusr2_handler(SIGNAL_ARGS); static bool av_worker_available(void); static void check_av_worker_gucs(void); +static void check_parallel_av_gucs(int prev_max_parallel_workers); @@ -753,7 +756,9 @@ ProcessAutoVacLauncherInterrupts(void) if (ConfigReloadPending) { int autovacuum_max_workers_prev = autovacuum_max_workers; + int autovacuum_max_parallel_workers_prev; + autovacuum_max_parallel_workers_prev = autovacuum_max_parallel_workers; ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); @@ -769,6 +774,14 @@ ProcessAutoVacLauncherInterrupts(void) if (autovacuum_max_workers_prev != autovacuum_max_workers) check_av_worker_gucs(); + /* + * If autovacuum_max_parallel_workers changed, we must take care of + * the correct value of available parallel autovacuum workers in shmem. 
+         */
+        if (autovacuum_max_parallel_workers_prev !=
+            autovacuum_max_parallel_workers)
+            check_parallel_av_gucs(autovacuum_max_parallel_workers_prev);
+
         /* rebuild the list in case the naptime changed */
         rebuild_database_list(InvalidOid);
     }
@@ -2847,8 +2860,12 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
          */
         tab->at_params.index_cleanup = VACOPTVALUE_UNSPECIFIED;
         tab->at_params.truncate = VACOPTVALUE_UNSPECIFIED;
-        /* As of now, we don't support parallel vacuum for autovacuum */
-        tab->at_params.nworkers = -1;
+
+        /* Decide whether we need to process indexes of table in parallel. */
+        tab->at_params.nworkers = avopts
+            ? avopts->parallel_autovacuum_workers
+            : -1;
+
         tab->at_params.freeze_min_age = freeze_min_age;
         tab->at_params.freeze_table_age = freeze_table_age;
         tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
@@ -3329,6 +3346,64 @@ AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId,
     return result;
 }
 
+/*
+ * In order to meet the 'autovacuum_max_parallel_workers' limit, leader worker
+ * must call this function. It returns the number of parallel workers that
+ * actually can be launched and reserves (if any) these workers in global
+ * autovacuum state.
+ *
+ * NOTE: We will try to provide as many workers as requested, even if caller
+ * will occupy all available workers.
+ */
+int
+AutoVacuumReserveParallelWorkers(int nworkers)
+{
+    int         can_launch;
+
+    /* Only leader worker can call this function. */
+    Assert(AmAutoVacuumWorkerProcess() && !IsParallelWorker());
+
+    LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+    /* Provide as many workers as we can. */
+    can_launch = Min(AutoVacuumShmem->av_freeParallelWorkers, nworkers);
+    AutoVacuumShmem->av_freeParallelWorkers -= can_launch;
+
+    LWLockRelease(AutovacuumLock);
+    return can_launch;
+}
+
+/*
+ * When parallel autovacuum workers die, leader worker must call this function
+ * in order to refresh global autovacuum state.
Thus, other leaders will be able + * to use these workers. + * + * 'nworkers' - how many workers caller wants to release. + */ +void +AutoVacuumReleaseParallelWorkers(int nworkers) +{ + /* Only leader worker can call this function. */ + Assert(AmAutoVacuumWorkerProcess() && !IsParallelWorker()); + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + AutoVacuumShmem->av_freeParallelWorkers += nworkers; + + /* + * If autovacuum_max_parallel_workers variable was reduced during parallel + * autovacuum execution, we must cap available workers number by its new + * value. + */ + if (AutoVacuumShmem->av_freeParallelWorkers > + autovacuum_max_parallel_workers) + { + AutoVacuumShmem->av_freeParallelWorkers = + autovacuum_max_parallel_workers; + } + + LWLockRelease(AutovacuumLock); +} + /* * autovac_init * This is called at postmaster initialization. @@ -3389,6 +3464,8 @@ AutoVacuumShmemInit(void) Assert(!found); AutoVacuumShmem->av_launcherpid = 0; + AutoVacuumShmem->av_freeParallelWorkers = + autovacuum_max_parallel_workers; dclist_init(&AutoVacuumShmem->av_freeWorkers); dlist_init(&AutoVacuumShmem->av_runningWorkers); AutoVacuumShmem->av_startingWorker = NULL; @@ -3439,6 +3516,15 @@ check_autovacuum_work_mem(int *newval, void **extra, GucSource source) return true; } +bool +check_autovacuum_max_parallel_workers(int *newval, void **extra, + GucSource source) +{ + if (*newval >= max_worker_processes) + return false; + return true; +} + /* * Returns whether there is a free autovacuum worker slot available. */ @@ -3470,3 +3556,48 @@ check_av_worker_gucs(void) errdetail("The server will only start up to \"autovacuum_worker_slots\" (%d) autovacuum workers at a given time.", autovacuum_worker_slots))); } + +/* + * Make sure that number of available parallel workers corresponds to the + * autovacuum_max_parallel_workers parameter (after it was changed). 
+ */
+static void
+check_parallel_av_gucs(int prev_max_parallel_workers)
+{
+    LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+    if (AutoVacuumShmem->av_freeParallelWorkers >
+        autovacuum_max_parallel_workers)
+    {
+        Assert(prev_max_parallel_workers > autovacuum_max_parallel_workers);
+
+        /*
+         * Number of available workers must not exceed limit.
+         *
+         * Note, that if some parallel autovacuum workers are running at this
+         * moment, available workers number will not exceed limit after
+         * releasing them (see AutoVacuumReleaseParallelWorkers).
+         */
+        AutoVacuumShmem->av_freeParallelWorkers =
+            autovacuum_max_parallel_workers;
+    }
+    else if ((AutoVacuumShmem->av_freeParallelWorkers <
+              autovacuum_max_parallel_workers) &&
+             (autovacuum_max_parallel_workers > prev_max_parallel_workers))
+    {
+        /*
+         * If user wants to increase number of parallel autovacuum workers, we
+         * must increase number of available workers in shmem.
+         */
+        AutoVacuumShmem->av_freeParallelWorkers +=
+            (autovacuum_max_parallel_workers - prev_max_parallel_workers);
+
+        /*
+         * Nothing to do when autovacuum_max_parallel_workers <
+         * prev_max_parallel_workers. Available workers number will be capped
+         * inside AutoVacuumReleaseParallelWorkers.
+		 */
+	}
+
+	LWLockRelease(AutovacuumLock);
+}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index d31cb45a058..977644978c1 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -143,6 +143,7 @@ int NBuffers = 16384;
 int	MaxConnections = 100;
 int	max_worker_processes = 8;
 int	max_parallel_workers = 8;
+int	autovacuum_max_parallel_workers = 0;
 int	MaxBackends = 0;
 
 /* GUC parameters for vacuum */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..b6a192af8f8 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3604,6 +3604,16 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"autovacuum_max_parallel_workers", PGC_SIGHUP, VACUUM_AUTOVACUUM,
+			gettext_noop("Maximum number of parallel autovacuum workers, that can be taken from bgworkers pool."),
+			gettext_noop("This parameter is capped by \"max_worker_processes\" (not by \"autovacuum_max_workers\"!)."),
+		},
+		&autovacuum_max_parallel_workers,
+		0, 0, MAX_BACKENDS,
+		check_autovacuum_max_parallel_workers, NULL, NULL
+	},
+
 	{
 		{"max_parallel_maintenance_workers", PGC_USERSET, RESOURCES_WORKER_PROCESSES,
 			gettext_noop("Sets the maximum number of parallel processes per maintenance operation."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a9d8293474a..bbf5307000f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -683,6 +683,7 @@
 autovacuum_worker_slots = 16	# autovacuum worker slots to allocate
 					# (change requires restart)
 #autovacuum_max_workers = 3		# max number of autovacuum subprocesses
+#autovacuum_max_parallel_workers = 0	# disabled by default and limited by max_worker_processes
 #autovacuum_naptime = 1min		# time between autovacuum runs
 #autovacuum_vacuum_threshold = 50	# min number of row updates before
 					# vacuum
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 1bef98471c3..85926415657 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -177,6 +177,7 @@ extern PGDLLIMPORT int MaxBackends;
 extern PGDLLIMPORT int MaxConnections;
 extern PGDLLIMPORT int max_worker_processes;
 extern PGDLLIMPORT int max_parallel_workers;
+extern PGDLLIMPORT int autovacuum_max_parallel_workers;
 
 extern PGDLLIMPORT int commit_timestamp_buffers;
 extern PGDLLIMPORT int multixact_member_buffers;
diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h
index e8135f41a1c..863d206f2bd 100644
--- a/src/include/postmaster/autovacuum.h
+++ b/src/include/postmaster/autovacuum.h
@@ -64,6 +64,10 @@ pg_noreturn extern void AutoVacWorkerMain(const void *startup_data, size_t start
 extern bool AutoVacuumRequestWork(AutoVacuumWorkItemType type,
 								  Oid relationId, BlockNumber blkno);
 
+/* parallel autovacuum stuff */
+extern int	AutoVacuumReserveParallelWorkers(int nworkers);
+extern void AutoVacuumReleaseParallelWorkers(int nworkers);
+
 /* shared memory stuff */
 extern Size AutoVacuumShmemSize(void);
 extern void AutoVacuumShmemInit(void);
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 82ac8646a8d..b45023a90b2 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -31,6 +31,8 @@ extern void assign_application_name(const char *newval, void *extra);
 extern const char *show_archive_command(void);
 extern bool check_autovacuum_work_mem(int *newval, void **extra,
 									  GucSource source);
+extern bool check_autovacuum_max_parallel_workers(int *newval, void **extra,
+												  GucSource source);
 extern bool check_vacuum_buffer_usage_limit(int *newval, void **extra,
 											GucSource source);
 extern bool check_backtrace_functions(char **newval, void **extra,
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index b552359915f..29c32f75780 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -311,6 +311,8 @@ typedef struct ForeignKeyCacheInfo
 typedef struct AutoVacOpts
 {
 	bool		enabled;
+	int			parallel_autovacuum_workers;	/* max number of parallel
+												   autovacuum workers */
 	int			vacuum_threshold;
 	int			vacuum_max_threshold;
 	int			vacuum_ins_threshold;
-- 
2.43.0
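
For anyone following the thread without the tree at hand, the free-worker accounting in check_parallel_av_gucs() can be modeled outside the server. The sketch below is only an illustration, not code from the patch: WorkerPool, pool_set_max, pool_reserve and pool_release are hypothetical names, there is no locking (the patch takes AutovacuumLock), and two behaviors are assumptions on my side: that the reserve call returns the number of workers actually granted, and that the release path caps the counter at the current limit, as the patch comments describe.

```c
#include <assert.h>

/*
 * Hypothetical stand-in for the shared state. In the patch the counter
 * lives in AutoVacuumShmem and the limit is a GUC.
 */
typedef struct WorkerPool
{
	int			free_workers;	/* role of av_freeParallelWorkers */
	int			max_workers;	/* role of autovacuum_max_parallel_workers */
} WorkerPool;

/* Mirrors the two branches of check_parallel_av_gucs(); no locking here. */
void
pool_set_max(WorkerPool *pool, int new_max)
{
	int			prev_max = pool->max_workers;

	assert(new_max >= 0);
	pool->max_workers = new_max;

	if (pool->free_workers > new_max)
	{
		/* Limit dropped below the free count: clamp immediately. */
		pool->free_workers = new_max;
	}
	else if (pool->free_workers < new_max && new_max > prev_max)
	{
		/* Limit was raised: make the additional workers available. */
		pool->free_workers += new_max - prev_max;
	}
	/* Otherwise nothing to do; pool_release() caps the counter later. */
}

/* Assumed semantics: grant at most what is free, return the granted count. */
int
pool_reserve(WorkerPool *pool, int nworkers)
{
	int			granted;

	assert(nworkers >= 0);
	granted = (nworkers < pool->free_workers) ? nworkers : pool->free_workers;
	pool->free_workers -= granted;
	return granted;
}

/* Assumed semantics: return workers, never exceeding the current limit. */
void
pool_release(WorkerPool *pool, int nworkers)
{
	assert(nworkers >= 0);
	pool->free_workers += nworkers;
	if (pool->free_workers > pool->max_workers)
		pool->free_workers = pool->max_workers;
}
```

With a pool of 4, reserving 3 and then lowering the limit to 2 leaves the counter at 1 until the workers are released, at which point the cap brings it to 2 rather than 4.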