Hi all,

I'd like to propose block-level parallel VACUUM. This feature enables VACUUM to use multiple CPU cores.
Vacuum Processing Logic
=======================

PostgreSQL's VACUUM processing consists of two phases:

1. Collecting dead tuple locations on the heap.
2. Reclaiming dead tuples from the heap and indexes.

These phases are executed alternately: whenever the dead tuple locations collected in phase 1 reach maintenance_work_mem, phase 2 is executed.

Basic Design
============

For the PoC, I implemented parallel vacuum so that each worker runs both phases 1 and 2 for a particular block range. Suppose we vacuum a 1000-block table with 4 workers: each worker processes 250 consecutive blocks in phase 1 and then reclaims dead tuples from the heap and indexes (phase 2). To use the visibility map efficiently, each worker scans only its own block range of the relation and collects the dead tuple locations there. After every worker has finished its task, the leader process gathers the workers' vacuum statistics and updates relfrozenxid if possible. (A sketch of the block-range calculation follows this message, before the attached patches.)

I also changed the buffer-lock infrastructure so that multiple processes can wait for a cleanup lock on a buffer, and the new GUC parameter parallel_vacuum_workers controls the number of vacuum workers.

Performance (PoC)
=================

I ran parallel vacuum on a 13GB table (pgbench scale 1000) with several worker counts (on my underpowered virtual machine). The results:

1. Vacuum of the whole table, without indexes (page skipping disabled)

   1 worker  : 33 sec
   2 workers : 27 sec
   3 workers : 23 sec
   4 workers : 22 sec

2. Vacuum of the table and indexes (after 10000 transactions were executed)

   1 worker  : 12 sec
   2 workers : 49 sec
   3 workers : 54 sec
   4 workers : 53 sec

With multiple workers, execution time got worse because the processes frequently contended for the cleanup lock on the same index buffer. So parallelism seems effective only for the table vacuum so far, and even that does not improve as much as expected (perhaps a disk bottleneck).

Another Design
==============

ISTM that having multiple processes perform the index vacuum is not a good idea in most cases, because many index items can be stored in a single page, so multiple vacuum workers end up contending for the cleanup lock on the same index buffer. It would be better to have multiple workers each process a particular block range of the heap, and then have one worker per index perform the index vacuum.

Still lots of work to do, but the PoC patches are attached. Feedback and suggestions are very welcome.

Regards,

--
Masahiko Sawada
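As a reading aid, here is a minimal standalone sketch of the block-range split described above. It mirrors the arithmetic in vacuum_worker() from patch 2/2, but everything else here (the function, type, and variable names) is illustrative only, not code from the patch:

    #include <stdio.h>

    typedef unsigned int BlockNumber;

    /* Compute the block range a given worker is assigned. */
    static void
    worker_range(BlockNumber rel_nblocks, int nworkers, int worker_number,
                 BlockNumber *begin, BlockNumber *nblocks)
    {
        BlockNumber nblocks_per_worker = rel_nblocks / nworkers;

        *begin = nblocks_per_worker * worker_number;

        /* The last worker also takes the remainder blocks. */
        if (worker_number == nworkers - 1)
            *nblocks = rel_nblocks - *begin;
        else
            *nblocks = nblocks_per_worker;
    }

    int
    main(void)
    {
        int     i;

        /* The example above: a 1000-block table vacuumed by 4 workers. */
        for (i = 0; i < 4; i++)
        {
            BlockNumber begin;
            BlockNumber nblocks;

            worker_range(1000, 4, i, &begin, &nblocks);
            printf("worker %d: blocks %u..%u (%u blocks)\n",
                   i, begin, begin + nblocks - 1, nblocks);
        }
        return 0;
    }

With 1000 blocks and 4 workers this prints the ranges 0..249, 250..499, 500..749, and 750..999, i.e. 250 consecutive blocks per worker as described above.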
From b25d491a05a43fb7adf014b2580c71ec7adb75a2 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Mon, 8 Aug 2016 16:43:35 -0700
Subject: [PATCH 1/2] Allow multiple backends to wait for cleanup lock.

---
 src/backend/storage/buffer/buf_init.c |  3 +-
 src/backend/storage/buffer/bufmgr.c   | 57 +++++++++++++++++++++++------------
 src/include/storage/buf_internals.h   |  4 ++-
 src/include/storage/proc.h            |  2 ++
 4 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index a4163cf..2aad030 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -134,7 +134,8 @@ InitBufferPool(void)
 		CLEAR_BUFFERTAG(buf->tag);
 
 		pg_atomic_init_u32(&buf->state, 0);
-		buf->wait_backend_pid = 0;
+		dlist_init(&buf->pin_count_waiters);
+		pg_atomic_write_u32(&buf->nwaiters, 0);
 
 		buf->buf_id = i;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 76ade37..f2f4ab9 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -38,6 +38,7 @@
 #include "catalog/storage.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
+#include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
@@ -1730,15 +1731,19 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
 		 */
 		buf_state = LockBufHdr(buf);
 
-		if ((buf_state & BM_PIN_COUNT_WAITER) &&
-			BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+		if (buf_state & BM_PIN_COUNT_WAITER)
 		{
-			/* we just released the last pin other than the waiter's */
-			int			wait_backend_pid = buf->wait_backend_pid;
+			dlist_mutable_iter iter;
 
-			buf_state &= ~BM_PIN_COUNT_WAITER;
+			if (pg_atomic_read_u32(&buf->nwaiters) == 1)
+				buf_state &= ~BM_PIN_COUNT_WAITER;
+
+			dlist_foreach_modify(iter, &buf->pin_count_waiters)
+			{
+				PGPROC *waiter = dlist_container(PGPROC, clWaitLink, iter.cur);
+				ProcSendSignal(waiter->pid);
+			}
 			UnlockBufHdr(buf, buf_state);
-			ProcSendSignal(wait_backend_pid);
 		}
 		else
 			UnlockBufHdr(buf, buf_state);
@@ -3513,8 +3518,17 @@ UnlockBuffers(void)
 		 * got a cancel/die interrupt before getting the signal.
 		 */
 		if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
-			buf->wait_backend_pid == MyProcPid)
-			buf_state &= ~BM_PIN_COUNT_WAITER;
+			pg_atomic_read_u32(&buf->nwaiters) == 1)
+		{
+			dlist_mutable_iter iter;
+
+			dlist_foreach_modify(iter, &buf->pin_count_waiters)
+			{
+				PGPROC *waiter = dlist_container(PGPROC, clWaitLink, iter.cur);
+				if (waiter->pid == MyProcPid)
+					buf_state &= ~BM_PIN_COUNT_WAITER;
+			}
+		}
 
 		UnlockBufHdr(buf, buf_state);
 
@@ -3616,20 +3630,24 @@ LockBufferForCleanup(Buffer buffer)
 		buf_state = LockBufHdr(bufHdr);
 
 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
-		if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+		/*
+		 * If refcount == 1 then we can break immediately.
+		 * In case of refcount > 1, if refcount == (nwaiters + 1) then break,
+		 * because refcount includes other processes and ourselves, but
+		 * nwaiters includes only other processes.
+		 */
+		if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 ||
+			((BUF_STATE_GET_REFCOUNT(buf_state) - 1) ==
+			 pg_atomic_read_u32(&bufHdr->nwaiters)))
 		{
 			/* Successfully acquired exclusive lock with pincount 1 */
 			UnlockBufHdr(bufHdr, buf_state);
 			return;
 		}
 
 		/* Failed, so mark myself as waiting for pincount 1 */
-		if (buf_state & BM_PIN_COUNT_WAITER)
-		{
-			UnlockBufHdr(bufHdr, buf_state);
-			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-			elog(ERROR, "multiple backends attempting to wait for pincount 1");
-		}
-		bufHdr->wait_backend_pid = MyProcPid;
+		pg_atomic_fetch_add_u32(&bufHdr->nwaiters, 1);
+		dlist_push_tail(&bufHdr->pin_count_waiters, &MyProc->clWaitLink);
+
 		PinCountWaitBuf = bufHdr;
 		buf_state |= BM_PIN_COUNT_WAITER;
 		UnlockBufHdr(bufHdr, buf_state);
@@ -3662,9 +3680,10 @@ LockBufferForCleanup(Buffer buffer)
 	 * better be safe.
 	 */
 	buf_state = LockBufHdr(bufHdr);
-	if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
-		bufHdr->wait_backend_pid == MyProcPid)
-		buf_state &= ~BM_PIN_COUNT_WAITER;
+
+	dlist_delete(&MyProc->clWaitLink);
+	pg_atomic_fetch_sub_u32(&bufHdr->nwaiters, 1);
+
 	UnlockBufHdr(bufHdr, buf_state);
 
 	PinCountWaitBuf = NULL;
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index e0dfb2f..90fcbd7 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -182,7 +182,9 @@ typedef struct BufferDesc
 	/* state of the tag, containing flags, refcount and usagecount */
 	pg_atomic_uint32 state;
 
-	int			wait_backend_pid;	/* backend PID of pin-count waiter */
+	dlist_head	pin_count_waiters;	/* backend PIDs of pin-count waiters */
+	pg_atomic_uint32 nwaiters;
+
 	int			freeNext;		/* link in freelist chain */
 
 	LWLock		content_lock;	/* to lock access to buffer contents */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index f576f05..4cd9416 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -123,6 +123,8 @@ struct PGPROC
 	LOCKMASK	heldLocks;		/* bitmask for lock types already held on this
 								 * lock object by this backend */
 
+	dlist_node	clWaitLink;		/* position in Cleanup Lock wait list */
+
 	/*
 	 * Info to allow us to wait for synchronous replication, if needed.
 	 * waitLSN is InvalidXLogRecPtr if not waiting; set only by user backend.
-- 
2.8.1
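As a reading aid for patch 1/2: the core change in LockBufferForCleanup() is the acquisition test. A simplified, self-contained model of that test (assuming the patch's definitions, where refcount counts all pins including our own and nwaiters counts only the other waiting backends; this is not the patch's code) is:

    #include <stdbool.h>
    #include <stdio.h>

    /*
     * A backend may take the cleanup lock when it holds the only pin, or
     * when every pin other than its own belongs to another cleanup-lock
     * waiter, i.e. refcount - 1 == nwaiters.
     */
    static bool
    cleanup_lock_available(unsigned int refcount, unsigned int nwaiters)
    {
        return refcount == 1 || refcount - 1 == nwaiters;
    }

    int
    main(void)
    {
        /* Sole pinner: acquire immediately. */
        printf("%d\n", cleanup_lock_available(1, 0));   /* prints 1 */
        /* Three pins, but the two other pinners are fellow waiters. */
        printf("%d\n", cleanup_lock_available(3, 2));   /* prints 1 */
        /* Three pins, one other waiter: an unrelated pin remains. */
        printf("%d\n", cleanup_lock_available(3, 1));   /* prints 0 */
        return 0;
    }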
From 1955cb9f68f9c027c566c4909d7ead475cf20f3b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Mon, 8 Aug 2016 16:43:55 -0700
Subject: [PATCH 2/2] Block level parallel Vacuum.

---
 src/backend/access/nbtree/nbtutils.c |  19 ---
 src/backend/commands/vacuum.c        |   1 +
 src/backend/commands/vacuumlazy.c    | 239 ++++++++++++++++++++++++++++-------
 src/backend/utils/misc/guc.c         |  10 ++
 src/include/commands/vacuum.h        |  41 +++++-
 src/include/storage/buf_internals.h  |   1 +
 6 files changed, 245 insertions(+), 66 deletions(-)

diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 5d335c7..987aceb 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1918,25 +1918,6 @@ _bt_start_vacuum(Relation rel)
 	if (result == 0 || result > MAX_BT_CYCLE_ID)
 		result = btvacinfo->cycle_ctr = 1;
 
-	/* Let's just make sure there's no entry already for this index */
-	for (i = 0; i < btvacinfo->num_vacuums; i++)
-	{
-		vac = &btvacinfo->vacuums[i];
-		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
-			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
-		{
-			/*
-			 * Unlike most places in the backend, we have to explicitly
-			 * release our LWLock before throwing an error. This is because
-			 * we expect _bt_end_vacuum() to be called before transaction
-			 * abort cleanup can run to release LWLocks.
-			 */
-			LWLockRelease(BtreeVacuumLock);
-			elog(ERROR, "multiple active vacuums for index \"%s\"",
-				 RelationGetRelationName(rel));
-		}
-	}
-
 	/* OK, add an entry */
 	if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
 	{
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 0563e63..1562773 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -58,6 +58,7 @@ int			vacuum_freeze_min_age;
 int			vacuum_freeze_table_age;
 int			vacuum_multixact_freeze_min_age;
 int			vacuum_multixact_freeze_table_age;
+int			parallel_vacuum_workers;
 
 /* A few variables that don't seem worth passing around as parameters */
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 231e92d..4fc880d 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -42,8 +42,10 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
@@ -98,33 +100,9 @@
  */
 #define SKIP_PAGES_THRESHOLD	((BlockNumber) 32)
 
-typedef struct LVRelStats
-{
-	/* hasindex = true means two-pass strategy; false means one-pass */
-	bool		hasindex;
-	/* Overall statistics about rel */
-	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
-	BlockNumber rel_pages;		/* total number of pages */
-	BlockNumber scanned_pages;	/* number of pages we examined */
-	BlockNumber pinskipped_pages;	/* # of pages we skipped due to a pin */
-	BlockNumber frozenskipped_pages;	/* # of frozen pages we skipped */
-	double		scanned_tuples; /* counts only tuples on scanned pages */
-	double		old_rel_tuples; /* previous value of pg_class.reltuples */
-	double		new_rel_tuples; /* new estimated total # of tuples */
-	double		new_dead_tuples;	/* new estimated total # of dead tuples */
-	BlockNumber pages_removed;
-	double		tuples_deleted;
-	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
-	/* List of TIDs of tuples we intend to delete */
-	/* NB: this list is ordered by TID address */
-	int			num_dead_tuples;	/* current # of entries */
-	int			max_dead_tuples;	/* # slots allocated in array */
-	ItemPointer dead_tuples;	/* array of ItemPointerData */
-	int			num_index_scans;
-	TransactionId latestRemovedXid;
-	bool		lock_waiter_detected;
-} LVRelStats;
-
+/* DSM keys for block-level parallel vacuum */
+#define VACUUM_KEY_TASK			50
+#define VACUUM_KEY_WORKER_STATS	51
 
 /* A few variables that don't seem worth passing around as parameters */
 static int	elevel = -1;
@@ -137,9 +115,6 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static void lazy_scan_heap(Relation onerel, int options,
-			   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
-			   bool aggressive);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
 static void lazy_vacuum_index(Relation indrel,
@@ -162,6 +137,18 @@ static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
 						 TransactionId *visibility_cutoff_xid, bool *all_frozen);
 
+/* functions for parallel vacuum */
+static void parallel_lazy_scan_heap(Relation rel, LVRelStats *vacrelstats,
+						Relation *Irel, int nindexes, int options,
+						bool aggressive, int wnum);
+static void vacuum_worker(dsm_segment *seg, shm_toc *toc);
+static void lazy_scan_heap(Relation onerel, int options,
+			   LVRelStats *vacrelstats, Relation *Irel,
+			   int nindexes, bool aggressive,
+			   BlockNumber begin, BlockNumber nblocks);
+static void gather_vacuum_stats(LVRelStats *vacrelstats, LVRelStats *worker_stats,
+					int wnum);
+
 
 /*
  *	lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
@@ -248,8 +235,17 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
 	vacrelstats->hasindex = (nindexes > 0);
 
-	/* Do the vacuuming */
-	lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive);
+	/* Do the parallel vacuuming. */
+	if (parallel_vacuum_workers > 1)
+		parallel_lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, options,
+								aggressive, parallel_vacuum_workers);
+	else
+	{
+		BlockNumber nblocks = RelationGetNumberOfBlocks(onerel);
+
+		lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes,
+					   aggressive, 0, nblocks);
+	}
 
 	/* Done with indexes */
 	vac_close_indexes(nindexes, Irel, NoLock);
@@ -428,7 +424,132 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 }
 
 /*
- *	lazy_scan_heap() -- scan an open heap relation
+ * Launch the parallel vacuum workers specified by parallel_vacuum_workers and
+ * then gather the resulting stats of each worker. The idea of vacuuming one
+ * relation with multiple workers in parallel is that each worker is assigned
+ * a particular block range of the relation, calculated from
+ * parallel_vacuum_workers and the number of relation blocks. This information
+ * and some thresholds (e.g. OldestXmin, FreezeLimit, MultiXactCutoff) are
+ * stored in the DSM space tagged with VACUUM_KEY_TASK. Each worker collects
+ * the garbage TIDs and reclaims them as well. Vacuum statistics for each
+ * worker are stored in the DSM space tagged with VACUUM_KEY_WORKER_STATS,
+ * which the leader process gathers after all workers have finished their
+ * tasks.
+ */
+static void
+parallel_lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
+						Relation *Irel, int nindexes, int options,
+						bool aggressive, int wnum)
+{
+	ParallelContext *pcxt;
+	LVRelStats *wstats_space;
+	VacuumTask *task_space;
+	IndexBulkDeleteResult **indstats;
+	int			size = 0;
+	int			i;
+
+	EnterParallelMode();
+
+	/* Create parallel context and initialize it */
+	pcxt = CreateParallelContext(vacuum_worker, wnum);
+	size += BUFFERALIGN(sizeof(VacuumTask));	/* For task */
+	size += BUFFERALIGN(sizeof(LVRelStats) * pcxt->nworkers);	/* For worker stats */
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 2);
+	InitializeParallelDSM(pcxt);
+
+	/* Prepare for VacuumTask space */
+	task_space = (VacuumTask *) shm_toc_allocate(pcxt->toc, sizeof(VacuumTask));
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_TASK, task_space);
+	task_space->relid = RelationGetRelid(onerel);
+	task_space->aggressive = aggressive;
+	task_space->options = options;
+	task_space->oldestxmin = OldestXmin;
+	task_space->freezelimit = FreezeLimit;
+	task_space->multixactcutoff = MultiXactCutoff;
+	task_space->wnum = wnum;
+	task_space->elevel = elevel;
+
+	/* Prepare for worker LVRelStats space */
+	wstats_space = (LVRelStats *) shm_toc_allocate(pcxt->toc,
+												   sizeof(LVRelStats) * pcxt->nworkers);
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_WORKER_STATS, wstats_space);
+	for (i = 0; i < pcxt->nworkers; i++)
+	{
+		LVRelStats *wstats = wstats_space + i;
+
+		memcpy(wstats, vacrelstats, sizeof(LVRelStats));
+	}
+
+	/* Do parallel vacuum */
+	LaunchParallelWorkers(pcxt);
+
+	/* Wait for workers to finish vacuuming */
+	WaitForParallelWorkersToFinish(pcxt);
+	gather_vacuum_stats(vacrelstats, wstats_space, wnum);
+
+	DestroyParallelContext(pcxt);
+	ExitParallelMode();
+
+	indstats = (IndexBulkDeleteResult **)
+		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
+
+	/* Do post-vacuum cleanup and statistics update for each index */
+	for (i = 0; i < nindexes; i++)
+		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+}
+
+/*
+ * Entry function for a parallel vacuum worker. Each worker calculates the
+ * starting block number and the number of blocks it needs to process, and
+ * then vacuums that particular block range of the relation.
+ */
+static void
+vacuum_worker(dsm_segment *seg, shm_toc *toc)
+{
+	VacuumTask *task;
+	LVRelStats *wstats_space;
+	LVRelStats *wstats;
+	Relation	rel;
+	BlockNumber begin;
+	BlockNumber nblocks_per_worker;
+	BlockNumber nblocks;
+	int			nindexes;
+	Relation   *Irel;
+
+	/* Set up task information */
+	task = (VacuumTask *) shm_toc_lookup(toc, VACUUM_KEY_TASK);
+	OldestXmin = task->oldestxmin;
+	FreezeLimit = task->freezelimit;
+	MultiXactCutoff = task->multixactcutoff;
+
+	/* Set up this worker's stats space */
+	wstats_space = (LVRelStats *) shm_toc_lookup(toc, VACUUM_KEY_WORKER_STATS);
+	wstats = wstats_space + ParallelWorkerNumber;
+
+	/* Calculate how many blocks the worker should process */
+	rel = heap_open(task->relid, NoLock);
+	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &Irel);
+	nblocks_per_worker = RelationGetNumberOfBlocks(rel) / parallel_vacuum_workers;
+	begin = nblocks_per_worker * ParallelWorkerNumber;
+
+	/* The last worker processes the remaining blocks */
+	if (ParallelWorkerNumber == (task->wnum - 1))
+		nblocks = RelationGetNumberOfBlocks(rel) - begin;
+	else
+		nblocks = nblocks_per_worker;
+
+	/* Set up elevel */
+	elevel = task->elevel;
+
+	/* Vacuum the assigned block range */
+	lazy_scan_heap(rel, task->options, wstats, Irel, nindexes,
+				   task->aggressive, begin, nblocks);
+
+	heap_close(rel, NoLock);
+	vac_close_indexes(nindexes, Irel, NoLock);
+}
+
+/*
+ *	lazy_scan_heap() -- scan a particular range of an open heap relation
  *
  *		This routine prunes each page in the heap, which will among other
  *		things truncate dead tuples to dead line pointers, defragment the
@@ -445,10 +566,10 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 */
 static void
 lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
-			   Relation *Irel, int nindexes, bool aggressive)
+			   Relation *Irel, int nindexes, bool aggressive,
+			   BlockNumber begin, BlockNumber nblocks)
 {
-	BlockNumber nblocks,
-				blkno;
+	BlockNumber blkno;
 	HeapTupleData tuple;
 	char	   *relname;
 	BlockNumber empty_pages,
@@ -471,14 +592,15 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		PROGRESS_VACUUM_MAX_DEAD_TUPLES
 	};
 	int64		initprog_val[3];
+	BlockNumber end = begin + nblocks;
 
 	pg_rusage_init(&ru0);
 
 	relname = RelationGetRelationName(onerel);
 	ereport(elevel,
-			(errmsg("vacuuming \"%s.%s\"",
+			(errmsg("vacuuming \"%s.%s\", from block %u to %u, %u blocks",
 					get_namespace_name(RelationGetNamespace(onerel)),
-					relname)));
+					relname, begin, end, nblocks)));
 
 	empty_pages = vacuumed_pages = 0;
 	num_tuples = tups_vacuumed = nkeep = nunused = 0;
@@ -486,7 +608,6 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	indstats = (IndexBulkDeleteResult **)
 		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
 
-	nblocks = RelationGetNumberOfBlocks(onerel);
 	vacrelstats->rel_pages = nblocks;
 	vacrelstats->scanned_pages = 0;
 	vacrelstats->nonempty_pages = 0;
@@ -545,10 +666,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	 * the last page.  This is worth avoiding mainly because such a lock must
 	 * be replayed on any hot standby, where it can be disruptive.
 	 */
-	next_unskippable_block = 0;
+	next_unskippable_block = begin;
 	if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
 	{
-		while (next_unskippable_block < nblocks)
+		while (next_unskippable_block < end)
 		{
 			uint8		vmstatus;
 
@@ -574,7 +695,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	else
 		skipping_blocks = false;
 
-	for (blkno = 0; blkno < nblocks; blkno++)
+	for (blkno = begin; blkno < end; blkno++)
 	{
 		Buffer		buf;
 		Page		page;
@@ -1306,10 +1427,6 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
-	/* Do post-vacuum cleanup and statistics update for each index */
-	for (i = 0; i < nindexes; i++)
-		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
-
 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
 	if (vacuumed_pages)
 		ereport(elevel,
@@ -1317,6 +1434,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 						RelationGetRelationName(onerel),
 						tups_vacuumed, vacuumed_pages)));
 
+	/* Do post-vacuum cleanup and statistics update for each index */
+	if (!IsParallelWorker())
+	{
+		for (i = 0; i < nindexes; i++)
+			lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+	}
+
 	/*
 	 * This is pretty messy, but we split it up so that we can skip emitting
 	 * individual parts of the message when not applicable.
@@ -1347,6 +1471,29 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	pfree(buf.data);
 }
 
+/*
+ * gather_vacuum_stats() -- Gather vacuum statistics from workers
+ */
+static void
+gather_vacuum_stats(LVRelStats *vacrelstats, LVRelStats *worker_stats, int wnum)
+{
+	int			i;
+
+	/* Gather each worker's stats */
+	for (i = 0; i < wnum; i++)
+	{
+		LVRelStats *wstats = worker_stats + i;
+
+		vacrelstats->rel_pages += wstats->rel_pages;
+		vacrelstats->scanned_pages += wstats->scanned_pages;
+		vacrelstats->pinskipped_pages += wstats->pinskipped_pages;
+		vacrelstats->frozenskipped_pages += wstats->frozenskipped_pages;
+		vacrelstats->scanned_tuples += wstats->scanned_tuples;
+		vacrelstats->new_rel_tuples += wstats->new_rel_tuples;
+		vacrelstats->pages_removed += wstats->pages_removed;
+		vacrelstats->nonempty_pages += wstats->nonempty_pages;
+	}
+}
 
 /*
  *	lazy_vacuum_heap() -- second pass over the heap
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index c5178f7..0dd64bc 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2661,6 +2661,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"parallel_vacuum_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Sets the number of parallel workers for vacuum."),
+			NULL
+		},
+		&parallel_vacuum_workers,
+		1, 1, 1024,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
 			gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
 			NULL,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 80cd4a8..fc46c09 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -147,6 +147,45 @@ typedef struct VacuumParams
 								 * activated, -1 to use default */
 } VacuumParams;
 
+typedef struct LVRelStats
+{
+	/* hasindex = true means two-pass strategy; false means one-pass */
+	bool		hasindex;
+	/* Overall statistics about rel */
+	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
+	BlockNumber rel_pages;		/* total number of pages */
+	BlockNumber scanned_pages;	/* number of pages we examined */
+	BlockNumber pinskipped_pages;	/* # of pages we skipped due to a pin */
+	BlockNumber frozenskipped_pages;	/* # of frozen pages we skipped */
+	double		scanned_tuples; /* counts only tuples on scanned pages */
+	double		old_rel_tuples; /* previous value of pg_class.reltuples */
+	double		new_rel_tuples; /* new estimated total # of tuples */
+	double		new_dead_tuples;	/* new estimated total # of dead tuples */
+	BlockNumber pages_removed;
+	double		tuples_deleted;
+	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+	/* List of TIDs of tuples we intend to delete */
+	/* NB: this list is ordered by TID address */
+	int			num_dead_tuples;	/* current # of entries */
+	int			max_dead_tuples;	/* # slots allocated in array */
+	ItemPointer dead_tuples;	/* array of ItemPointerData */
+	int			num_index_scans;
+	TransactionId latestRemovedXid;
+	bool		lock_waiter_detected;
+} LVRelStats;
+
+typedef struct VacuumTask
+{
+	Oid			relid;			/* Target relation oid */
+	bool		aggressive;		/* does each worker need to do aggressive vacuum? */
+	int			options;		/* Specified vacuum options */
+	TransactionId oldestxmin;
+	TransactionId freezelimit;
+	MultiXactId multixactcutoff;
+	int			wnum;
+	int			elevel;
+} VacuumTask;
+
 /* GUC parameters */
 extern PGDLLIMPORT int default_statistics_target;	/* PGDLLIMPORT for
 													 * PostGIS */
@@ -154,7 +193,7 @@ extern int	vacuum_freeze_min_age;
 extern int	vacuum_freeze_table_age;
 extern int	vacuum_multixact_freeze_min_age;
 extern int	vacuum_multixact_freeze_table_age;
-
+extern int	parallel_vacuum_workers;
 
 /* in commands/vacuum.c */
 extern void ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel);
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 90fcbd7..4f9d986 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -15,6 +15,7 @@
 #ifndef BUFMGR_INTERNALS_H
 #define BUFMGR_INTERNALS_H
 
+#include "lib/ilist.h"
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
 #include "storage/latch.h"
-- 
2.8.1
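And as a reading aid for patch 2/2: the per-worker statistics live in a contiguous LVRelStats array inside the DSM segment, so the leader's gather step is a plain sum over the array slots. A cut-down standalone model (the struct is reduced to two fields, the worker numbers are hypothetical, and all names are illustrative; this is not the patch's code) is:

    #include <stdio.h>

    typedef struct WorkerStats
    {
        unsigned int scanned_pages;
        double       scanned_tuples;
    } WorkerStats;

    /* Sum each worker's slot into the leader's totals. */
    static void
    gather_stats(WorkerStats *total, const WorkerStats *worker_stats, int wnum)
    {
        int     i;

        for (i = 0; i < wnum; i++)
        {
            total->scanned_pages += worker_stats[i].scanned_pages;
            total->scanned_tuples += worker_stats[i].scanned_tuples;
        }
    }

    int
    main(void)
    {
        /* e.g. four workers, 250 pages each, made-up tuple counts */
        WorkerStats workers[4] = {
            {250, 61000.0}, {250, 60500.0}, {250, 60200.0}, {250, 61300.0}
        };
        WorkerStats total = {0, 0.0};

        gather_stats(&total, workers, 4);
        printf("scanned %u pages, %.0f tuples\n",
               total.scanned_pages, total.scanned_tuples);
        return 0;
    }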