Hi all,

The parallel vacuum we have today supports only index vacuuming. Therefore, while multiple workers can work on different indexes in parallel, the heap table is always processed by a single process. I'd like to propose $subject, which enables us to have multiple workers running on a single heap table. This would be helpful to speed up vacuuming for tables without indexes or tables with INDEX_CLEANUP = off.
I've attached a PoC patch for this feature. It implements only the parallel heap scan in lazy vacuum. We can extend this feature to support parallel heap vacuum as well, either in the future or in the same patch.

# Overall idea (for parallel heap scan in lazy vacuum)

At the beginning of vacuum, we determine how many workers to launch based on the table size, like other parallel query operations. The number of workers is capped by max_parallel_maintenance_workers. Once we decide to use a parallel heap scan, we prepare a DSM segment to share data among the parallel workers and the leader. The shared information includes at least the vacuum options such as aggressive, the counters collected during lazy vacuum such as scanned_pages, the vacuum cutoffs such as VacuumCutoffs and GlobalVisState, and the parallel scan description.

Before starting the heap scan in lazy vacuum, we launch the parallel workers, and then each worker (and the leader) processes different blocks. Each worker does HOT-pruning on pages and collects dead tuple TIDs. When adding dead tuple TIDs, workers need to hold an exclusive lock on the TidStore. At the end of the heap scan phase, the workers exit and the leader waits for all of them to finish. After that, the leader process gathers the counters collected by the parallel workers and computes the oldest relfrozenxid (and relminmxid). Then, if parallel index vacuum is also enabled, we launch other parallel workers for parallel index vacuuming.

When it comes to the parallel heap scan in lazy vacuum, I think we can use the table_block_parallelscan_XXX() family. One tricky thing we need to deal with is that if the TidStore memory usage reaches the limit, we stop the parallel scan, do index vacuum and table vacuum, and then resume the parallel scan from the previous state. In order to do that, in the patch, we store ParallelBlockTableScanWorker, the per-worker parallel scan state, in DSM so that different parallel workers can resume the scan using the same parallel scan state. In addition to that, since we could end up launching fewer workers than requested, it could happen that some ParallelBlockTableScanWorker data is used once and then never used again even though it still has unprocessed blocks. To handle this case, in the patch, the leader process checks at the end of the parallel scan whether there is an incomplete parallel scan. If so, the leader does the scan using the workers' ParallelBlockTableScanWorker data on their behalf.

# Discussions

I'm somewhat convinced of the overall design of this feature, but there are some points regarding the implementation that we need to discuss.

In the patch, I extended vacuumparallel.c to support the parallel table scan (and vacuum in the future). This required adding some table AM callbacks for DSM size estimation, DSM initialization, the actual table scan, etc. We need to verify that these APIs are appropriate. Specifically, if we want to support both parallel heap scan and parallel heap vacuum, do we want to add separate callbacks for them? It could be overkill since such a 2-pass vacuum strategy is specific to the heap AM.

As another implementation idea, we might want to implement parallel heap scan/vacuum in vacuumlazy.c while minimizing changes to vacuumparallel.c. That way, we would not need to add table AM callbacks. However, we would end up having duplicate code related to parallel operation in vacuum, such as vacuum delays. Also, we might need to add some functions to share GlobalVisState among parallel workers, since GlobalVisState is a private struct.
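To make the proposed API concrete, here is roughly how the PoC exposes the new table AM callbacks (the heap implementations' signatures below are taken from the attached patch and are certainly subject to change):

/* New TableAmRoutine callbacks, implemented by heap in the PoC */
int  heap_parallel_vacuum_compute_workers(Relation rel, int nrequested);
void heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt,
                                   int nworkers, void *state);
void heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt,
                                     int nworkers, void *state);
void heap_parallel_vacuum_scan_worker(Relation rel, ParallelVacuumState *pvs,
                                      ParallelWorkerContext *pwcxt);

And the TidStore serialization point mentioned above is dead_items_add(): when the parallel heap scan is active, each worker takes an exclusive lock on the shared TidStore while inserting its batch of offsets (excerpt from the patch):

if (ParallelHeapVacuumIsActive(vacrel))
    TidStoreLockExclusive(dead_items);

TidStoreSetBlockOffsets(dead_items, blkno, offsets, num_offsets);
vacrel->dead_items_info->num_items += num_offsets;

if (ParallelHeapVacuumIsActive(vacrel))
    TidStoreUnlock(dead_items);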
Other points that I'm somewhat uncomfortable with, or that need to be discussed, remain in the code as XXX comments.

# Benchmark results

* Test-1: parallel heap scan on a table without indexes

I created a 20GB table, generated garbage on it, and ran vacuum while changing the parallel degree:

create unlogged table test (a int) with (autovacuum_enabled = off);
insert into test select generate_series(1, 600000000); --- 20GB table
delete from test where a % 5 = 0;
vacuum (verbose, parallel 0) test;

Here are the results (total time and heap scan time):

PARALLEL 0: 21.99 s (single process)
PARALLEL 1: 11.39 s
PARALLEL 2: 8.36 s
PARALLEL 3: 6.14 s
PARALLEL 4: 5.08 s

* Test-2: parallel heap scan on a table with one index

I used a table similar to the one in test-1 but created one btree index on it:

create unlogged table test (a int) with (autovacuum_enabled = off);
insert into test select generate_series(1, 600000000); --- 20GB table
create index on test (a);
delete from test where a % 5 = 0;
vacuum (verbose, parallel 0) test;

I measured the total execution time as well as the time of each vacuum phase (from left: heap scan time, index vacuum time, and heap vacuum time):

PARALLEL 0: 45.11 s (21.89, 16.74, 6.48)
PARALLEL 1: 42.13 s (12.75, 22.04, 7.23)
PARALLEL 2: 39.27 s (8.93, 22.78, 7.45)
PARALLEL 3: 36.53 s (6.76, 22.00, 7.65)
PARALLEL 4: 35.84 s (5.85, 22.04, 7.83)

Overall, the parallel heap scan in lazy vacuum shows decent scalability; in both test-1 and test-2, the heap scan got ~4x faster with 4 parallel workers. On the other hand, when it comes to the total vacuum execution time, I could not see much improvement in test-2 (45.11 vs. 35.84). Comparing PARALLEL 0 and PARALLEL 1 in test-2, the heap scan got faster (21.89 vs. 12.75) whereas index vacuum got slower (16.74 vs. 22.04), and the heap scan in test-2 was not as fast as in test-1 with 1 parallel worker (12.75 vs. 11.39).

I think the reason is that the shared TidStore is not very scalable, since we have a single lock on it. In all cases in test-1 we don't use the shared TidStore because all dead tuples are removed during heap pruning, so the scalability was better overall than in test-2. In the PARALLEL 0 case in test-2 we use the local TidStore, whereas from parallel degree 1 onward we use the shared TidStore and parallel workers update it concurrently. Also, I guess that the lookup performance of the local TidStore is better than that of the shared TidStore because of the differences between a bump context and a DSA area; I think this difference contributed to index vacuuming getting slower (16.74 vs. 22.04).

There are two obvious ideas to improve the overall vacuum execution time: (1) improve the shared TidStore's scalability and (2) support parallel heap vacuum. For (1), several ideas are proposed by the ART authors [1]. I've not tried them, but they might be applicable to our ART implementation. But I prefer to start with (2) since it would be easier.

Feedback is very welcome.

Regards,

[1] https://db.in.tum.de/~leis/papers/artsync.pdf

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 6f8b1b7929..cf8c6614cd 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2630,6 +2630,12 @@ static const TableAmRoutine heapam_methods = { .relation_copy_data = heapam_relation_copy_data, .relation_copy_for_cluster = heapam_relation_copy_for_cluster, .relation_vacuum = heap_vacuum_rel, + + .parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers, + .parallel_vacuum_estimate = heap_parallel_vacuum_estimate, + .parallel_vacuum_initialize = heap_parallel_vacuum_initialize, + .parallel_vacuum_scan_worker = heap_parallel_vacuum_scan_worker, + .scan_analyze_next_block = heapam_scan_analyze_next_block, .scan_analyze_next_tuple = heapam_scan_analyze_next_tuple, .index_build_range_scan = heapam_index_build_range_scan, diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 3f88cf1e8e..4ccf15ffe3 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -49,6 +49,7 @@ #include "common/int.h" #include "executor/instrument.h" #include "miscadmin.h" +#include "optimizer/paths.h" #include "pgstat.h" #include "portability/instr_time.h" #include "postmaster/autovacuum.h" @@ -117,10 +118,22 @@ #define PREFETCH_SIZE ((BlockNumber) 32) /* - * Macro to check if we are in a parallel vacuum. If true, we are in the - * parallel mode and the DSM segment is initialized. + * DSM keys for heap parallel vacuum scan. Unlike other parallel execution code, we + * we don't need to worry about DSM keys conflicting with plan_node_id, but need to + * avoid conflicting with DSM keys used in vacuumparallel.c. + */ +#define LV_PARALLEL_SCAN_SHARED 0xFFFF0001 +#define LV_PARALLEL_SCAN_DESC 0xFFFF0002 +#define LV_PARALLEL_SCAN_DESC_WORKER 0xFFFF0003 + +/* + * Macro to check if we are in a parallel vacuum. If ParallelVacuumIsActive() is + * true, we are in the parallel mode, meaning that we do either parallel index + * vacuuming or parallel table vacuuming, or both. If ParallelHeapVacuumIsActive() + * is true, we do at least parallel table vacuuming. */ #define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL) +#define ParallelHeapVacuumIsActive(vacrel) ((vacrel)->phvstate != NULL) /* Phases of vacuum during which we report error context. */ typedef enum @@ -133,6 +146,80 @@ typedef enum VACUUM_ERRCB_PHASE_TRUNCATE, } VacErrPhase; +/* + * Relation statistics collected during heap scanning and need to be shared among + * parallel vacuum workers. + */ +typedef struct LVRelCounters +{ + BlockNumber scanned_pages; /* # pages examined (not skipped via VM) */ + BlockNumber removed_pages; /* # pages removed by relation truncation */ + BlockNumber frozen_pages; /* # pages with newly frozen tuples */ + BlockNumber lpdead_item_pages; /* # pages with LP_DEAD items */ + BlockNumber missed_dead_pages; /* # pages with missed dead tuples */ + BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */ + + /* Counters that follow are only for scanned_pages */ + int64 tuples_deleted; /* # deleted from table */ + int64 tuples_frozen; /* # newly frozen */ + int64 lpdead_items; /* # deleted from indexes */ + int64 live_tuples; /* # live tuples remaining */ + int64 recently_dead_tuples; /* # dead, but not yet removable */ + int64 missed_dead_tuples; /* # removable, but not removed */ + + /* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid. 
*/ + TransactionId NewRelfrozenXid; + MultiXactId NewRelminMxid; + bool skippedallvis; +} LVRelCounters; + +/* + * Struct for information that need to be shared among parallel vacuum workers + */ +typedef struct PHVShared +{ + bool aggressive; + bool skipwithvm; + + /* The initial values shared by the leader process */ + TransactionId NewRelfrozenXid; + MultiXactId NewRelminMxid; + bool skippedallvis; + + /* VACUUM operation's cutoffs for freezing and pruning */ + struct VacuumCutoffs cutoffs; + GlobalVisState vistest; + + LVRelCounters worker_relcnts[FLEXIBLE_ARRAY_MEMBER]; +} PHVShared; +#define SizeOfPHVShared (offsetof(PHVShared, worker_relcnts)) + +/* Per-worker scan state */ +typedef struct PHVScanWorkerState +{ + ParallelBlockTableScanWorkerData state; + bool maybe_have_blocks; +} PHVScanWorkerState; + +/* Struct for parallel heap vacuum */ +typedef struct PHVState +{ + /* Parallel scan description shared among parallel workers */ + ParallelBlockTableScanDesc pscandesc; + + /* Shared information */ + PHVShared *shared; + + /* Per-worker scan state */ + PHVScanWorkerState *myscanstate; + + /* Points to all per-worker scan state array */ + PHVScanWorkerState *scanstates; + + /* The number of workers launched for parallel heap vacuum */ + int nworkers_launched; +} PHVState; + typedef struct LVRelState { /* Target heap relation and its indexes */ @@ -144,6 +231,12 @@ typedef struct LVRelState BufferAccessStrategy bstrategy; ParallelVacuumState *pvs; + /* Parallel heap vacuum state and sizes for each struct */ + PHVState *phvstate; + Size pscan_len; + Size shared_len; + Size pscanwork_len; + /* Aggressive VACUUM? (must set relfrozenxid >= FreezeLimit) */ bool aggressive; /* Use visibility map to skip? (disabled by DISABLE_PAGE_SKIPPING) */ @@ -159,10 +252,6 @@ typedef struct LVRelState /* VACUUM operation's cutoffs for freezing and pruning */ struct VacuumCutoffs cutoffs; GlobalVisState *vistest; - /* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid */ - TransactionId NewRelfrozenXid; - MultiXactId NewRelminMxid; - bool skippedallvis; /* Error reporting state */ char *dbname; @@ -188,12 +277,10 @@ typedef struct LVRelState VacDeadItemsInfo *dead_items_info; BlockNumber rel_pages; /* total number of pages */ - BlockNumber scanned_pages; /* # pages examined (not skipped via VM) */ - BlockNumber removed_pages; /* # pages removed by relation truncation */ - BlockNumber frozen_pages; /* # pages with newly frozen tuples */ - BlockNumber lpdead_item_pages; /* # pages with LP_DEAD items */ - BlockNumber missed_dead_pages; /* # pages with missed dead tuples */ - BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */ + BlockNumber next_fsm_block_to_vacuum; + + /* Block and tuple counters for the relation */ + LVRelCounters *counters; /* Statistics output by us, for table */ double new_rel_tuples; /* new estimated total # of tuples */ @@ -203,13 +290,6 @@ typedef struct LVRelState /* Instrumentation counters */ int num_index_scans; - /* Counters that follow are only for scanned_pages */ - int64 tuples_deleted; /* # deleted from table */ - int64 tuples_frozen; /* # newly frozen */ - int64 lpdead_items; /* # deleted from indexes */ - int64 live_tuples; /* # live tuples remaining */ - int64 recently_dead_tuples; /* # dead, but not yet removable */ - int64 missed_dead_tuples; /* # removable, but not removed */ /* State maintained by heap_vac_scan_next_block() */ BlockNumber current_block; /* last block returned */ @@ -229,6 +309,7 @@ typedef struct LVSavedErrInfo /* 
non-export function prototypes */ static void lazy_scan_heap(LVRelState *vacrel); +static bool do_lazy_scan_heap(LVRelState *vacrel); static bool heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno, bool *all_visible_according_to_vm); static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis); @@ -271,6 +352,12 @@ static void dead_items_cleanup(LVRelState *vacrel); static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, TransactionId *visibility_cutoff_xid, bool *all_frozen); static void update_relstats_all_indexes(LVRelState *vacrel); + + +static void do_parallel_lazy_scan_heap(LVRelState *vacrel); +static void parallel_heap_vacuum_gather_scan_stats(LVRelState *vacrel); +static void parallel_heap_complete_unfinised_scan(LVRelState *vacrel); + static void vacuum_error_callback(void *arg); static void update_vacuum_error_info(LVRelState *vacrel, LVSavedErrInfo *saved_vacrel, @@ -296,6 +383,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, BufferAccessStrategy bstrategy) { LVRelState *vacrel; + LVRelCounters *counters; bool verbose, instrument, skipwithvm, @@ -406,14 +494,28 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, Assert(params->index_cleanup == VACOPTVALUE_AUTO); } + vacrel->next_fsm_block_to_vacuum = 0; + /* Initialize page counters explicitly (be tidy) */ - vacrel->scanned_pages = 0; - vacrel->removed_pages = 0; - vacrel->frozen_pages = 0; - vacrel->lpdead_item_pages = 0; - vacrel->missed_dead_pages = 0; - vacrel->nonempty_pages = 0; - /* dead_items_alloc allocates vacrel->dead_items later on */ + counters = palloc(sizeof(LVRelCounters)); + counters->scanned_pages = 0; + counters->removed_pages = 0; + counters->frozen_pages = 0; + counters->lpdead_item_pages = 0; + counters->missed_dead_pages = 0; + counters->nonempty_pages = 0; + + /* Initialize remaining counters (be tidy) */ + counters->tuples_deleted = 0; + counters->tuples_frozen = 0; + counters->lpdead_items = 0; + counters->live_tuples = 0; + counters->recently_dead_tuples = 0; + counters->missed_dead_tuples = 0; + + vacrel->counters = counters; + + vacrel->num_index_scans = 0; /* Allocate/initialize output statistics state */ vacrel->new_rel_tuples = 0; @@ -421,14 +523,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, vacrel->indstats = (IndexBulkDeleteResult **) palloc0(vacrel->nindexes * sizeof(IndexBulkDeleteResult *)); - /* Initialize remaining counters (be tidy) */ - vacrel->num_index_scans = 0; - vacrel->tuples_deleted = 0; - vacrel->tuples_frozen = 0; - vacrel->lpdead_items = 0; - vacrel->live_tuples = 0; - vacrel->recently_dead_tuples = 0; - vacrel->missed_dead_tuples = 0; + /* dead_items_alloc allocates vacrel->dead_items later on */ /* * Get cutoffs that determine which deleted tuples are considered DEAD, @@ -450,9 +545,9 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, vacrel->rel_pages = orig_rel_pages = RelationGetNumberOfBlocks(rel); vacrel->vistest = GlobalVisTestFor(rel); /* Initialize state used to track oldest extant XID/MXID */ - vacrel->NewRelfrozenXid = vacrel->cutoffs.OldestXmin; - vacrel->NewRelminMxid = vacrel->cutoffs.OldestMxact; - vacrel->skippedallvis = false; + vacrel->counters->NewRelfrozenXid = vacrel->cutoffs.OldestXmin; + vacrel->counters->NewRelminMxid = vacrel->cutoffs.OldestMxact; + vacrel->counters->skippedallvis = false; skipwithvm = true; if (params->options & VACOPT_DISABLE_PAGE_SKIPPING) { @@ -533,15 +628,15 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, * value >= FreezeLimit, and relminmxid to a value 
>= MultiXactCutoff. * Non-aggressive VACUUMs may advance them by any amount, or not at all. */ - Assert(vacrel->NewRelfrozenXid == vacrel->cutoffs.OldestXmin || + Assert(vacrel->counters->NewRelfrozenXid == vacrel->cutoffs.OldestXmin || TransactionIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.FreezeLimit : vacrel->cutoffs.relfrozenxid, - vacrel->NewRelfrozenXid)); - Assert(vacrel->NewRelminMxid == vacrel->cutoffs.OldestMxact || + vacrel->counters->NewRelfrozenXid)); + Assert(vacrel->counters->NewRelminMxid == vacrel->cutoffs.OldestMxact || MultiXactIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.MultiXactCutoff : vacrel->cutoffs.relminmxid, - vacrel->NewRelminMxid)); - if (vacrel->skippedallvis) + vacrel->counters->NewRelminMxid)); + if (vacrel->counters->skippedallvis) { /* * Must keep original relfrozenxid in a non-aggressive VACUUM that @@ -549,8 +644,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, * values will have missed unfrozen XIDs from the pages we skipped. */ Assert(!vacrel->aggressive); - vacrel->NewRelfrozenXid = InvalidTransactionId; - vacrel->NewRelminMxid = InvalidMultiXactId; + vacrel->counters->NewRelfrozenXid = InvalidTransactionId; + vacrel->counters->NewRelminMxid = InvalidMultiXactId; } /* @@ -571,7 +666,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, */ vac_update_relstats(rel, new_rel_pages, vacrel->new_live_tuples, new_rel_allvisible, vacrel->nindexes > 0, - vacrel->NewRelfrozenXid, vacrel->NewRelminMxid, + vacrel->counters->NewRelfrozenXid, vacrel->counters->NewRelminMxid, &frozenxid_updated, &minmulti_updated, false); /* @@ -587,8 +682,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, pgstat_report_vacuum(RelationGetRelid(rel), rel->rd_rel->relisshared, Max(vacrel->new_live_tuples, 0), - vacrel->recently_dead_tuples + - vacrel->missed_dead_tuples); + vacrel->counters->recently_dead_tuples + + vacrel->counters->missed_dead_tuples); pgstat_progress_end_command(); if (instrument) @@ -651,21 +746,21 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, vacrel->relname, vacrel->num_index_scans); appendStringInfo(&buf, _("pages: %u removed, %u remain, %u scanned (%.2f%% of total)\n"), - vacrel->removed_pages, + vacrel->counters->removed_pages, new_rel_pages, - vacrel->scanned_pages, + vacrel->counters->scanned_pages, orig_rel_pages == 0 ? 
100.0 : - 100.0 * vacrel->scanned_pages / orig_rel_pages); + 100.0 * vacrel->counters->scanned_pages / orig_rel_pages); appendStringInfo(&buf, _("tuples: %lld removed, %lld remain, %lld are dead but not yet removable\n"), - (long long) vacrel->tuples_deleted, + (long long) vacrel->counters->tuples_deleted, (long long) vacrel->new_rel_tuples, - (long long) vacrel->recently_dead_tuples); - if (vacrel->missed_dead_tuples > 0) + (long long) vacrel->counters->recently_dead_tuples); + if (vacrel->counters->missed_dead_tuples > 0) appendStringInfo(&buf, _("tuples missed: %lld dead from %u pages not removed due to cleanup lock contention\n"), - (long long) vacrel->missed_dead_tuples, - vacrel->missed_dead_pages); + (long long) vacrel->counters->missed_dead_tuples, + vacrel->counters->missed_dead_pages); diff = (int32) (ReadNextTransactionId() - vacrel->cutoffs.OldestXmin); appendStringInfo(&buf, @@ -673,25 +768,25 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, vacrel->cutoffs.OldestXmin, diff); if (frozenxid_updated) { - diff = (int32) (vacrel->NewRelfrozenXid - + diff = (int32) (vacrel->counters->NewRelfrozenXid - vacrel->cutoffs.relfrozenxid); appendStringInfo(&buf, _("new relfrozenxid: %u, which is %d XIDs ahead of previous value\n"), - vacrel->NewRelfrozenXid, diff); + vacrel->counters->NewRelfrozenXid, diff); } if (minmulti_updated) { - diff = (int32) (vacrel->NewRelminMxid - + diff = (int32) (vacrel->counters->NewRelminMxid - vacrel->cutoffs.relminmxid); appendStringInfo(&buf, _("new relminmxid: %u, which is %d MXIDs ahead of previous value\n"), - vacrel->NewRelminMxid, diff); + vacrel->counters->NewRelminMxid, diff); } appendStringInfo(&buf, _("frozen: %u pages from table (%.2f%% of total) had %lld tuples frozen\n"), - vacrel->frozen_pages, + vacrel->counters->frozen_pages, orig_rel_pages == 0 ? 100.0 : - 100.0 * vacrel->frozen_pages / orig_rel_pages, - (long long) vacrel->tuples_frozen); + 100.0 * vacrel->counters->frozen_pages / orig_rel_pages, + (long long) vacrel->counters->tuples_frozen); if (vacrel->do_index_vacuuming) { if (vacrel->nindexes == 0 || vacrel->num_index_scans == 0) @@ -711,10 +806,10 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, msgfmt = _("%u pages from table (%.2f%% of total) have %lld dead item identifiers\n"); } appendStringInfo(&buf, msgfmt, - vacrel->lpdead_item_pages, + vacrel->counters->lpdead_item_pages, orig_rel_pages == 0 ? 
100.0 : - 100.0 * vacrel->lpdead_item_pages / orig_rel_pages, - (long long) vacrel->lpdead_items); + 100.0 * vacrel->counters->lpdead_item_pages / orig_rel_pages, + (long long) vacrel->counters->lpdead_items); for (int i = 0; i < vacrel->nindexes; i++) { IndexBulkDeleteResult *istat = vacrel->indstats[i]; @@ -815,14 +910,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, static void lazy_scan_heap(LVRelState *vacrel) { - BlockNumber rel_pages = vacrel->rel_pages, - blkno, - next_fsm_block_to_vacuum = 0; - bool all_visible_according_to_vm; - - TidStore *dead_items = vacrel->dead_items; + BlockNumber rel_pages = vacrel->rel_pages; VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info; - Buffer vmbuffer = InvalidBuffer; const int initprog_index[] = { PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_TOTAL_HEAP_BLKS, @@ -842,14 +931,78 @@ lazy_scan_heap(LVRelState *vacrel) vacrel->next_unskippable_allvis = false; vacrel->next_unskippable_vmbuffer = InvalidBuffer; + if (ParallelHeapVacuumIsActive(vacrel)) + do_parallel_lazy_scan_heap(vacrel); + else + do_lazy_scan_heap(vacrel); + + vacrel->blkno = InvalidBlockNumber; + + /* report that everything is now scanned */ + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, rel_pages); + + /* now we can compute the new value for pg_class.reltuples */ + vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages, + vacrel->counters->scanned_pages, + vacrel->counters->live_tuples); + + /* + * Also compute the total number of surviving heap entries. In the + * (unlikely) scenario that new_live_tuples is -1, take it as zero. + */ + vacrel->new_rel_tuples = + Max(vacrel->new_live_tuples, 0) + vacrel->counters->recently_dead_tuples + + vacrel->counters->missed_dead_tuples; + + /* + * Do index vacuuming (call each index's ambulkdelete routine), then do + * related heap vacuuming + */ + if (dead_items_info->num_items > 0) + lazy_vacuum(vacrel); + + /* + * Vacuum the remainder of the Free Space Map. We must do this whether or + * not there were indexes, and whether or not we bypassed index vacuuming. + */ + if (rel_pages > vacrel->next_fsm_block_to_vacuum) + FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, + rel_pages); + + /* report all blocks vacuumed */ + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages); + + /* Do final index cleanup (call each index's amvacuumcleanup routine) */ + if (vacrel->nindexes > 0 && vacrel->do_index_cleanup) + lazy_cleanup_all_indexes(vacrel); +} + +/* + * Workhorse for lazy_scan_heap(). + * + * Return true if we processed all blocks, otherwise false if we exit from this function + * while not completing the heap scan due to full of dead item TIDs. In serial heap scan + * case, this function always returns true. In parallel heap vacuum scan, this function + * is called by both worker processes and the leader process, and could return false. 
+ */ +static bool +do_lazy_scan_heap(LVRelState *vacrel) +{ + bool all_visible_according_to_vm; + TidStore *dead_items = vacrel->dead_items; + VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info; + BlockNumber blkno; + Buffer vmbuffer = InvalidBuffer; + bool scan_done = true; + while (heap_vac_scan_next_block(vacrel, &blkno, &all_visible_according_to_vm)) { - Buffer buf; - Page page; - bool has_lpdead_items; - bool got_cleanup_lock = false; + Buffer buf; + Page page; + bool has_lpdead_items; + bool got_cleanup_lock = false; - vacrel->scanned_pages++; + vacrel->counters->scanned_pages++; /* Report as block scanned, update error traceback information */ pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno); @@ -867,46 +1020,10 @@ lazy_scan_heap(LVRelState *vacrel) * one-pass strategy, and the two-pass strategy with the index_cleanup * param set to 'off'. */ - if (vacrel->scanned_pages % FAILSAFE_EVERY_PAGES == 0) + if (!IsParallelWorker() && + vacrel->counters->scanned_pages % FAILSAFE_EVERY_PAGES == 0) lazy_check_wraparound_failsafe(vacrel); - /* - * Consider if we definitely have enough space to process TIDs on page - * already. If we are close to overrunning the available space for - * dead_items TIDs, pause and do a cycle of vacuuming before we tackle - * this page. - */ - if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes) - { - /* - * Before beginning index vacuuming, we release any pin we may - * hold on the visibility map page. This isn't necessary for - * correctness, but we do it anyway to avoid holding the pin - * across a lengthy, unrelated operation. - */ - if (BufferIsValid(vmbuffer)) - { - ReleaseBuffer(vmbuffer); - vmbuffer = InvalidBuffer; - } - - /* Perform a round of index and heap vacuuming */ - vacrel->consider_bypass_optimization = false; - lazy_vacuum(vacrel); - - /* - * Vacuum the Free Space Map to make newly-freed space visible on - * upper-level FSM pages. Note we have not yet processed blkno. - */ - FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, - blkno); - next_fsm_block_to_vacuum = blkno; - - /* Report that we are once again scanning the heap */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_SCAN_HEAP); - } - /* * Pin the visibility map page in case we need to mark the page * all-visible. In most cases this will be very cheap, because we'll @@ -994,10 +1111,14 @@ lazy_scan_heap(LVRelState *vacrel) * also be no opportunity to update the FSM later, because we'll never * revisit this page. Since updating the FSM is desirable but not * absolutely required, that's OK. + * + * XXX: in parallel heap scan, some blocks before blkno might not + * been processed yet. Is it worth vacuuming FSM? */ - if (vacrel->nindexes == 0 - || !vacrel->do_index_vacuuming - || !has_lpdead_items) + if (!IsParallelWorker() && + (vacrel->nindexes == 0 + || !vacrel->do_index_vacuuming + || !has_lpdead_items)) { Size freespace = PageGetHeapFreeSpace(page); @@ -1011,57 +1132,144 @@ lazy_scan_heap(LVRelState *vacrel) * held the cleanup lock and lazy_scan_prune() was called. 
*/ if (got_cleanup_lock && vacrel->nindexes == 0 && has_lpdead_items && - blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES) + blkno - vacrel->next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES) { - FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, + FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, blkno); - next_fsm_block_to_vacuum = blkno; + vacrel->next_fsm_block_to_vacuum = blkno; } } else UnlockReleaseBuffer(buf); + + /* + * Consider if we definitely have enough space to process TIDs on page + * already. If we are close to overrunning the available space for + * dead_items TIDs, pause and do a cycle of vacuuming before we tackle + * this page. + */ + if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes) + { + /* + * Before beginning index vacuuming, we release any pin we may + * hold on the visibility map page. This isn't necessary for + * correctness, but we do it anyway to avoid holding the pin + * across a lengthy, unrelated operation. + */ + if (BufferIsValid(vmbuffer)) + { + ReleaseBuffer(vmbuffer); + vmbuffer = InvalidBuffer; + } + + if (ParallelHeapVacuumIsActive(vacrel)) + { + /* + * In parallel heap vacuum case, both the leader process and the + * worker processes have to exit without invoking index and heap + * vacuuming. The leader process will wait for all workers to + * finish and perform index and heap vacuuming. + */ + scan_done = false; + break; + } + + /* Perform a round of index and heap vacuuming */ + vacrel->consider_bypass_optimization = false; + lazy_vacuum(vacrel); + + /* + * Vacuum the Free Space Map to make newly-freed space visible on + * upper-level FSM pages. + * + * XXX: in parallel heap scan, some blocks before blkno might not + * been processed yet. Is it worth vacuuming FSM? + */ + FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, + blkno + 1); + vacrel->next_fsm_block_to_vacuum = blkno; + + /* Report that we are once again scanning the heap */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_SCAN_HEAP); + + continue; + } } - vacrel->blkno = InvalidBlockNumber; if (BufferIsValid(vmbuffer)) ReleaseBuffer(vmbuffer); - /* report that everything is now scanned */ - pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno); + return scan_done; +} - /* now we can compute the new value for pg_class.reltuples */ - vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages, - vacrel->scanned_pages, - vacrel->live_tuples); +/* + * A parallel scan variant of heap_vac_scan_next_block. + * + * In parallel vacuum scan, we don't use the SKIP_PAGES_THRESHOLD optimization. + */ +static bool +heap_vac_scan_next_block_parallel(LVRelState *vacrel, BlockNumber *blkno, + bool *all_visible_according_to_vm) +{ + PHVState *phvstate = vacrel->phvstate; + BlockNumber next_block; + Buffer vmbuffer = InvalidBuffer; + uint8 mapbits = 0; - /* - * Also compute the total number of surviving heap entries. In the - * (unlikely) scenario that new_live_tuples is -1, take it as zero. 
- */ - vacrel->new_rel_tuples = - Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples + - vacrel->missed_dead_tuples; + Assert(ParallelHeapVacuumIsActive(vacrel)); - /* - * Do index vacuuming (call each index's ambulkdelete routine), then do - * related heap vacuuming - */ - if (dead_items_info->num_items > 0) - lazy_vacuum(vacrel); + for (;;) + { + next_block = table_block_parallelscan_nextpage(vacrel->rel, + &(phvstate->myscanstate->state), + phvstate->pscandesc); - /* - * Vacuum the remainder of the Free Space Map. We must do this whether or - * not there were indexes, and whether or not we bypassed index vacuuming. - */ - if (blkno > next_fsm_block_to_vacuum) - FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, blkno); + /* Have we reached the end of the table? */ + if (!BlockNumberIsValid(next_block) || next_block >= vacrel->rel_pages) + { + if (BufferIsValid(vmbuffer)) + ReleaseBuffer(vmbuffer); - /* report all blocks vacuumed */ - pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno); + *blkno = vacrel->rel_pages; + return false; + } - /* Do final index cleanup (call each index's amvacuumcleanup routine) */ - if (vacrel->nindexes > 0 && vacrel->do_index_cleanup) - lazy_cleanup_all_indexes(vacrel); + /* We always treat the last block as unsafe to skip */ + if (next_block == vacrel->rel_pages - 1) + break; + + mapbits = visibilitymap_get_status(vacrel->rel, next_block, &vmbuffer); + + /* DISABLE_PAGE_SKIPPING makes all skipping unsafe */ + if (!vacrel->skipwithvm) + break; + + /* + * Aggressive VACUUM caller can't skip pages just because they are + * all-visible. + */ + if ((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0) + { + + if (vacrel->aggressive) + break; + + /* + * All-visible block is safe to skip in non-aggressive case. But + * remember that the final range contains such a block for later. + */ + vacrel->counters->skippedallvis = true; + } + } + + if (BufferIsValid(vmbuffer)) + ReleaseBuffer(vmbuffer); + + *blkno = next_block; + *all_visible_according_to_vm = (mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0; + + return true; } /* @@ -1088,6 +1296,9 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno, { BlockNumber next_block; + if (ParallelHeapVacuumIsActive(vacrel)) + return heap_vac_scan_next_block_parallel(vacrel, blkno, all_visible_according_to_vm); + /* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */ next_block = vacrel->current_block + 1; @@ -1137,7 +1348,7 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno, { next_block = vacrel->next_unskippable_block; if (skipsallvis) - vacrel->skippedallvis = true; + vacrel->counters->skippedallvis = true; } } @@ -1210,7 +1421,7 @@ find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis) /* * Caller must scan the last page to determine whether it has tuples - * (caller must have the opportunity to set vacrel->nonempty_pages). + * (caller must have the opportunity to set vacrel->counters->nonempty_pages). 
* This rule avoids having lazy_truncate_heap() take access-exclusive * lock on rel to attempt a truncation that fails anyway, just because * there are tuples on the last page (it is likely that there will be @@ -1439,10 +1650,10 @@ lazy_scan_prune(LVRelState *vacrel, heap_page_prune_and_freeze(rel, buf, vacrel->vistest, prune_options, &vacrel->cutoffs, &presult, PRUNE_VACUUM_SCAN, &vacrel->offnum, - &vacrel->NewRelfrozenXid, &vacrel->NewRelminMxid); + &vacrel->counters->NewRelfrozenXid, &vacrel->counters->NewRelminMxid); Assert(MultiXactIdIsValid(vacrel->NewRelminMxid)); - Assert(TransactionIdIsValid(vacrel->NewRelfrozenXid)); + Assert(TransactionIdIsValid(vacrel->counters->NewRelfrozenXid)); if (presult.nfrozen > 0) { @@ -1451,7 +1662,7 @@ lazy_scan_prune(LVRelState *vacrel, * nfrozen == 0, since it only counts pages with newly frozen tuples * (don't confuse that with pages newly set all-frozen in VM). */ - vacrel->frozen_pages++; + vacrel->counters->frozen_pages++; } /* @@ -1486,7 +1697,7 @@ lazy_scan_prune(LVRelState *vacrel, */ if (presult.lpdead_items > 0) { - vacrel->lpdead_item_pages++; + vacrel->counters->lpdead_item_pages++; /* * deadoffsets are collected incrementally in @@ -1501,15 +1712,15 @@ lazy_scan_prune(LVRelState *vacrel, } /* Finally, add page-local counts to whole-VACUUM counts */ - vacrel->tuples_deleted += presult.ndeleted; - vacrel->tuples_frozen += presult.nfrozen; - vacrel->lpdead_items += presult.lpdead_items; - vacrel->live_tuples += presult.live_tuples; - vacrel->recently_dead_tuples += presult.recently_dead_tuples; + vacrel->counters->tuples_deleted += presult.ndeleted; + vacrel->counters->tuples_frozen += presult.nfrozen; + vacrel->counters->lpdead_items += presult.lpdead_items; + vacrel->counters->live_tuples += presult.live_tuples; + vacrel->counters->recently_dead_tuples += presult.recently_dead_tuples; /* Can't truncate this page */ if (presult.hastup) - vacrel->nonempty_pages = blkno + 1; + vacrel->counters->nonempty_pages = blkno + 1; /* Did we find LP_DEAD items? */ *has_lpdead_items = (presult.lpdead_items > 0); @@ -1659,8 +1870,8 @@ lazy_scan_noprune(LVRelState *vacrel, missed_dead_tuples; bool hastup; HeapTupleHeader tupleheader; - TransactionId NoFreezePageRelfrozenXid = vacrel->NewRelfrozenXid; - MultiXactId NoFreezePageRelminMxid = vacrel->NewRelminMxid; + TransactionId NoFreezePageRelfrozenXid = vacrel->counters->NewRelfrozenXid; + MultiXactId NoFreezePageRelminMxid = vacrel->counters->NewRelminMxid; OffsetNumber deadoffsets[MaxHeapTuplesPerPage]; Assert(BufferGetBlockNumber(buf) == blkno); @@ -1787,8 +1998,8 @@ lazy_scan_noprune(LVRelState *vacrel, * this particular page until the next VACUUM. Remember its details now. * (lazy_scan_prune expects a clean slate, so we have to do this last.) 
*/ - vacrel->NewRelfrozenXid = NoFreezePageRelfrozenXid; - vacrel->NewRelminMxid = NoFreezePageRelminMxid; + vacrel->counters->NewRelfrozenXid = NoFreezePageRelfrozenXid; + vacrel->counters->NewRelminMxid = NoFreezePageRelminMxid; /* Save any LP_DEAD items found on the page in dead_items */ if (vacrel->nindexes == 0) @@ -1815,25 +2026,25 @@ lazy_scan_noprune(LVRelState *vacrel, * indexes will be deleted during index vacuuming (and then marked * LP_UNUSED in the heap) */ - vacrel->lpdead_item_pages++; + vacrel->counters->lpdead_item_pages++; dead_items_add(vacrel, blkno, deadoffsets, lpdead_items); - vacrel->lpdead_items += lpdead_items; + vacrel->counters->lpdead_items += lpdead_items; } /* * Finally, add relevant page-local counts to whole-VACUUM counts */ - vacrel->live_tuples += live_tuples; - vacrel->recently_dead_tuples += recently_dead_tuples; - vacrel->missed_dead_tuples += missed_dead_tuples; + vacrel->counters->live_tuples += live_tuples; + vacrel->counters->recently_dead_tuples += recently_dead_tuples; + vacrel->counters->missed_dead_tuples += missed_dead_tuples; if (missed_dead_tuples > 0) - vacrel->missed_dead_pages++; + vacrel->counters->missed_dead_pages++; /* Can't truncate this page */ if (hastup) - vacrel->nonempty_pages = blkno + 1; + vacrel->counters->nonempty_pages = blkno + 1; /* Did we find LP_DEAD items? */ *has_lpdead_items = (lpdead_items > 0); @@ -1862,7 +2073,7 @@ lazy_vacuum(LVRelState *vacrel) /* Should not end up here with no indexes */ Assert(vacrel->nindexes > 0); - Assert(vacrel->lpdead_item_pages > 0); + Assert(vacrel->counters->lpdead_item_pages > 0); if (!vacrel->do_index_vacuuming) { @@ -1896,7 +2107,7 @@ lazy_vacuum(LVRelState *vacrel) BlockNumber threshold; Assert(vacrel->num_index_scans == 0); - Assert(vacrel->lpdead_items == vacrel->dead_items_info->num_items); + Assert(vacrel->counters->lpdead_items == vacrel->dead_items_info->num_items); Assert(vacrel->do_index_vacuuming); Assert(vacrel->do_index_cleanup); @@ -1923,7 +2134,7 @@ lazy_vacuum(LVRelState *vacrel) * cases then this may need to be reconsidered. */ threshold = (double) vacrel->rel_pages * BYPASS_THRESHOLD_PAGES; - bypass = (vacrel->lpdead_item_pages < threshold && + bypass = (vacrel->counters->lpdead_item_pages < threshold && (TidStoreMemoryUsage(vacrel->dead_items) < (32L * 1024L * 1024L))); } @@ -2061,7 +2272,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) * place). */ Assert(vacrel->num_index_scans > 0 || - vacrel->dead_items_info->num_items == vacrel->lpdead_items); + vacrel->dead_items_info->num_items == vacrel->counters->lpdead_items); Assert(allindexes || VacuumFailsafeActive); /* @@ -2165,8 +2376,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel) * the second heap pass. No more, no less. 
*/ Assert(vacrel->num_index_scans > 1 || - (vacrel->dead_items_info->num_items == vacrel->lpdead_items && - vacuumed_pages == vacrel->lpdead_item_pages)); + (vacrel->dead_items_info->num_items == vacrel->counters->lpdead_items && + vacuumed_pages == vacrel->counters->lpdead_item_pages)); ereport(DEBUG2, (errmsg("table \"%s\": removed %lld dead item identifiers in %u pages", @@ -2347,7 +2558,7 @@ static void lazy_cleanup_all_indexes(LVRelState *vacrel) { double reltuples = vacrel->new_rel_tuples; - bool estimated_count = vacrel->scanned_pages < vacrel->rel_pages; + bool estimated_count = vacrel->counters->scanned_pages < vacrel->rel_pages; const int progress_start_index[] = { PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_INDEXES_TOTAL @@ -2528,7 +2739,7 @@ should_attempt_truncation(LVRelState *vacrel) if (!vacrel->do_rel_truncate || VacuumFailsafeActive) return false; - possibly_freeable = vacrel->rel_pages - vacrel->nonempty_pages; + possibly_freeable = vacrel->rel_pages - vacrel->counters->nonempty_pages; if (possibly_freeable > 0 && (possibly_freeable >= REL_TRUNCATE_MINIMUM || possibly_freeable >= vacrel->rel_pages / REL_TRUNCATE_FRACTION)) @@ -2554,7 +2765,7 @@ lazy_truncate_heap(LVRelState *vacrel) /* Update error traceback information one last time */ update_vacuum_error_info(vacrel, NULL, VACUUM_ERRCB_PHASE_TRUNCATE, - vacrel->nonempty_pages, InvalidOffsetNumber); + vacrel->counters->nonempty_pages, InvalidOffsetNumber); /* * Loop until no more truncating can be done. @@ -2655,7 +2866,7 @@ lazy_truncate_heap(LVRelState *vacrel) * without also touching reltuples, since the tuple count wasn't * changed by the truncation. */ - vacrel->removed_pages += orig_rel_pages - new_rel_pages; + vacrel->counters->removed_pages += orig_rel_pages - new_rel_pages; vacrel->rel_pages = new_rel_pages; ereport(vacrel->verbose ? INFO : DEBUG2, @@ -2663,7 +2874,7 @@ lazy_truncate_heap(LVRelState *vacrel) vacrel->relname, orig_rel_pages, new_rel_pages))); orig_rel_pages = new_rel_pages; - } while (new_rel_pages > vacrel->nonempty_pages && lock_waiter_detected); + } while (new_rel_pages > vacrel->counters->nonempty_pages && lock_waiter_detected); } /* @@ -2691,7 +2902,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected) StaticAssertStmt((PREFETCH_SIZE & (PREFETCH_SIZE - 1)) == 0, "prefetch size must be power of 2"); prefetchedUntil = InvalidBlockNumber; - while (blkno > vacrel->nonempty_pages) + while (blkno > vacrel->counters->nonempty_pages) { Buffer buf; Page page; @@ -2803,7 +3014,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected) * pages still are; we need not bother to look at the last known-nonempty * page. */ - return vacrel->nonempty_pages; + return vacrel->counters->nonempty_pages; } /* @@ -2821,12 +3032,8 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) autovacuum_work_mem != -1 ? autovacuum_work_mem : maintenance_work_mem; - /* - * Initialize state for a parallel vacuum. As of now, only one worker can - * be used for an index, so we invoke parallelism only if there are at - * least two indexes on a table. 
- */ - if (nworkers >= 0 && vacrel->nindexes > 1 && vacrel->do_index_vacuuming) + /* Initialize state for a parallel vacuum */ + if (nworkers >= 0) { /* * Since parallel workers cannot access data in temporary tables, we @@ -2844,11 +3051,18 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->relname))); } else + { + /* + * For parallel index vacuuming, only one worker can be used for an + * index, we invoke parallelism only if there are at least two indexes + * on a table. + */ vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels, vacrel->nindexes, nworkers, vac_work_mem, vacrel->verbose ? INFO : DEBUG2, - vacrel->bstrategy); + vacrel->bstrategy, (void *) vacrel); + } /* * If parallel mode started, dead_items and dead_items_info spaces are @@ -2889,9 +3103,19 @@ dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets, }; int64 prog_val[2]; + /* + * Protect both dead_items and dead_items_info from concurrent updates + * in parallel heap scan cases. + */ + if (ParallelHeapVacuumIsActive(vacrel)) + TidStoreLockExclusive(dead_items); + TidStoreSetBlockOffsets(dead_items, blkno, offsets, num_offsets); vacrel->dead_items_info->num_items += num_offsets; + if (ParallelHeapVacuumIsActive(vacrel)) + TidStoreUnlock(dead_items); + /* update the progress information */ prog_val[0] = vacrel->dead_items_info->num_items; prog_val[1] = TidStoreMemoryUsage(dead_items); @@ -3093,6 +3317,359 @@ update_relstats_all_indexes(LVRelState *vacrel) } } +/* + * Compute the number of parallel workers for parallel vacuum heap scan. + * + * The calculation logic is borrowed from compute_parallel_worker(). + */ +int +heap_parallel_vacuum_compute_workers(Relation rel, int nrequested) +{ + int parallel_workers = 0; + int heap_parallel_threshold; + int heap_pages; + + if (nrequested == 0) + { + /* + * Select the number of workers based on the log of the size of + * the relation. This probably needs to be a good deal more + * sophisticated, but we need something here for now. Note that + * the upper limit of the min_parallel_table_scan_size GUC is + * chosen to prevent overflow here. + */ + heap_parallel_threshold = Max(min_parallel_table_scan_size, 1); + heap_pages = RelationGetNumberOfBlocks(rel); + while (heap_pages >= (BlockNumber) (heap_parallel_threshold * 3)) + { + parallel_workers++; + heap_parallel_threshold *= 3; + if (heap_parallel_threshold > INT_MAX / 3) + break; + } + } + else + parallel_workers = nrequested; + + return parallel_workers; +} + +/* + * Compute the amount of space we'll need in the parallel heap vacuum + * DSM, and inform pcxt->estimator about our needs. + * + * nworkers is the number of workers for the table vacuum. Note that it could + * be different than pcxt->nworkers since it is the maximum of number of + * workers for table vacuum and index vacuum. 
+ */ +void +heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, + int nworkers, void *state) +{ + Size size = 0; + LVRelState *vacrel = (LVRelState *) state; + + /* space for PHVShared */ + size = add_size(size, SizeOfPHVShared); + size = add_size(size, mul_size(sizeof(LVRelCounters), nworkers)); + vacrel->shared_len = size; + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* space for ParallelBlockTableScanDesc */ + vacrel->pscan_len = table_block_parallelscan_estimate(rel); + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* space for per-worker scan state, PHVScanWorkerState */ + vacrel->pscanwork_len = mul_size(sizeof(PHVScanWorkerState), nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->pscanwork_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Set up shared memory for parallel heap vacuum. + */ +void +heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, + int nworkers, void *state) +{ + LVRelState *vacrel = (LVRelState *) state; + ParallelBlockTableScanDesc pscan; + PHVScanWorkerState *pscanwork; + PHVShared *shared; + PHVState *phvstate; + + phvstate = (PHVState *) palloc(sizeof(PHVState)); + + shared = shm_toc_allocate(pcxt->toc, vacrel->shared_len); + + /* Prepare the shared information */ + + MemSet(shared, 0, vacrel->shared_len); + shared->aggressive = vacrel->aggressive; + shared->skipwithvm = vacrel->skipwithvm; + shared->cutoffs = vacrel->cutoffs; + shared->NewRelfrozenXid = vacrel->counters->NewRelfrozenXid; + shared->NewRelminMxid = vacrel->counters->NewRelminMxid; + shared->skippedallvis = vacrel->counters->skippedallvis; + + /* + * XXX: we copy the contents of vistest to the shared area, but in order to do + * that, we need to either expose GlobalVisTest or to provide functions to copy + * contents of GlobalVisTest to somewhere. Currently we do the former but not + * sure it's the best choice. + * + * Alternative idea is to have each worker determine cutoff and have their own + * vistest. But we need to carefully consider it since parallel workers end up + * having different cutoff and horizon. + */ + shared->vistest = *vacrel->vistest; + + shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_SHARED, shared); + + phvstate->shared = shared; + + /* prepare the parallel block table scan description */ + pscan = shm_toc_allocate(pcxt->toc, vacrel->pscan_len); + shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_DESC, pscan); + + /* initialize parallel scan description */ + table_block_parallelscan_initialize(rel, (ParallelTableScanDesc) pscan); + phvstate->pscandesc = pscan; + + /* prepare the workers' parallel block table scan state */ + pscanwork = shm_toc_allocate(pcxt->toc, vacrel->pscanwork_len); + MemSet(pscanwork, 0, vacrel->pscanwork_len); + shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_DESC_WORKER, pscanwork); + phvstate->scanstates = pscanwork; + + vacrel->phvstate = phvstate; +} + +/* + * Main function for parallel heap vacuum workers. 
+ */ +void +heap_parallel_vacuum_scan_worker(Relation rel, ParallelVacuumState *pvs, + ParallelWorkerContext *pwcxt) +{ + LVRelState vacrel = {0}; + PHVState *phvstate; + PHVShared *shared; + ParallelBlockTableScanDesc pscandesc; + PHVScanWorkerState *scanstate; + LVRelCounters *counters; + bool scan_done; + + phvstate = palloc(sizeof(PHVState)); + + pscandesc = (ParallelBlockTableScanDesc) shm_toc_lookup(pwcxt->toc, + LV_PARALLEL_SCAN_DESC, + false); + phvstate->pscandesc = pscandesc; + + shared = (PHVShared *) shm_toc_lookup(pwcxt->toc, LV_PARALLEL_SCAN_SHARED, + false); + phvstate->shared = shared; + + scanstate = (PHVScanWorkerState *) shm_toc_lookup(pwcxt->toc, + LV_PARALLEL_SCAN_DESC_WORKER, + false); + + phvstate->myscanstate = &(scanstate[ParallelWorkerNumber]); + counters = &(shared->worker_relcnts[ParallelWorkerNumber]); + + /* Prepare LVRelState */ + vacrel.rel = rel; + vacrel.indrels = parallel_vacuum_get_table_indexes(pvs, &vacrel.nindexes); + vacrel.pvs = pvs; + vacrel.phvstate = phvstate; + vacrel.aggressive = shared->aggressive; + vacrel.skipwithvm = shared->skipwithvm; + vacrel.cutoffs = shared->cutoffs; + vacrel.vistest = &(shared->vistest); + vacrel.dead_items = parallel_vacuum_get_dead_items(pvs, + &vacrel.dead_items_info); + vacrel.rel_pages = RelationGetNumberOfBlocks(rel); + vacrel.counters = counters; + + /* initialize per-worker relation statistics */ + MemSet(counters, 0, sizeof(LVRelCounters)); + + vacrel.counters->NewRelfrozenXid = shared->NewRelfrozenXid; + vacrel.counters->NewRelminMxid = shared->NewRelminMxid; + vacrel.counters->skippedallvis = shared->skippedallvis; + + /* + * XXX: the following fields are not set yet: + * - index vacuum related fields such as consider_bypass_optimization, + * do_index_vacuuming etc. + * - error reporting state. + * - statistics such as scanned_pages etc. + * - oldest extant XID/MXID. + * - states maintained by heap_vac_scan_next_block() + */ + + /* Initialize the start block if not yet */ + if (!phvstate->myscanstate->maybe_have_blocks) + { + table_block_parallelscan_startblock_init(rel, + &(phvstate->myscanstate->state), + phvstate->pscandesc); + + phvstate->myscanstate->maybe_have_blocks = false; + } + + /* + * XXX: if we want to support parallel heap *vacuum*, we need to allow + * workers to call different function based on the shared information. + */ + scan_done = do_lazy_scan_heap(&vacrel); + + phvstate->myscanstate->maybe_have_blocks = !scan_done; +} + +/* + * Complete parallel heaps scans that have remaining blocks in their + * chunks. + */ +static void +parallel_heap_complete_unfinised_scan(LVRelState *vacrel) +{ + int nworkers; + + Assert(!IsParallelWorker()); + + nworkers = parallel_vacuum_get_nworkers_table(vacrel->pvs); + + for (int i = 0; i < nworkers; i++) + { + PHVScanWorkerState *wstate = &(vacrel->phvstate->scanstates[i]); + bool scan_done PG_USED_FOR_ASSERTS_ONLY; + + if (!wstate->maybe_have_blocks) + continue; + + vacrel->phvstate->myscanstate = wstate; + + scan_done = do_lazy_scan_heap(vacrel); + + Assert(scan_done); + } +} + +/* + * Accumulate relation counters that parallel workers collected into the + * leader's counters. 
+ */ +static void +parallel_heap_vacuum_gather_scan_stats(LVRelState *vacrel) +{ + PHVState *phvstate = vacrel->phvstate; + + Assert(ParallelHeapVacuumIsActive(vacrel)); + + for (int i = 0; i < phvstate->nworkers_launched; i++) + { + LVRelCounters *counters = &(phvstate->shared->worker_relcnts[i]); + +#define LV_ACCUM_ITEM(item) (vacrel)->counters->item += (counters)->item + + LV_ACCUM_ITEM(scanned_pages); + LV_ACCUM_ITEM(removed_pages); + LV_ACCUM_ITEM(frozen_pages); + LV_ACCUM_ITEM(lpdead_item_pages); + LV_ACCUM_ITEM(missed_dead_pages); + LV_ACCUM_ITEM(nonempty_pages); + LV_ACCUM_ITEM(tuples_deleted); + LV_ACCUM_ITEM(tuples_frozen); + LV_ACCUM_ITEM(lpdead_items); + LV_ACCUM_ITEM(live_tuples); + LV_ACCUM_ITEM(recently_dead_tuples); + LV_ACCUM_ITEM(missed_dead_tuples); + +#undef LV_ACCUM_ITEM + + if (TransactionIdPrecedes(counters->NewRelfrozenXid, vacrel->counters->NewRelfrozenXid)) + vacrel->counters->NewRelfrozenXid = counters->NewRelfrozenXid; + + if (MultiXactIdPrecedesOrEquals(counters->NewRelminMxid, vacrel->counters->NewRelminMxid)) + vacrel->counters->NewRelminMxid = counters->NewRelminMxid; + + if (!vacrel->counters->skippedallvis && counters->skippedallvis) + vacrel->counters->skippedallvis = true; + } +} + +/* + * A parallel variant of do_lazy_scan_heap(). The leader process launches parallel + * workers to scan the heap in parallel. + */ +static void +do_parallel_lazy_scan_heap(LVRelState *vacrel) +{ + PHVScanWorkerState *scanstate; + TidStore *dead_items = vacrel->dead_items; + VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info; + + Assert(ParallelHeapVacuumIsActive(vacrel)); + Assert(!IsParallelWorker()); + + /* launcher workers */ + vacrel->phvstate->nworkers_launched = parallel_vacuum_table_scan_begin(vacrel->pvs); + + /* initialize parallel scan description to join as a worker */ + scanstate = palloc(sizeof(PHVScanWorkerState)); + table_block_parallelscan_startblock_init(vacrel->rel, &(scanstate->state), + vacrel->phvstate->pscandesc); + vacrel->phvstate->myscanstate = scanstate; + + for (;;) + { + bool scan_done PG_USED_FOR_ASSERTS_ONLY; + + /* + * Scan the table until either we are close to overrunning the available + * space for dead_items TIDs or we reach the end of the table. + */ + scan_done = do_lazy_scan_heap(vacrel); + + /* stop parallel workers and gather the collected stats */ + parallel_vacuum_table_scan_end(vacrel->pvs); + parallel_heap_vacuum_gather_scan_stats(vacrel); + + /* + * Consider if we definitely have enough space to process TIDs on page + * already. If we are close to overrunning the available space for + * dead_items TIDs, pause and do a cycle of vacuuming before we tackle + * this page. + */ + if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes) + { + /* Perform a round of index and heap vacuuming */ + vacrel->consider_bypass_optimization = false; + lazy_vacuum(vacrel); + + /* Report that we are once again scanning the heap */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_SCAN_HEAP); + + /* re-launcher workers */ + vacrel->phvstate->nworkers_launched = + parallel_vacuum_table_scan_begin(vacrel->pvs); + + continue; + } + + /* We reach the end of the table */ + Assert(scan_done); + break; + } + + parallel_heap_complete_unfinised_scan(vacrel); +} + /* * Error context callback for errors occurring during vacuum. 
The error * context messages for index phases should match the messages set in parallel diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index f26070bff2..968addf94f 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -28,6 +28,7 @@ #include "access/amapi.h" #include "access/table.h" +#include "access/tableam.h" #include "access/xact.h" #include "commands/progress.h" #include "commands/vacuum.h" @@ -64,6 +65,12 @@ typedef struct PVShared Oid relid; int elevel; + /* + * True if the caller wants parallel workers to invoke vacuum table scan + * callback. + */ + bool do_vacuum_table_scan; + /* * Fields for both index vacuum and cleanup. * @@ -163,6 +170,9 @@ struct ParallelVacuumState /* NULL for worker processes */ ParallelContext *pcxt; + /* Passed to parallel table scan workers. NULL for leader process */ + ParallelWorkerContext *pwcxt; + /* Parent Heap Relation */ Relation heaprel; @@ -192,6 +202,16 @@ struct ParallelVacuumState /* Points to WAL usage area in DSM */ WalUsage *wal_usage; + /* + * The number of workers for parallel table scan/vacuuming and index vacuuming, + * respectively. + */ + int nworkers_for_table; + int nworkers_for_index; + + /* How many parallel table vacuum scan is called? */ + int num_table_scans; + /* * False if the index is totally unsuitable target for all parallel * processing. For example, the index could be < @@ -220,8 +240,9 @@ struct ParallelVacuumState PVIndVacStatus status; }; -static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, - bool *will_parallel_vacuum); +static void parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes, + int nrequested, int *nworkers_table, + int *nworkers_index, bool *will_parallel_vacuum); static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum); static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs); @@ -241,7 +262,7 @@ static void parallel_vacuum_error_callback(void *arg); ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, - int elevel, BufferAccessStrategy bstrategy) + int elevel, BufferAccessStrategy bstrategy, void *state) { ParallelVacuumState *pvs; ParallelContext *pcxt; @@ -255,6 +276,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, Size est_shared_len; int nindexes_mwm = 0; int parallel_workers = 0; + int nworkers_table; + int nworkers_index; int querylen; /* @@ -262,15 +285,17 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, * relation */ Assert(nrequested_workers >= 0); - Assert(nindexes > 0); /* * Compute the number of parallel vacuum workers to launch */ will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); - parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes, - nrequested_workers, - will_parallel_vacuum); + parallel_vacuum_compute_workers(rel, indrels, nindexes, nrequested_workers, + &nworkers_table, &nworkers_index, + will_parallel_vacuum); + + parallel_workers = Max(nworkers_table, nworkers_index); + if (parallel_workers <= 0) { /* Can't perform vacuum in parallel -- return NULL */ @@ -284,6 +309,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, pvs->will_parallel_vacuum = will_parallel_vacuum; pvs->bstrategy = bstrategy; pvs->heaprel = rel; + pvs->nworkers_for_table = nworkers_table; + pvs->nworkers_for_index = 
 
     EnterParallelMode();
     pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
@@ -326,6 +353,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
     else
         querylen = 0;           /* keep compiler quiet */
 
+    /* Estimate AM-specific space for parallel table vacuum */
+    if (nworkers_table > 0)
+        table_parallel_vacuum_estimate(rel, pcxt, nworkers_table, state);
+
     InitializeParallelDSM(pcxt);
 
     /* Prepare index vacuum stats */
@@ -417,6 +448,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
                            PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
     }
 
+    /* Prepare AM-specific DSM for parallel table vacuum */
+    if (nworkers_table > 0)
+        table_parallel_vacuum_initialize(rel, pcxt, nworkers_table, state);
+
     /* Success -- return parallel vacuum state */
     return pvs;
 }
@@ -538,27 +573,41 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup
  * min_parallel_index_scan_size as invoking workers for very small indexes
  * can hurt performance.
  *
+ * XXX needs to mention the number of workers for the table as well.
+ *
  * nrequested is the number of parallel workers that user requested. If
  * nrequested is 0, we compute the parallel degree based on nindexes, that is
  * the number of indexes that support parallel vacuum. This function also
  * sets will_parallel_vacuum to remember indexes that participate in parallel
  * vacuum.
  */
-static int
-parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
-                                bool *will_parallel_vacuum)
+static void
+parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes,
+                                int nrequested, int *nworkers_table,
+                                int *nworkers_index, bool *will_parallel_vacuum)
 {
     int         nindexes_parallel = 0;
     int         nindexes_parallel_bulkdel = 0;
     int         nindexes_parallel_cleanup = 0;
-    int         parallel_workers;
+    int         parallel_workers_table = 0;
+    int         parallel_workers_index = 0;
+
+    *nworkers_table = 0;
+    *nworkers_index = 0;
 
     /*
      * We don't allow performing parallel operation in standalone backend or
      * when parallelism is disabled.
      */
     if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
-        return 0;
+        return;
+
+    /*
+     * Compute the number of workers for parallel table scan. Cap by
+     * max_parallel_maintenance_workers.
+     */
+    parallel_workers_table = Min(table_parallel_vacuum_compute_workers(rel, nrequested),
+                                 max_parallel_maintenance_workers);
 
     /*
      * Compute the number of indexes that can participate in parallel vacuum.
@@ -589,17 +638,18 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
         nindexes_parallel--;
 
     /* No index supports parallel vacuum */
-    if (nindexes_parallel <= 0)
-        return 0;
-
-    /* Compute the parallel degree */
-    parallel_workers = (nrequested > 0) ?
-        Min(nrequested, nindexes_parallel) : nindexes_parallel;
+    if (nindexes_parallel > 0)
+    {
+        /* Compute the parallel degree for parallel index vacuum */
+        parallel_workers_index = (nrequested > 0) ?
+            Min(nrequested, nindexes_parallel) : nindexes_parallel;
 
-    /* Cap by max_parallel_maintenance_workers */
-    parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+        /* Cap by max_parallel_maintenance_workers */
+        parallel_workers_index = Min(parallel_workers_index, max_parallel_maintenance_workers);
+    }
 
-    return parallel_workers;
+    *nworkers_table = parallel_workers_table;
+    *nworkers_index = parallel_workers_index;
 }
 
 /*
@@ -669,7 +719,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
     if (nworkers > 0)
     {
         /* Reinitialize parallel context to relaunch parallel workers */
-        if (num_index_scans > 0)
+        if (num_index_scans > 0 || pvs->num_table_scans > 0)
             ReinitializeParallelDSM(pvs->pcxt);
 
         /*
@@ -978,6 +1028,120 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     return true;
 }
 
+/*
+ * A parallel worker invokes the table AM's parallel vacuum scan callback.
+ */
+static void
+parallel_vacuum_process_table(ParallelVacuumState *pvs)
+{
+    /*
+     * Increment the active worker count if we are able to launch any worker.
+     */
+    if (VacuumActiveNWorkers)
+        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+    /* Do table vacuum scan */
+    table_parallel_vacuum_scan(pvs->heaprel, pvs, pvs->pwcxt);
+
+    /*
+     * We have completed the table vacuum so decrement the active worker
+     * count.
+     */
+    if (VacuumActiveNWorkers)
+        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Prepare the DSM and vacuum delay, and launch parallel workers for a
+ * parallel table vacuum scan.
+ */
+int
+parallel_vacuum_table_scan_begin(ParallelVacuumState *pvs)
+{
+    Assert(!IsParallelWorker());
+
+    if (pvs->nworkers_for_table == 0)
+        return 0;
+
+    pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
+    pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
+
+    pvs->shared->do_vacuum_table_scan = true;
+
+    if (pvs->num_table_scans > 0)
+        ReinitializeParallelDSM(pvs->pcxt);
+
+    ReinitializeParallelWorkers(pvs->pcxt, pvs->nworkers_for_table);
+
+    LaunchParallelWorkers(pvs->pcxt);
+
+    if (pvs->pcxt->nworkers_launched > 0)
+    {
+        /*
+         * Reset the local cost values for the leader backend as we have
+         * already accumulated the remaining balance of the heap.
+         */
+        VacuumCostBalance = 0;
+        VacuumCostBalanceLocal = 0;
+
+        /* Enable shared cost balance for leader backend */
+        VacuumSharedCostBalance = &(pvs->shared->cost_balance);
+        VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
+    }
+
+    ereport(pvs->shared->elevel,
+            (errmsg(ngettext("launched %d parallel vacuum worker for table scanning (planned: %d)",
+                             "launched %d parallel vacuum workers for table scanning (planned: %d)",
+                             pvs->pcxt->nworkers_launched),
+                    pvs->pcxt->nworkers_launched, pvs->nworkers_for_table)));
+
+    return pvs->pcxt->nworkers_launched;
+}
+
+/*
+ * Wait for all workers of the parallel table vacuum scan to finish, and
+ * gather statistics.
+ */
+void
+parallel_vacuum_table_scan_end(ParallelVacuumState *pvs)
+{
+    Assert(!IsParallelWorker());
+
+    if (pvs->nworkers_for_table == 0)
+        return;
+
+    WaitForParallelWorkersToFinish(pvs->pcxt);
+
+    for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
+        InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
+
+    /*
+     * Carry the shared balance value to heap scan and disable shared costing
+     */
+    if (VacuumSharedCostBalance)
+    {
+        VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+        VacuumSharedCostBalance = NULL;
+        VacuumActiveNWorkers = NULL;
+    }
+
+    pvs->shared->do_vacuum_table_scan = false;
+    pvs->num_table_scans++;
+}
+
+Relation *
+parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes)
+{
+    *nindexes = pvs->nindexes;
+
+    return pvs->indrels;
+}
+
+int
+parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs)
+{
+    return pvs->nworkers_for_table;
+}
+
 /*
  * Perform work within a launched parallel process.
  *
@@ -1026,7 +1190,6 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
      * matched to the leader's one.
      */
     vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
-    Assert(nindexes > 0);
 
     if (shared->maintenance_work_mem_worker > 0)
         maintenance_work_mem = shared->maintenance_work_mem_worker;
@@ -1060,6 +1223,10 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
     pvs.relname = pstrdup(RelationGetRelationName(rel));
     pvs.heaprel = rel;
 
+    pvs.pwcxt = palloc(sizeof(ParallelWorkerContext));
+    pvs.pwcxt->toc = toc;
+    pvs.pwcxt->seg = seg;
+
     /* These fields will be filled during index vacuum or cleanup */
     pvs.indname = NULL;
     pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
@@ -1077,8 +1244,15 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
     /* Prepare to track buffer usage during parallel execution */
     InstrStartParallelQuery();
 
-    /* Process indexes to perform vacuum/cleanup */
-    parallel_vacuum_process_safe_indexes(&pvs);
+    if (pvs.shared->do_vacuum_table_scan)
+    {
+        parallel_vacuum_process_table(&pvs);
+    }
+    else
+    {
+        /* Process indexes to perform vacuum/cleanup */
+        parallel_vacuum_process_safe_indexes(&pvs);
+    }
 
     /* Report buffer/WAL usage during parallel execution */
     buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index d5165aa0d9..37035cc186 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -164,15 +164,6 @@ typedef struct ProcArrayStruct
  *
  * The typedef is in the header.
  */
-struct GlobalVisState
-{
-    /* XIDs >= are considered running by some backend */
-    FullTransactionId definitely_needed;
-
-    /* XIDs < are not considered to be running by any backend */
-    FullTransactionId maybe_needed;
-};
-
 /*
  * Result of ComputeXidHorizons().
  */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec88a6..6c5e48e478 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -20,6 +20,7 @@
 #include "access/skey.h"
 #include "access/table.h"       /* for backward compatibility */
 #include "access/tableam.h"
+#include "commands/vacuum.h"
 #include "nodes/lockoptions.h"
 #include "nodes/primnodes.h"
 #include "storage/bufpage.h"
@@ -393,6 +394,13 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation rel, struct VacuumParams *params,
                             BufferAccessStrategy bstrategy);
+extern int  heap_parallel_vacuum_compute_workers(Relation rel, int requested);
+extern void heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt,
+                                          int nworkers, void *state);
+extern void heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt,
+                                            int nworkers, void *state);
+extern void heap_parallel_vacuum_scan_worker(Relation rel, ParallelVacuumState *pvs,
+                                             ParallelWorkerContext *pwcxt);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd..b10b047ca1 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -20,6 +20,7 @@
 #include "access/relscan.h"
 #include "access/sdir.h"
 #include "access/xact.h"
+#include "commands/vacuum.h"
 #include "executor/tuptable.h"
 #include "storage/read_stream.h"
 #include "utils/rel.h"
@@ -655,6 +656,46 @@ typedef struct TableAmRoutine
                                         struct VacuumParams *params,
                                         BufferAccessStrategy bstrategy);
 
+    /* ------------------------------------------------------------------------
+     * Callbacks for parallel table vacuum.
+     * ------------------------------------------------------------------------
+     */
+
+    /*
+     * Compute the number of parallel workers for a parallel table vacuum.
+     * The callback must return 0 to disable parallel table vacuum.
+     */
+    int         (*parallel_vacuum_compute_workers) (Relation rel, int requested);
+
+    /*
+     * Compute the amount of DSM space the AM needs for a parallel table
+     * vacuum.
+     *
+     * Not called if parallel table vacuum is disabled.
+     */
+    void        (*parallel_vacuum_estimate) (Relation rel,
+                                             ParallelContext *pcxt,
+                                             int nworkers,
+                                             void *state);
+
+    /*
+     * Initialize DSM space for parallel table vacuum.
+     *
+     * Not called if parallel table vacuum is disabled.
+     */
+    void        (*parallel_vacuum_initialize) (Relation rel,
+                                               ParallelContext *pcxt,
+                                               int nworkers,
+                                               void *state);
+
+    /*
+     * This callback is called for each parallel table vacuum worker.
+     *
+     * Not called if parallel table vacuum is disabled.
+     */
+    void        (*parallel_vacuum_scan_worker) (Relation rel,
+                                                ParallelVacuumState *pvs,
+                                                ParallelWorkerContext *pwcxt);
+
     /*
      * Prepare to analyze the next block in the read stream. Returns false if
      * the stream is exhausted and true otherwise. The scan must have been
@@ -1720,6 +1761,33 @@ table_relation_vacuum(Relation rel, struct VacuumParams *params,
     rel->rd_tableam->relation_vacuum(rel, params, bstrategy);
 }
 
+static inline int
+table_parallel_vacuum_compute_workers(Relation rel, int requested)
+{
+    return rel->rd_tableam->parallel_vacuum_compute_workers(rel, requested);
+}
+
+static inline void
+table_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
+                               void *state)
+{
+    rel->rd_tableam->parallel_vacuum_estimate(rel, pcxt, nworkers, state);
+}
+
+static inline void
+table_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, int nworkers,
+                                 void *state)
+{
+    rel->rd_tableam->parallel_vacuum_initialize(rel, pcxt, nworkers, state);
+}
+
+static inline void
+table_parallel_vacuum_scan(Relation rel, ParallelVacuumState *pvs,
+                           ParallelWorkerContext *pwcxt)
+{
+    rel->rd_tableam->parallel_vacuum_scan_worker(rel, pvs, pwcxt);
+}
+
 /*
  * Prepare to analyze the next block in the read stream. The scan needs to
  * have been started with table_beginscan_analyze(). Note that this routine
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 759f9a87d3..598bb5218f 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -360,7 +360,8 @@ extern void VacuumUpdateCosts(void);
 extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels,
                                                  int nindexes, int nrequested_workers,
                                                  int vac_work_mem, int elevel,
-                                                 BufferAccessStrategy bstrategy);
+                                                 BufferAccessStrategy bstrategy,
+                                                 void *state);
 extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats);
 extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs,
                                                 VacDeadItemsInfo **dead_items_info_p);
@@ -372,6 +373,10 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs,
                                                 long num_table_tuples,
                                                 int num_index_scans,
                                                 bool estimated_count);
+extern int  parallel_vacuum_table_scan_begin(ParallelVacuumState *pvs);
+extern void parallel_vacuum_table_scan_end(ParallelVacuumState *pvs);
+extern int  parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs);
+extern Relation *parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes);
 extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in commands/analyze.c */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 9398a84051..6ccb19a29f 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -102,8 +102,20 @@ extern char *ExportSnapshot(Snapshot snapshot);
 /*
  * These live in procarray.c because they're intimately linked to the
  * procarray contents, but thematically they better fit into snapmgr.h.
+ *
+ * XXX the struct definition is temporarily moved from procarray.c for
+ * parallel table vacuum development. We need to find a suitable way for
+ * parallel table vacuum workers to share the GlobalVisState.
  */
-typedef struct GlobalVisState GlobalVisState;
+typedef struct GlobalVisState
+{
+    /* XIDs >= are considered running by some backend */
+    FullTransactionId definitely_needed;
+
+    /* XIDs < are not considered to be running by any backend */
+    FullTransactionId maybe_needed;
+} GlobalVisState;
+
 extern GlobalVisState *GlobalVisTestFor(Relation rel);
 extern bool GlobalVisTestIsRemovableXid(GlobalVisState *state, TransactionId xid);
 extern bool GlobalVisTestIsRemovableFullXid(GlobalVisState *state, FullTransactionId fxid);
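
For reviewers who don't want to dig through the whole patch: the new TableAmRoutine callbacks above are meant to be wired by heapam to the heap_parallel_vacuum_* functions declared in heapam.h. The following is only an illustrative sketch of that registration (not an excerpt from the patch; the actual heapam_methods initializer in heapam_handler.c contains many more members and may be arranged differently):

/*
 * Illustrative sketch only -- not part of the attached patch.  Shows roughly
 * how the heap AM could register the new parallel table vacuum callbacks;
 * the pre-existing callback assignments are omitted.
 */
static const TableAmRoutine heapam_methods = {
    .type = T_TableAmRoutine,

    /* ... existing callbacks such as .relation_vacuum = heap_vacuum_rel ... */

    .parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers,
    .parallel_vacuum_estimate = heap_parallel_vacuum_estimate,
    .parallel_vacuum_initialize = heap_parallel_vacuum_initialize,
    .parallel_vacuum_scan_worker = heap_parallel_vacuum_scan_worker,

    /* ... remaining callbacks ... */
};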