Hi all,

I'd like to propose block level parallel VACUUM.
This feature makes VACUUM possible to use multiple CPU cores.

Vacuum Processing Logic
===================

PostgreSQL VACUUM processing logic consists of 2 phases,
1. Collecting dead tuple locations on heap.
2. Reclaiming dead tuples from heap and indexes.
These phases 1 and 2 are executed alternately, and once amount of dead
tuple location reached maintenance_work_mem in phase 1, phase 2 will
be executed.

Basic Design
==========

As for PoC, I implemented parallel vacuum so that each worker
processes both 1 and 2 phases for particular block range.
Suppose we vacuum 1000 blocks table with 4 workers, each worker
processes 250 consecutive blocks in phase 1 and then reclaims dead
tuples from heap and indexes (phase 2).
To use visibility map efficiency, each worker scan particular block
range of relation and collect dead tuple locations.
After each worker finished task, the leader process gathers these
vacuum statistics information and update relfrozenxid if possible.

I also changed the buffer lock infrastructure so that multiple
processes can wait for cleanup lock on a buffer.
And the new GUC parameter vacuum_parallel_workers controls the number
of vacuum workers.

Performance(PoC)
=========

I ran parallel vacuum on 13GB table (pgbench scale 1000) with several
workers (on my poor virtual machine).
The result is,

1. Vacuum whole table without index (disable page skipping)
  1 worker   : 33 sec
  2 workers : 27 sec
  3 workers : 23 sec
  4 workers : 22 sec

2. Vacuum table and index (after 10000 transaction executed)
  1 worker   : 12 sec
  2 workers : 49 sec
  3 workers : 54 sec
  4 workers : 53 sec

As a result of my test, since multiple process could frequently try to
acquire the cleanup lock on same index buffer, execution time of
parallel vacuum got worse.
And it seems to be effective for only table vacuum so far, but is not
improved as expected (maybe disk bottleneck).

Another Design
============
ISTM that processing index vacuum by multiple process is not good idea
in most cases because many index items can be stored in a page and
multiple vacuum worker could try to require the cleanup lock on the
same index buffer.
It's rather better that multiple workers process particular block
range and then multiple workers process each particular block range,
and then one worker per index processes index vacuum.

Still lots of work to do but attached PoC patch.
Feedback and suggestion are very welcome.

Regards,

--
Masahiko Sawada
From b25d491a05a43fb7adf014b2580c71ec7adb75a2 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Mon, 8 Aug 2016 16:43:35 -0700
Subject: [PATCH 1/2] Allow muliple backends to wait for cleanup lock.

---
 src/backend/storage/buffer/buf_init.c |  3 +-
 src/backend/storage/buffer/bufmgr.c   | 57 +++++++++++++++++++++++------------
 src/include/storage/buf_internals.h   |  4 ++-
 src/include/storage/proc.h            |  2 ++
 4 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/src/backend/storage/buffer/buf_init.c 
b/src/backend/storage/buffer/buf_init.c
index a4163cf..2aad030 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -134,7 +134,8 @@ InitBufferPool(void)
                        CLEAR_BUFFERTAG(buf->tag);
 
                        pg_atomic_init_u32(&buf->state, 0);
-                       buf->wait_backend_pid = 0;
+                       dlist_init(&buf->pin_count_waiters);
+                       pg_atomic_write_u32(&buf->nwaiters, 0);
 
                        buf->buf_id = i;
 
diff --git a/src/backend/storage/buffer/bufmgr.c 
b/src/backend/storage/buffer/bufmgr.c
index 76ade37..f2f4ab9 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -38,6 +38,7 @@
 #include "catalog/storage.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
+#include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
@@ -1730,15 +1731,19 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
                         */
                        buf_state = LockBufHdr(buf);
 
-                       if ((buf_state & BM_PIN_COUNT_WAITER) &&
-                               BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+                       if (buf_state & BM_PIN_COUNT_WAITER)
                        {
-                               /* we just released the last pin other than the 
waiter's */
-                               int                     wait_backend_pid = 
buf->wait_backend_pid;
+                               dlist_mutable_iter iter;
 
-                               buf_state &= ~BM_PIN_COUNT_WAITER;
+                               if (pg_atomic_read_u32(&buf->nwaiters) == 1)
+                                       buf_state &= ~BM_PIN_COUNT_WAITER;
+
+                               dlist_foreach_modify(iter, 
&buf->pin_count_waiters)
+                               {
+                                       PGPROC *waiter = 
dlist_container(PGPROC, clWaitLink, iter.cur);
+                                       ProcSendSignal(waiter->pid);
+                               }
                                UnlockBufHdr(buf, buf_state);
-                               ProcSendSignal(wait_backend_pid);
                        }
                        else
                                UnlockBufHdr(buf, buf_state);
@@ -3513,8 +3518,17 @@ UnlockBuffers(void)
                 * got a cancel/die interrupt before getting the signal.
                 */
                if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
-                       buf->wait_backend_pid == MyProcPid)
-                       buf_state &= ~BM_PIN_COUNT_WAITER;
+                       pg_atomic_read_u32(&buf->nwaiters) == 1)
+               {
+                       dlist_mutable_iter iter;
+
+                       dlist_foreach_modify(iter, &buf->pin_count_waiters)
+                       {
+                               PGPROC *waiter = dlist_container(PGPROC, 
clWaitLink, iter.cur);
+                               if (waiter->pid == MyProcPid)
+                                       buf_state &= ~BM_PIN_COUNT_WAITER;
+                       }
+               }
 
                UnlockBufHdr(buf, buf_state);
 
@@ -3616,20 +3630,24 @@ LockBufferForCleanup(Buffer buffer)
                buf_state = LockBufHdr(bufHdr);
 
                Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
-               if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+               /*
+                * If refcount == 1 then we can break immediately.
+                * In case of refcount > 1, if refcount == (nwaiters + 1) then 
break.
+                * Because refcount include other processes and itself, but 
nwaiters
+                * includes only other processes.
+                */
+               if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 ||
+                       ((BUF_STATE_GET_REFCOUNT(buf_state) - 1)==
+                        pg_atomic_read_u32(&bufHdr->nwaiters)))
                {
                        /* Successfully acquired exclusive lock with pincount 1 
*/
                        UnlockBufHdr(bufHdr, buf_state);
                        return;
                }
                /* Failed, so mark myself as waiting for pincount 1 */
-               if (buf_state & BM_PIN_COUNT_WAITER)
-               {
-                       UnlockBufHdr(bufHdr, buf_state);
-                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-                       elog(ERROR, "multiple backends attempting to wait for 
pincount 1");
-               }
-               bufHdr->wait_backend_pid = MyProcPid;
+               pg_atomic_fetch_add_u32(&bufHdr->nwaiters, 1);
+               dlist_push_tail(&bufHdr->pin_count_waiters, 
&MyProc->clWaitLink);
+
                PinCountWaitBuf = bufHdr;
                buf_state |= BM_PIN_COUNT_WAITER;
                UnlockBufHdr(bufHdr, buf_state);
@@ -3662,9 +3680,10 @@ LockBufferForCleanup(Buffer buffer)
                 * better be safe.
                 */
                buf_state = LockBufHdr(bufHdr);
-               if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
-                       bufHdr->wait_backend_pid == MyProcPid)
-                       buf_state &= ~BM_PIN_COUNT_WAITER;
+
+               dlist_delete(&MyProc->clWaitLink);
+               pg_atomic_fetch_sub_u32(&bufHdr->nwaiters, 1);
+
                UnlockBufHdr(bufHdr, buf_state);
 
                PinCountWaitBuf = NULL;
diff --git a/src/include/storage/buf_internals.h 
b/src/include/storage/buf_internals.h
index e0dfb2f..90fcbd7 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -182,7 +182,9 @@ typedef struct BufferDesc
        /* state of the tag, containing flags, refcount and usagecount */
        pg_atomic_uint32 state;
 
-       int                     wait_backend_pid;               /* backend PID 
of pin-count waiter */
+       dlist_head      pin_count_waiters;      /* backend PIDs of pin-count 
waiters */
+       pg_atomic_uint32        nwaiters;
+
        int                     freeNext;               /* link in freelist 
chain */
 
        LWLock          content_lock;   /* to lock access to buffer contents */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index f576f05..4cd9416 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -123,6 +123,8 @@ struct PGPROC
        LOCKMASK        heldLocks;              /* bitmask for lock types 
already held on this
                                                                 * lock object 
by this backend */
 
+       dlist_node      clWaitLink;             /* position in Cleanup Lock 
wait list */
+
        /*
         * Info to allow us to wait for synchronous replication, if needed.
         * waitLSN is InvalidXLogRecPtr if not waiting; set only by user 
backend.
-- 
2.8.1

From 1955cb9f68f9c027c566c4909d7ead475cf20f3b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Mon, 8 Aug 2016 16:43:55 -0700
Subject: [PATCH 2/2] Block level parallel Vacuum.

---
 src/backend/access/nbtree/nbtutils.c |  19 ---
 src/backend/commands/vacuum.c        |   1 +
 src/backend/commands/vacuumlazy.c    | 239 ++++++++++++++++++++++++++++-------
 src/backend/utils/misc/guc.c         |  10 ++
 src/include/commands/vacuum.h        |  41 +++++-
 src/include/storage/buf_internals.h  |   1 +
 6 files changed, 245 insertions(+), 66 deletions(-)

diff --git a/src/backend/access/nbtree/nbtutils.c 
b/src/backend/access/nbtree/nbtutils.c
index 5d335c7..987aceb 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1918,25 +1918,6 @@ _bt_start_vacuum(Relation rel)
        if (result == 0 || result > MAX_BT_CYCLE_ID)
                result = btvacinfo->cycle_ctr = 1;
 
-       /* Let's just make sure there's no entry already for this index */
-       for (i = 0; i < btvacinfo->num_vacuums; i++)
-       {
-               vac = &btvacinfo->vacuums[i];
-               if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
-                       vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
-               {
-                       /*
-                        * Unlike most places in the backend, we have to 
explicitly
-                        * release our LWLock before throwing an error.  This 
is because
-                        * we expect _bt_end_vacuum() to be called before 
transaction
-                        * abort cleanup can run to release LWLocks.
-                        */
-                       LWLockRelease(BtreeVacuumLock);
-                       elog(ERROR, "multiple active vacuums for index \"%s\"",
-                                RelationGetRelationName(rel));
-               }
-       }
-
        /* OK, add an entry */
        if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
        {
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 0563e63..1562773 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -58,6 +58,7 @@ int                   vacuum_freeze_min_age;
 int                    vacuum_freeze_table_age;
 int                    vacuum_multixact_freeze_min_age;
 int                    vacuum_multixact_freeze_table_age;
+int                    parallel_vacuum_workers;
 
 
 /* A few variables that don't seem worth passing around as parameters */
diff --git a/src/backend/commands/vacuumlazy.c 
b/src/backend/commands/vacuumlazy.c
index 231e92d..4fc880d 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -42,8 +42,10 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
@@ -98,33 +100,9 @@
  */
 #define SKIP_PAGES_THRESHOLD   ((BlockNumber) 32)
 
-typedef struct LVRelStats
-{
-       /* hasindex = true means two-pass strategy; false means one-pass */
-       bool            hasindex;
-       /* Overall statistics about rel */
-       BlockNumber old_rel_pages;      /* previous value of pg_class.relpages 
*/
-       BlockNumber rel_pages;          /* total number of pages */
-       BlockNumber scanned_pages;      /* number of pages we examined */
-       BlockNumber pinskipped_pages;           /* # of pages we skipped due to 
a pin */
-       BlockNumber frozenskipped_pages;        /* # of frozen pages we skipped 
*/
-       double          scanned_tuples; /* counts only tuples on scanned pages 
*/
-       double          old_rel_tuples; /* previous value of pg_class.reltuples 
*/
-       double          new_rel_tuples; /* new estimated total # of tuples */
-       double          new_dead_tuples;        /* new estimated total # of 
dead tuples */
-       BlockNumber pages_removed;
-       double          tuples_deleted;
-       BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
-       /* List of TIDs of tuples we intend to delete */
-       /* NB: this list is ordered by TID address */
-       int                     num_dead_tuples;        /* current # of entries 
*/
-       int                     max_dead_tuples;        /* # slots allocated in 
array */
-       ItemPointer dead_tuples;        /* array of ItemPointerData */
-       int                     num_index_scans;
-       TransactionId latestRemovedXid;
-       bool            lock_waiter_detected;
-} LVRelStats;
-
+/* DSM key for block-level parallel vacuum */
+#define VACUUM_KEY_TASK        50
+#define VACUUM_KEY_WORKER_STATS 51
 
 /* A few variables that don't seem worth passing around as parameters */
 static int     elevel = -1;
@@ -137,9 +115,6 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static void lazy_scan_heap(Relation onerel, int options,
-                          LVRelStats *vacrelstats, Relation *Irel, int 
nindexes,
-                          bool aggressive);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
 static void lazy_vacuum_index(Relation indrel,
@@ -162,6 +137,18 @@ static int vac_cmp_itemptr(const void *left, const void 
*right);
 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
                                         TransactionId *visibility_cutoff_xid, 
bool *all_frozen);
 
+/* functions for parallel vacuum */
+static void parallel_lazy_scan_heap(Relation rel, LVRelStats *vacrelstats,
+                                                                       
Relation *Irel, int nindexes, int options,
+                                                                       bool 
aggressive, int wnum);
+static void vacuum_worker(dsm_segment *seg, shm_toc *toc);
+static void lazy_scan_heap(Relation onerel, int options,
+                                                                 LVRelStats 
*vacrelstats, Relation *Irel,
+                                                                 int nindexes, 
bool aggressive,
+                                                                 BlockNumber 
begin, BlockNumber nblocks);
+static void gather_vacuum_stats(LVRelStats *valrelstats, LVRelStats 
*worker_stats,
+                                                               int wnum);
+
 
 /*
  *     lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
@@ -248,8 +235,17 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams 
*params,
        vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
        vacrelstats->hasindex = (nindexes > 0);
 
-       /* Do the vacuuming */
-       lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, 
aggressive);
+       /* Do the parallel vacuuming. */
+       if (parallel_vacuum_workers > 1)
+               parallel_lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, 
options,
+                                                               aggressive, 
parallel_vacuum_workers);
+       else
+       {
+               BlockNumber nblocks = RelationGetNumberOfBlocks(onerel);
+
+               lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes,
+                                          aggressive, 0, nblocks);
+       }
 
        /* Done with indexes */
        vac_close_indexes(nindexes, Irel, NoLock);
@@ -428,7 +424,132 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats 
*vacrelstats)
 }
 
 /*
- *     lazy_scan_heap() -- scan an open heap relation
+ * Launch parallel vacuum workers specified by vacuum_parallel_workers and then
+ * gather the result stats of each workers. The idea of vacuuming one relation
+ * with multiple workers parallely is that each worker is assigned particlar 
block
+ * range of relation which is calculated using by parallel_vacuum_workers and
+ * the number of relation blocks. The informations and some threshoulds (e.g.
+ * OldestXmin, FreezeLimit, MultiXactCufoff) are stored into DSM tagged by
+ * VACUUM_KEY_TASK. Each worker can collect the garbage tid and reclaims them 
as
+ * well.  Vacuum statistics for each workers are stored into DSm tagged by
+ * VACUUM_KEY_WORKER_STATS, that will be gathered by the leader process after 
all
+ * worker finished its task.
+ */
+static void
+parallel_lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
+                                               Relation *Irel, int nindexes, 
int options,
+                                               bool aggressive, int wnum)
+{
+       ParallelContext *pcxt;
+       LVRelStats *wstats_space;
+       VacuumTask      *task_space;
+       IndexBulkDeleteResult **indstats;
+       int size = 0;
+       int     i;
+
+       EnterParallelMode();
+
+       /* Create parallel context and initialize it */
+       pcxt = CreateParallelContext(vacuum_worker, wnum);
+       size += BUFFERALIGN(sizeof(VacuumTask)); /* For task */
+       size += BUFFERALIGN(sizeof(LVRelStats) * pcxt->nworkers); /* For worker 
stats */
+       shm_toc_estimate_chunk(&pcxt->estimator, size);
+       shm_toc_estimate_keys(&pcxt->estimator, 2);
+       InitializeParallelDSM(pcxt);
+
+       /* Prepare for VacuumTask space */
+       task_space = (VacuumTask *)shm_toc_allocate(pcxt->toc, 
sizeof(VacuumTask));
+       shm_toc_insert(pcxt->toc, VACUUM_KEY_TASK, task_space);
+       task_space->relid = RelationGetRelid(onerel);
+       task_space->aggressive = aggressive;
+       task_space->options = options;
+       task_space->oldestxmin = OldestXmin;
+       task_space->freezelimit = FreezeLimit;
+       task_space->multixactcutoff = MultiXactCutoff;
+       task_space->wnum = wnum;
+       task_space->elevel = elevel;
+
+       /* Prepare for worker LVRelStats space */
+       wstats_space = (LVRelStats *)shm_toc_allocate(pcxt->toc,
+                                                                               
                 sizeof(LVRelStats) * pcxt->nworkers);
+       shm_toc_insert(pcxt->toc, VACUUM_KEY_WORKER_STATS, wstats_space);
+       for (i = 0; i < pcxt->nworkers; i++)
+       {
+               LVRelStats *wstats = wstats_space + sizeof(LVRelStats) * i;
+               memcpy(wstats, vacrelstats, sizeof(LVRelStats));
+       }
+
+       /* Do parallel vacuum */
+       LaunchParallelWorkers(pcxt);
+
+       /* Wait for workers finising vacuuming */
+       WaitForParallelWorkersToFinish(pcxt);
+       gather_vacuum_stats(vacrelstats, wstats_space, wnum);
+
+       DestroyParallelContext(pcxt);
+       ExitParallelMode();
+
+       indstats = (IndexBulkDeleteResult **)
+               palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
+
+       /* Do post-vacuum cleanup and statistics update for each index */
+       for (i = 0; i < nindexes; i++)
+               lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+}
+
+/*
+ * Entry function for parallel vacuum worker. Each worker calculates the
+ * starting block number and number of blocks need to process, and then
+ * does vacuuming particular block range of relation.
+ */
+static void
+vacuum_worker(dsm_segment *seg, shm_toc *toc)
+{
+       VacuumTask      *task;
+       LVRelStats      *wstats_space;
+       LVRelStats      *wstats;
+       Relation        rel;
+       BlockNumber     begin;
+       BlockNumber nblocks_per_worker;
+       BlockNumber     nblocks;
+       int                     nindexes;
+       Relation        *Irel;
+
+       /* Set up task information */
+       task = (VacuumTask *)shm_toc_lookup(toc, VACUUM_KEY_TASK);
+       OldestXmin = task->oldestxmin;
+       FreezeLimit = task->freezelimit;
+       MultiXactCutoff = task->multixactcutoff;
+
+       /* Set up message queue */
+       wstats_space = (LVRelStats *)shm_toc_lookup(toc, 
VACUUM_KEY_WORKER_STATS);
+       wstats = wstats_space + sizeof(LVRelStats) * ParallelWorkerNumber;
+
+       /* Calculate how many blocks the worker should process */
+       rel = heap_open(task->relid, NoLock);
+       vac_open_indexes(rel, RowExclusiveLock, &nindexes, &Irel);
+       nblocks_per_worker = RelationGetNumberOfBlocks(rel) / 
parallel_vacuum_workers;
+       begin = nblocks_per_worker * ParallelWorkerNumber;
+
+       /* The last worker processes remaining blocks */
+       if (ParallelWorkerNumber == (task->wnum - 1))
+               nblocks = RelationGetNumberOfBlocks(rel) - begin;
+       else
+               nblocks = nblocks_per_worker;
+
+       /* Set up elevel */
+       elevel = task->elevel;
+
+       /* Do vacuuming particular area */
+       lazy_scan_heap(rel, task->options, wstats, Irel, nindexes,
+                                  task->aggressive, begin, nblocks);
+
+       heap_close(rel, NoLock);
+       vac_close_indexes(nindexes, Irel, NoLock);
+}
+
+/*
+ *     lazy_scan_heap() -- scan paritclar range of open heap relation
  *
  *             This routine prunes each page in the heap, which will among 
other
  *             things truncate dead tuples to dead line pointers, defragment 
the
@@ -445,10 +566,10 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats 
*vacrelstats)
  */
 static void
 lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
-                          Relation *Irel, int nindexes, bool aggressive)
+                          Relation     *Irel, int nindexes, bool aggressive,
+                          BlockNumber begin, BlockNumber nblocks)
 {
-       BlockNumber nblocks,
-                               blkno;
+       BlockNumber blkno;
        HeapTupleData tuple;
        char       *relname;
        BlockNumber empty_pages,
@@ -471,14 +592,15 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats 
*vacrelstats,
                PROGRESS_VACUUM_MAX_DEAD_TUPLES
        };
        int64           initprog_val[3];
+       BlockNumber end = begin + nblocks;
 
        pg_rusage_init(&ru0);
 
        relname = RelationGetRelationName(onerel);
        ereport(elevel,
-                       (errmsg("vacuuming \"%s.%s\"",
+                       (errmsg("vacuuming \"%s.%s\", from block %u to %u, %u 
blocks",
                                        
get_namespace_name(RelationGetNamespace(onerel)),
-                                       relname)));
+                                       relname, begin, end, nblocks)));
 
        empty_pages = vacuumed_pages = 0;
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
@@ -486,7 +608,6 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats 
*vacrelstats,
        indstats = (IndexBulkDeleteResult **)
                palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
 
-       nblocks = RelationGetNumberOfBlocks(onerel);
        vacrelstats->rel_pages = nblocks;
        vacrelstats->scanned_pages = 0;
        vacrelstats->nonempty_pages = 0;
@@ -545,10 +666,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats 
*vacrelstats,
         * the last page.  This is worth avoiding mainly because such a lock 
must
         * be replayed on any hot standby, where it can be disruptive.
         */
-       next_unskippable_block = 0;
+       next_unskippable_block = begin;
        if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
        {
-               while (next_unskippable_block < nblocks)
+               while (next_unskippable_block < end)
                {
                        uint8           vmstatus;
 
@@ -574,7 +695,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats 
*vacrelstats,
        else
                skipping_blocks = false;
 
-       for (blkno = 0; blkno < nblocks; blkno++)
+       for (blkno = begin; blkno < end; blkno++)
        {
                Buffer          buf;
                Page            page;
@@ -1306,10 +1427,6 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats 
*vacrelstats,
        pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
                                                                 
PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
-       /* Do post-vacuum cleanup and statistics update for each index */
-       for (i = 0; i < nindexes; i++)
-               lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
-
        /* If no indexes, make log report that lazy_vacuum_heap would've made */
        if (vacuumed_pages)
                ereport(elevel,
@@ -1317,6 +1434,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats 
*vacrelstats,
                                                RelationGetRelationName(onerel),
                                                tups_vacuumed, 
vacuumed_pages)));
 
+       /* Do post-vacuum cleanup and statistics update for each index */
+       if (!IsParallelWorker())
+       {
+               for (i = 0; i < nindexes; i++)
+                       lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+       }
+
        /*
         * This is pretty messy, but we split it up so that we can skip emitting
         * individual parts of the message when not applicable.
@@ -1347,6 +1471,29 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats 
*vacrelstats,
        pfree(buf.data);
 }
 
+/*
+ * gather_vacuum_stats() -- Gather vacuum statistics from workers
+ */
+static void
+gather_vacuum_stats(LVRelStats *vacrelstats, LVRelStats *worker_stats, int 
wnum)
+{
+       int     i;
+
+       /* Gather each worker stats */
+       for (i = 0; i < wnum; i++)
+       {
+               LVRelStats *wstats = worker_stats + sizeof(LVRelStats) * i;
+
+               vacrelstats->rel_pages += wstats->rel_pages;
+               vacrelstats->scanned_pages += wstats->scanned_pages;
+               vacrelstats->pinskipped_pages += wstats->pinskipped_pages;
+               vacrelstats->frozenskipped_pages += wstats->frozenskipped_pages;
+               vacrelstats->scanned_tuples += wstats->scanned_tuples;
+               vacrelstats->new_rel_tuples += wstats->new_rel_tuples;
+               vacrelstats->pages_removed += wstats->pages_removed;
+               vacrelstats->nonempty_pages += wstats->nonempty_pages;
+       }
+}
 
 /*
  *     lazy_vacuum_heap() -- second pass over the heap
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index c5178f7..0dd64bc 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2661,6 +2661,16 @@ static struct config_int ConfigureNamesInt[] =
        },
 
        {
+               {"parallel_vacuum_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+                       gettext_noop("Sets the number of parallel worker for 
vacuum."),
+                       NULL
+               },
+               &parallel_vacuum_workers,
+               1, 1, 1024,
+               NULL, NULL, NULL
+       },
+
+       {
                {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
                        gettext_noop("Sets the maximum memory to be used by 
each autovacuum worker process."),
                        NULL,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 80cd4a8..fc46c09 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -147,6 +147,45 @@ typedef struct VacuumParams
                                                                                
 * activated, -1 to use default */
 } VacuumParams;
 
+typedef struct LVRelStats
+{
+       /* hasindex = true means two-pass strategy; false means one-pass */
+       bool            hasindex;
+       /* Overall statistics about rel */
+       BlockNumber old_rel_pages;      /* previous value of pg_class.relpages 
*/
+       BlockNumber rel_pages;          /* total number of pages */
+       BlockNumber scanned_pages;      /* number of pages we examined */
+       BlockNumber pinskipped_pages;           /* # of pages we skipped due to 
a pin */
+       BlockNumber frozenskipped_pages;        /* # of frozen pages we skipped 
*/
+       double          scanned_tuples; /* counts only tuples on scanned pages 
*/
+       double          old_rel_tuples; /* previous value of pg_class.reltuples 
*/
+       double          new_rel_tuples; /* new estimated total # of tuples */
+       double          new_dead_tuples;        /* new estimated total # of 
dead tuples */
+       BlockNumber pages_removed;
+       double          tuples_deleted;
+       BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+       /* List of TIDs of tuples we intend to delete */
+       /* NB: this list is ordered by TID address */
+       int                     num_dead_tuples;        /* current # of entries 
*/
+       int                     max_dead_tuples;        /* # slots allocated in 
array */
+       ItemPointer dead_tuples;        /* array of ItemPointerData */
+       int                     num_index_scans;
+       TransactionId latestRemovedXid;
+       bool            lock_waiter_detected;
+} LVRelStats;
+
+typedef struct VacuumTask
+{
+       Oid                     relid;  /* Target relation oid */
+       bool            aggressive;     /* does each worker need to aggressive 
vacuum? */
+       int                     options; /* Specified vacuum options */
+       TransactionId   oldestxmin;
+       TransactionId   freezelimit;
+       MultiXactId             multixactcutoff;
+       int                     wnum;
+       int                     elevel;
+} VacuumTask;
+
 /* GUC parameters */
 extern PGDLLIMPORT int default_statistics_target;              /* PGDLLIMPORT 
for
                                                                                
                                 * PostGIS */
@@ -154,7 +193,7 @@ extern int  vacuum_freeze_min_age;
 extern int     vacuum_freeze_table_age;
 extern int     vacuum_multixact_freeze_min_age;
 extern int     vacuum_multixact_freeze_table_age;
-
+extern int     parallel_vacuum_workers;
 
 /* in commands/vacuum.c */
 extern void ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel);
diff --git a/src/include/storage/buf_internals.h 
b/src/include/storage/buf_internals.h
index 90fcbd7..4f9d986 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -15,6 +15,7 @@
 #ifndef BUFMGR_INTERNALS_H
 #define BUFMGR_INTERNALS_H
 
+#include "lib/ilist.h"
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
 #include "storage/latch.h"
-- 
2.8.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to