Hi all,

The parallel vacuum we have today supports only index vacuuming.
Therefore, while multiple workers can work on different indexes in
parallel, the heap table is always processed by a single process.
I'd like to propose $subject, which enables us to have multiple
workers running on a single heap table. This would help speed up
vacuuming for tables without indexes or tables with
INDEX_CLEANUP = off.

I've attached a PoC patch for this feature. It implements only
parallel heap scans in lazy vacuum. We can extend this feature to
support parallel heap vacuum as well, either in the future or in the
same patch.

# Overall idea (for parallel heap scan in lazy vacuum)

At the beginning of vacuum, we determine how many workers to launch
based on the table size, like other parallel query operations. The
number of workers is capped by max_parallel_maintenance_workers. Once
we decide to use parallel heap scan, we prepare DSM to share data
among the parallel workers and the leader. The shared information
includes at least the vacuum options such as aggressive, the counters
collected during lazy vacuum such as scanned_pages, the vacuum cutoffs
such as VacuumCutoffs and GlobalVisState, and the parallel scan
description.
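
To make the sizing step concrete, here is a minimal standalone sketch of
a size-based worker count. It assumes the same kind of tripling
heuristic that parallel table scans use (one more worker each time the
table size triples past a minimum threshold). The function name and
parameters are hypothetical, and the callback in the patch may compute
the number differently.

```c
#include <stdint.h>

/*
 * Hypothetical helper, not taken from the patch: choose a worker count
 * from the heap size in blocks.  The first worker is granted once the
 * table reaches min_scan_blocks, and one more is added each time the
 * size triples.  The result is capped by max_workers, which stands in
 * for max_parallel_maintenance_workers.
 */
static int
sketch_compute_workers(uint32_t heap_blocks, uint32_t min_scan_blocks,
					   int max_workers)
{
	/* 64-bit threshold so that tripling cannot overflow */
	uint64_t	threshold = (min_scan_blocks > 0) ? min_scan_blocks : 1;
	int			workers;

	/* Too small to be worth a parallel scan at all */
	if (heap_blocks < threshold)
		return 0;

	workers = 1;
	while (heap_blocks >= threshold * 3)
	{
		workers++;
		threshold *= 3;
	}

	return (workers < max_workers) ? workers : max_workers;
}
```

With a threshold of 1024 blocks (8MB of 8kB pages), a 20GB table of
2621440 blocks would qualify for 8 workers under this heuristic, so the
cap is what actually limits the parallel degree for large tables.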

Before starting the heap scan in lazy vacuum, we launch parallel
workers, and then each worker (and the leader) processes different
blocks. Each worker does HOT-pruning on pages and collects dead tuple
TIDs. When adding dead tuple TIDs, workers need to hold an exclusive
lock on the TidStore. At the end of the heap scan phase, the workers
exit and the leader waits for all of them to exit. After that, the
leader process gathers the counters collected by the parallel workers
and computes the oldest relfrozenxid (and relminmxid). Then, if
parallel index vacuum is also enabled, we launch other parallel workers
for parallel index vacuuming.
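
The gathering step at the end of the heap scan can be sketched as
follows. This is a simplified standalone illustration, not the code in
the patch: the struct is a cut-down stand-in for the per-worker
counters, all names are hypothetical, and the XID comparison ignores
the special (non-normal) XIDs that the real comparison has to handle.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t SketchXid;

/* Simplified stand-in for the per-worker counters kept in DSM */
typedef struct SketchCounters
{
	uint32_t	scanned_pages;
	int64_t		tuples_deleted;
	SketchXid	new_relfrozenxid;
	bool		skippedallvis;
} SketchCounters;

/* Wraparound-aware "a is older than b", comparing modulo 2^32 */
static bool
sketch_xid_precedes(SketchXid a, SketchXid b)
{
	return (int32_t) (a - b) < 0;
}

/*
 * Fold every worker's counters into the leader's: plain counters are
 * summed, the oldest XID wins, and skippedallvis is sticky.
 */
static void
sketch_gather(SketchCounters *leader, const SketchCounters *workers,
			  int nworkers)
{
	for (int i = 0; i < nworkers; i++)
	{
		leader->scanned_pages += workers[i].scanned_pages;
		leader->tuples_deleted += workers[i].tuples_deleted;

		if (sketch_xid_precedes(workers[i].new_relfrozenxid,
								leader->new_relfrozenxid))
			leader->new_relfrozenxid = workers[i].new_relfrozenxid;

		leader->skippedallvis |= workers[i].skippedallvis;
	}
}
```

Because each worker writes only to its own slot and the leader reads
the slots after all workers have exited, this step would not need any
locking.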

When it comes to parallel heap scan in lazy vacuum, I think we can use
the table_block_parallelscan_XXX() family. One tricky thing we need to
deal with is that if the TidStore memory usage reaches the limit, we
stop the parallel scan, do index vacuum and table vacuum, and then
resume the parallel scan from the previous state. In order to do that,
in the patch, we store ParallelBlockTableScanWorker, per-worker
parallel scan state, into DSM so that different parallel workers can
resume the scan using the same parallel scan state.

In addition to that, since we could end up launching fewer workers
than requested, it could happen that some ParallelBlockTableScanWorker
data is used once and never used again, leaving some of its blocks
unprocessed. To handle this case, in the patch, the leader process
checks at the end of the parallel scan whether there is an uncompleted
parallel scan. If so, the leader process does the scan using the
worker's ParallelBlockTableScanWorker data on behalf of the workers.
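
The pause/resume behavior and the leader's cleanup pass can be
illustrated with a small single-process simulation. Everything here is
hypothetical: the fixed chunk size, the struct layouts and the function
names are assumptions for illustration, and a real shared descriptor
would hand out chunks atomically rather than with a plain counter.

```c
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_CHUNK_SIZE	4
#define SKETCH_NO_BLOCK		UINT32_MAX

/* Shared scan descriptor: hands out chunks of blocks */
typedef struct SketchScanDesc
{
	uint32_t	nblocks;
	uint32_t	next_chunk_start;
} SketchScanDesc;

/* Per-worker scan state, kept in shared memory so anyone can resume it */
typedef struct SketchWorkerScan
{
	uint32_t	next;			/* next block in the current chunk */
	uint32_t	chunk_end;		/* one past the last block of the chunk */
	bool		maybe_have_blocks;
} SketchWorkerScan;

/* Return the next block for this state, or SKETCH_NO_BLOCK when done */
static uint32_t
sketch_next_block(SketchScanDesc *desc, SketchWorkerScan *ws)
{
	if (ws->next >= ws->chunk_end)
	{
		if (desc->next_chunk_start >= desc->nblocks)
		{
			ws->maybe_have_blocks = false;
			return SKETCH_NO_BLOCK;
		}

		/* Claim a new chunk, clamped to the end of the table */
		ws->next = desc->next_chunk_start;
		ws->chunk_end = ws->next + SKETCH_CHUNK_SIZE;
		if (ws->chunk_end > desc->nblocks)
			ws->chunk_end = desc->nblocks;
		desc->next_chunk_start = ws->chunk_end;
	}

	ws->maybe_have_blocks = true;
	return ws->next++;
}

/* Leader pass: drain every state that may still hold unscanned blocks */
static uint32_t
sketch_complete_unfinished(SketchScanDesc *desc, SketchWorkerScan *states,
						   int nstates, bool *seen)
{
	uint32_t	done = 0;

	for (int i = 0; i < nstates; i++)
	{
		uint32_t	blk;

		if (!states[i].maybe_have_blocks)
			continue;

		while ((blk = sketch_next_block(desc, &states[i])) != SKETCH_NO_BLOCK)
		{
			seen[blk] = true;
			done++;
		}
	}

	return done;
}
```

If two states each claim a chunk and the scan is paused, and only one
worker is launched after the pause, the other state's claimed chunk
stays half-scanned until the leader drains it in the final pass.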

# Discussions

I'm reasonably comfortable with the high-level design of this feature,
but there are some points regarding the implementation we need to
discuss.

In the patch, I extended vacuumparallel.c to support parallel table
scan (and vacuum in the future). To do so, I had to add some table AM
callbacks for DSM size estimation, DSM initialization, the actual
table scan, and so on. We need to verify these APIs are appropriate.
Specifically, if we want to support both parallel heap scan and
parallel heap vacuum, do we want to add separate callbacks for them?
It could be overkill since such a 2-pass vacuum strategy is specific
to heap AM.

As another implementation idea, we might want to implement parallel
heap scan/vacuum in vacuumlazy.c while minimizing changes to
vacuumparallel.c. That way, we would not need to add table AM
callbacks. However, we would end up duplicating code related to
parallel operation in vacuum, such as vacuum delays.

Also, we might need to add some functions to share GlobalVisState
among parallel workers, since GlobalVisState is a private struct.

Other points I'm somewhat uncomfortable with or need to be discussed
remain in the code with XXX comments.

# Benchmark results

* Test-1: parallel heap scan on the table without indexes

I created a 20GB table, made garbage on the table, and ran vacuum while
changing the parallel degree:

create unlogged table test (a int) with (autovacuum_enabled = off);
insert into test select generate_series(1, 600000000); --- 20GB table
delete from test where a % 5 = 0;
vacuum (verbose, parallel 0) test;

Here are the results (total time and heap scan time):

PARALLEL 0: 21.99 s (single process)
PARALLEL 1: 11.39 s
PARALLEL 2: 8.36 s
PARALLEL 3: 6.14 s
PARALLEL 4: 5.08 s

* Test-2: parallel heap scan on the table with one index

I used a table similar to the one in test-1 but created one btree index on it:

create unlogged table test (a int) with (autovacuum_enabled = off);
insert into test select generate_series(1, 600000000); --- 20GB table
create index on test (a);
delete from test where a % 5 = 0;
vacuum (verbose, parallel 0) test;

I've measured the total execution time as well as the time of each
vacuum phase (from left to right: heap scan time, index vacuum time,
and heap vacuum time):

PARALLEL 0: 45.11 s (21.89, 16.74, 6.48)
PARALLEL 1: 42.13 s (12.75, 22.04, 7.23)
PARALLEL 2: 39.27 s (8.93, 22.78, 7.45)
PARALLEL 3: 36.53 s (6.76, 22.00, 7.65)
PARALLEL 4: 35.84 s (5.85, 22.04, 7.83)

Overall, I can see that the parallel heap scan in lazy vacuum scales
decently; in both test-1 and test-2, the heap scan got ~4x faster with
4 parallel workers. On the other hand, when it comes to the total
vacuum execution time, I could not see much performance improvement in
test-2 (45.11 vs. 35.84). Looking at the results of PARALLEL 0 vs.
PARALLEL 1 in test-2, the heap scan got faster (21.89 vs. 12.75)
whereas index vacuum got slower (16.74 vs. 22.04), and the heap scan
in test-2 was not as fast as in test-1 with 1 parallel worker (12.75
vs. 11.39).
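
The arithmetic behind these comparisons is simple enough to write down.
The helpers below are a hypothetical sketch for reading the numbers
above, not part of the patch, and the projection assumes (ideally) that
the phases other than the heap scan keep their serial cost.

```c
/* Speedup of a phase: serial time divided by parallel time */
static double
sketch_speedup(double serial_s, double parallel_s)
{
	return serial_s / parallel_s;
}

/*
 * Projected total time if only the heap scan phase gets faster and the
 * remaining phases keep their serial cost (an idealized assumption).
 */
static double
sketch_projected_total(double scan_s, double other_s, double scan_speedup)
{
	return scan_s / scan_speedup + other_s;
}
```

For test-2 with PARALLEL 4, the heap scan speedup is 21.89 / 5.85, or
about 3.74x, which would project a total of 5.85 + 16.74 + 6.48 = 29.07 s.
The measured total is 35.84 s; about 6.65 s of that gap comes from index
vacuum (22.04 vs. 16.74) and heap vacuum (7.83 vs. 6.48) being slower
than in the serial run.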

I think the reason is that the shared TidStore is not very scalable
since we have a single lock on it. In all cases in test-1, we don't use
the shared TidStore since all dead tuples are removed during heap
pruning. So the scalability was better overall than in test-2. In the
PARALLEL 0 case in test-2, we use the local TidStore, and from a
parallel degree of 1 in test-2, we use the shared TidStore and the
parallel workers concurrently update it. Also, I guess that the lookup
performance of the local TidStore is better than the shared TidStore's
lookup performance because of the differences between a bump context
and a DSA area. I think that this difference contributed to the fact
that index vacuuming got slower (16.74 vs. 22.04).

There are two obvious ideas to improve the overall vacuum execution
time: (1) improve the shared TidStore scalability and (2) support
parallel heap vacuum. For (1), several ideas are proposed by the ART
authors[1]. I've not tried these ideas, but they might be applicable to
our ART implementation. But I prefer to start with (2) since it would
be easier. Feedback is very welcome.

Regards,

[1] https://db.in.tum.de/~leis/papers/artsync.pdf

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b7929..cf8c6614cd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2630,6 +2630,12 @@ static const TableAmRoutine heapam_methods = {
 	.relation_copy_data = heapam_relation_copy_data,
 	.relation_copy_for_cluster = heapam_relation_copy_for_cluster,
 	.relation_vacuum = heap_vacuum_rel,
+
+	.parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers,
+	.parallel_vacuum_estimate = heap_parallel_vacuum_estimate,
+	.parallel_vacuum_initialize = heap_parallel_vacuum_initialize,
+	.parallel_vacuum_scan_worker = heap_parallel_vacuum_scan_worker,
+
 	.scan_analyze_next_block = heapam_scan_analyze_next_block,
 	.scan_analyze_next_tuple = heapam_scan_analyze_next_tuple,
 	.index_build_range_scan = heapam_index_build_range_scan,
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 3f88cf1e8e..4ccf15ffe3 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -49,6 +49,7 @@
 #include "common/int.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
+#include "optimizer/paths.h"
 #include "pgstat.h"
 #include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
@@ -117,10 +118,22 @@
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
 /*
- * Macro to check if we are in a parallel vacuum.  If true, we are in the
- * parallel mode and the DSM segment is initialized.
+ * DSM keys for heap parallel vacuum scan. Unlike other parallel execution code, we
+ * don't need to worry about DSM keys conflicting with plan_node_id, but need to
+ * avoid conflicting with DSM keys used in vacuumparallel.c.
+ */
+#define LV_PARALLEL_SCAN_SHARED			0xFFFF0001
+#define LV_PARALLEL_SCAN_DESC			0xFFFF0002
+#define LV_PARALLEL_SCAN_DESC_WORKER	0xFFFF0003
+
+/*
+ * Macro to check if we are in a parallel vacuum.  If ParallelVacuumIsActive() is
+ * true, we are in the parallel mode, meaning that we do either parallel index
+ * vacuuming or parallel table vacuuming, or both. If ParallelHeapVacuumIsActive()
+ * is true, we do at least parallel table vacuuming.
  */
 #define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
+#define ParallelHeapVacuumIsActive(vacrel) ((vacrel)->phvstate != NULL)
 
 /* Phases of vacuum during which we report error context. */
 typedef enum
@@ -133,6 +146,80 @@ typedef enum
 	VACUUM_ERRCB_PHASE_TRUNCATE,
 } VacErrPhase;
 
+/*
+ * Relation statistics collected during heap scanning that need to be shared among
+ * parallel vacuum workers.
+ */
+typedef struct LVRelCounters
+{
+	BlockNumber scanned_pages;	/* # pages examined (not skipped via VM) */
+	BlockNumber removed_pages;	/* # pages removed by relation truncation */
+	BlockNumber frozen_pages;	/* # pages with newly frozen tuples */
+	BlockNumber lpdead_item_pages;	/* # pages with LP_DEAD items */
+	BlockNumber missed_dead_pages;	/* # pages with missed dead tuples */
+	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+
+	/* Counters that follow are only for scanned_pages */
+	int64		tuples_deleted; /* # deleted from table */
+	int64		tuples_frozen;	/* # newly frozen */
+	int64		lpdead_items;	/* # deleted from indexes */
+	int64		live_tuples;	/* # live tuples remaining */
+	int64		recently_dead_tuples;	/* # dead, but not yet removable */
+	int64		missed_dead_tuples; /* # removable, but not removed */
+
+	/* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid. */
+	TransactionId NewRelfrozenXid;
+	MultiXactId NewRelminMxid;
+	bool		skippedallvis;
+} LVRelCounters;
+
+/*
+ * Struct for information that needs to be shared among parallel vacuum workers
+ */
+typedef struct PHVShared
+{
+	bool	aggressive;
+	bool	skipwithvm;
+
+	/* The initial values shared by the leader process */
+	TransactionId NewRelfrozenXid;
+	MultiXactId NewRelminMxid;
+	bool		skippedallvis;
+
+	/* VACUUM operation's cutoffs for freezing and pruning */
+	struct VacuumCutoffs cutoffs;
+	GlobalVisState 	vistest;
+
+	LVRelCounters	worker_relcnts[FLEXIBLE_ARRAY_MEMBER];
+} PHVShared;
+#define SizeOfPHVShared (offsetof(PHVShared, worker_relcnts))
+
+/* Per-worker scan state */
+typedef struct PHVScanWorkerState
+{
+	ParallelBlockTableScanWorkerData	state;
+	bool	maybe_have_blocks;
+} PHVScanWorkerState;
+
+/* Struct for parallel heap vacuum */
+typedef struct PHVState
+{
+	/* Parallel scan description shared among parallel workers */
+	ParallelBlockTableScanDesc		pscandesc;
+
+	/* Shared information */
+	PHVShared			*shared;
+
+	/* Per-worker scan state */
+	PHVScanWorkerState	*myscanstate;
+
+	/* Points to all per-worker scan state array */
+	PHVScanWorkerState *scanstates;
+
+	/* The number of workers launched for parallel heap vacuum */
+	int		nworkers_launched;
+} PHVState;
+
 typedef struct LVRelState
 {
 	/* Target heap relation and its indexes */
@@ -144,6 +231,12 @@ typedef struct LVRelState
 	BufferAccessStrategy bstrategy;
 	ParallelVacuumState *pvs;
 
+	/* Parallel heap vacuum state and sizes for each struct */
+	PHVState	*phvstate;
+	Size		pscan_len;
+	Size		shared_len;
+	Size		pscanwork_len;
+
 	/* Aggressive VACUUM? (must set relfrozenxid >= FreezeLimit) */
 	bool		aggressive;
 	/* Use visibility map to skip? (disabled by DISABLE_PAGE_SKIPPING) */
@@ -159,10 +252,6 @@ typedef struct LVRelState
 	/* VACUUM operation's cutoffs for freezing and pruning */
 	struct VacuumCutoffs cutoffs;
 	GlobalVisState *vistest;
-	/* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid */
-	TransactionId NewRelfrozenXid;
-	MultiXactId NewRelminMxid;
-	bool		skippedallvis;
 
 	/* Error reporting state */
 	char	   *dbname;
@@ -188,12 +277,10 @@ typedef struct LVRelState
 	VacDeadItemsInfo *dead_items_info;
 
 	BlockNumber rel_pages;		/* total number of pages */
-	BlockNumber scanned_pages;	/* # pages examined (not skipped via VM) */
-	BlockNumber removed_pages;	/* # pages removed by relation truncation */
-	BlockNumber frozen_pages;	/* # pages with newly frozen tuples */
-	BlockNumber lpdead_item_pages;	/* # pages with LP_DEAD items */
-	BlockNumber missed_dead_pages;	/* # pages with missed dead tuples */
-	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+	BlockNumber next_fsm_block_to_vacuum;
+
+	/* Block and tuple counters for the relation */
+	LVRelCounters	*counters;
 
 	/* Statistics output by us, for table */
 	double		new_rel_tuples; /* new estimated total # of tuples */
@@ -203,13 +290,6 @@ typedef struct LVRelState
 
 	/* Instrumentation counters */
 	int			num_index_scans;
-	/* Counters that follow are only for scanned_pages */
-	int64		tuples_deleted; /* # deleted from table */
-	int64		tuples_frozen;	/* # newly frozen */
-	int64		lpdead_items;	/* # deleted from indexes */
-	int64		live_tuples;	/* # live tuples remaining */
-	int64		recently_dead_tuples;	/* # dead, but not yet removable */
-	int64		missed_dead_tuples; /* # removable, but not removed */
 
 	/* State maintained by heap_vac_scan_next_block() */
 	BlockNumber current_block;	/* last block returned */
@@ -229,6 +309,7 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
+static bool do_lazy_scan_heap(LVRelState *vacrel);
 static bool heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 									 bool *all_visible_according_to_vm);
 static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis);
@@ -271,6 +352,12 @@ static void dead_items_cleanup(LVRelState *vacrel);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
 static void update_relstats_all_indexes(LVRelState *vacrel);
+
+
+static void	do_parallel_lazy_scan_heap(LVRelState *vacrel);
+static void parallel_heap_vacuum_gather_scan_stats(LVRelState *vacrel);
+static void parallel_heap_complete_unfinised_scan(LVRelState *vacrel);
+
 static void vacuum_error_callback(void *arg);
 static void update_vacuum_error_info(LVRelState *vacrel,
 									 LVSavedErrInfo *saved_vacrel,
@@ -296,6 +383,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 				BufferAccessStrategy bstrategy)
 {
 	LVRelState *vacrel;
+	LVRelCounters *counters;
 	bool		verbose,
 				instrument,
 				skipwithvm,
@@ -406,14 +494,28 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 		Assert(params->index_cleanup == VACOPTVALUE_AUTO);
 	}
 
+	vacrel->next_fsm_block_to_vacuum = 0;
+
 	/* Initialize page counters explicitly (be tidy) */
-	vacrel->scanned_pages = 0;
-	vacrel->removed_pages = 0;
-	vacrel->frozen_pages = 0;
-	vacrel->lpdead_item_pages = 0;
-	vacrel->missed_dead_pages = 0;
-	vacrel->nonempty_pages = 0;
-	/* dead_items_alloc allocates vacrel->dead_items later on */
+	counters = palloc(sizeof(LVRelCounters));
+	counters->scanned_pages = 0;
+	counters->removed_pages = 0;
+	counters->frozen_pages = 0;
+	counters->lpdead_item_pages = 0;
+	counters->missed_dead_pages = 0;
+	counters->nonempty_pages = 0;
+
+	/* Initialize remaining counters (be tidy) */
+	counters->tuples_deleted = 0;
+	counters->tuples_frozen = 0;
+	counters->lpdead_items = 0;
+	counters->live_tuples = 0;
+	counters->recently_dead_tuples = 0;
+	counters->missed_dead_tuples = 0;
+
+	vacrel->counters = counters;
+
+	vacrel->num_index_scans = 0;
 
 	/* Allocate/initialize output statistics state */
 	vacrel->new_rel_tuples = 0;
@@ -421,14 +523,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	vacrel->indstats = (IndexBulkDeleteResult **)
 		palloc0(vacrel->nindexes * sizeof(IndexBulkDeleteResult *));
 
-	/* Initialize remaining counters (be tidy) */
-	vacrel->num_index_scans = 0;
-	vacrel->tuples_deleted = 0;
-	vacrel->tuples_frozen = 0;
-	vacrel->lpdead_items = 0;
-	vacrel->live_tuples = 0;
-	vacrel->recently_dead_tuples = 0;
-	vacrel->missed_dead_tuples = 0;
+	/* dead_items_alloc allocates vacrel->dead_items later on */
 
 	/*
 	 * Get cutoffs that determine which deleted tuples are considered DEAD,
@@ -450,9 +545,9 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	vacrel->rel_pages = orig_rel_pages = RelationGetNumberOfBlocks(rel);
 	vacrel->vistest = GlobalVisTestFor(rel);
 	/* Initialize state used to track oldest extant XID/MXID */
-	vacrel->NewRelfrozenXid = vacrel->cutoffs.OldestXmin;
-	vacrel->NewRelminMxid = vacrel->cutoffs.OldestMxact;
-	vacrel->skippedallvis = false;
+	vacrel->counters->NewRelfrozenXid = vacrel->cutoffs.OldestXmin;
+	vacrel->counters->NewRelminMxid = vacrel->cutoffs.OldestMxact;
+	vacrel->counters->skippedallvis = false;
 	skipwithvm = true;
 	if (params->options & VACOPT_DISABLE_PAGE_SKIPPING)
 	{
@@ -533,15 +628,15 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	 * value >= FreezeLimit, and relminmxid to a value >= MultiXactCutoff.
 	 * Non-aggressive VACUUMs may advance them by any amount, or not at all.
 	 */
-	Assert(vacrel->NewRelfrozenXid == vacrel->cutoffs.OldestXmin ||
+	Assert(vacrel->counters->NewRelfrozenXid == vacrel->cutoffs.OldestXmin ||
 		   TransactionIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.FreezeLimit :
 										 vacrel->cutoffs.relfrozenxid,
-										 vacrel->NewRelfrozenXid));
-	Assert(vacrel->NewRelminMxid == vacrel->cutoffs.OldestMxact ||
+										 vacrel->counters->NewRelfrozenXid));
+	Assert(vacrel->counters->NewRelminMxid == vacrel->cutoffs.OldestMxact ||
 		   MultiXactIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.MultiXactCutoff :
 									   vacrel->cutoffs.relminmxid,
-									   vacrel->NewRelminMxid));
-	if (vacrel->skippedallvis)
+									   vacrel->counters->NewRelminMxid));
+	if (vacrel->counters->skippedallvis)
 	{
 		/*
 		 * Must keep original relfrozenxid in a non-aggressive VACUUM that
@@ -549,8 +644,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 		 * values will have missed unfrozen XIDs from the pages we skipped.
 		 */
 		Assert(!vacrel->aggressive);
-		vacrel->NewRelfrozenXid = InvalidTransactionId;
-		vacrel->NewRelminMxid = InvalidMultiXactId;
+		vacrel->counters->NewRelfrozenXid = InvalidTransactionId;
+		vacrel->counters->NewRelminMxid = InvalidMultiXactId;
 	}
 
 	/*
@@ -571,7 +666,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	 */
 	vac_update_relstats(rel, new_rel_pages, vacrel->new_live_tuples,
 						new_rel_allvisible, vacrel->nindexes > 0,
-						vacrel->NewRelfrozenXid, vacrel->NewRelminMxid,
+						vacrel->counters->NewRelfrozenXid, vacrel->counters->NewRelminMxid,
 						&frozenxid_updated, &minmulti_updated, false);
 
 	/*
@@ -587,8 +682,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	pgstat_report_vacuum(RelationGetRelid(rel),
 						 rel->rd_rel->relisshared,
 						 Max(vacrel->new_live_tuples, 0),
-						 vacrel->recently_dead_tuples +
-						 vacrel->missed_dead_tuples);
+						 vacrel->counters->recently_dead_tuples +
+						 vacrel->counters->missed_dead_tuples);
 	pgstat_progress_end_command();
 
 	if (instrument)
@@ -651,21 +746,21 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 							 vacrel->relname,
 							 vacrel->num_index_scans);
 			appendStringInfo(&buf, _("pages: %u removed, %u remain, %u scanned (%.2f%% of total)\n"),
-							 vacrel->removed_pages,
+							 vacrel->counters->removed_pages,
 							 new_rel_pages,
-							 vacrel->scanned_pages,
+							 vacrel->counters->scanned_pages,
 							 orig_rel_pages == 0 ? 100.0 :
-							 100.0 * vacrel->scanned_pages / orig_rel_pages);
+							 100.0 * vacrel->counters->scanned_pages / orig_rel_pages);
 			appendStringInfo(&buf,
 							 _("tuples: %lld removed, %lld remain, %lld are dead but not yet removable\n"),
-							 (long long) vacrel->tuples_deleted,
+							 (long long) vacrel->counters->tuples_deleted,
 							 (long long) vacrel->new_rel_tuples,
-							 (long long) vacrel->recently_dead_tuples);
-			if (vacrel->missed_dead_tuples > 0)
+							 (long long) vacrel->counters->recently_dead_tuples);
+			if (vacrel->counters->missed_dead_tuples > 0)
 				appendStringInfo(&buf,
 								 _("tuples missed: %lld dead from %u pages not removed due to cleanup lock contention\n"),
-								 (long long) vacrel->missed_dead_tuples,
-								 vacrel->missed_dead_pages);
+								 (long long) vacrel->counters->missed_dead_tuples,
+								 vacrel->counters->missed_dead_pages);
 			diff = (int32) (ReadNextTransactionId() -
 							vacrel->cutoffs.OldestXmin);
 			appendStringInfo(&buf,
@@ -673,25 +768,25 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 							 vacrel->cutoffs.OldestXmin, diff);
 			if (frozenxid_updated)
 			{
-				diff = (int32) (vacrel->NewRelfrozenXid -
+				diff = (int32) (vacrel->counters->NewRelfrozenXid -
 								vacrel->cutoffs.relfrozenxid);
 				appendStringInfo(&buf,
 								 _("new relfrozenxid: %u, which is %d XIDs ahead of previous value\n"),
-								 vacrel->NewRelfrozenXid, diff);
+								 vacrel->counters->NewRelfrozenXid, diff);
 			}
 			if (minmulti_updated)
 			{
-				diff = (int32) (vacrel->NewRelminMxid -
+				diff = (int32) (vacrel->counters->NewRelminMxid -
 								vacrel->cutoffs.relminmxid);
 				appendStringInfo(&buf,
 								 _("new relminmxid: %u, which is %d MXIDs ahead of previous value\n"),
-								 vacrel->NewRelminMxid, diff);
+								 vacrel->counters->NewRelminMxid, diff);
 			}
 			appendStringInfo(&buf, _("frozen: %u pages from table (%.2f%% of total) had %lld tuples frozen\n"),
-							 vacrel->frozen_pages,
+							 vacrel->counters->frozen_pages,
 							 orig_rel_pages == 0 ? 100.0 :
-							 100.0 * vacrel->frozen_pages / orig_rel_pages,
-							 (long long) vacrel->tuples_frozen);
+							 100.0 * vacrel->counters->frozen_pages / orig_rel_pages,
+							 (long long) vacrel->counters->tuples_frozen);
 			if (vacrel->do_index_vacuuming)
 			{
 				if (vacrel->nindexes == 0 || vacrel->num_index_scans == 0)
@@ -711,10 +806,10 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 				msgfmt = _("%u pages from table (%.2f%% of total) have %lld dead item identifiers\n");
 			}
 			appendStringInfo(&buf, msgfmt,
-							 vacrel->lpdead_item_pages,
+							 vacrel->counters->lpdead_item_pages,
 							 orig_rel_pages == 0 ? 100.0 :
-							 100.0 * vacrel->lpdead_item_pages / orig_rel_pages,
-							 (long long) vacrel->lpdead_items);
+							 100.0 * vacrel->counters->lpdead_item_pages / orig_rel_pages,
+							 (long long) vacrel->counters->lpdead_items);
 			for (int i = 0; i < vacrel->nindexes; i++)
 			{
 				IndexBulkDeleteResult *istat = vacrel->indstats[i];
@@ -815,14 +910,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel)
 {
-	BlockNumber rel_pages = vacrel->rel_pages,
-				blkno,
-				next_fsm_block_to_vacuum = 0;
-	bool		all_visible_according_to_vm;
-
-	TidStore   *dead_items = vacrel->dead_items;
+	BlockNumber rel_pages = vacrel->rel_pages;
 	VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info;
-	Buffer		vmbuffer = InvalidBuffer;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -842,14 +931,78 @@ lazy_scan_heap(LVRelState *vacrel)
 	vacrel->next_unskippable_allvis = false;
 	vacrel->next_unskippable_vmbuffer = InvalidBuffer;
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		do_parallel_lazy_scan_heap(vacrel);
+	else
+		do_lazy_scan_heap(vacrel);
+
+	vacrel->blkno = InvalidBlockNumber;
+
+	/* report that everything is now scanned */
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, rel_pages);
+
+	/* now we can compute the new value for pg_class.reltuples */
+	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
+													 vacrel->counters->scanned_pages,
+													 vacrel->counters->live_tuples);
+
+	/*
+	 * Also compute the total number of surviving heap entries.  In the
+	 * (unlikely) scenario that new_live_tuples is -1, take it as zero.
+	 */
+	vacrel->new_rel_tuples =
+		Max(vacrel->new_live_tuples, 0) + vacrel->counters->recently_dead_tuples +
+		vacrel->counters->missed_dead_tuples;
+
+	/*
+	 * Do index vacuuming (call each index's ambulkdelete routine), then do
+	 * related heap vacuuming
+	 */
+	if (dead_items_info->num_items > 0)
+		lazy_vacuum(vacrel);
+
+	/*
+	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
+	 * not there were indexes, and whether or not we bypassed index vacuuming.
+	 */
+	if (rel_pages > vacrel->next_fsm_block_to_vacuum)
+		FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
+								rel_pages);
+
+	/* report all blocks vacuumed */
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
+
+	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
+	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
+		lazy_cleanup_all_indexes(vacrel);
+}
+
+/*
+ * Workhorse for lazy_scan_heap().
+ *
+ * Returns true if all blocks were processed, or false if we exit from this function
+ * before completing the heap scan because the dead item TID store is full. In the serial
+ * heap scan case, this function always returns true. In a parallel heap vacuum scan, this
+ * function is called by both the worker processes and the leader process, and could return false.
+ */
+static bool
+do_lazy_scan_heap(LVRelState *vacrel)
+{
+	bool	all_visible_according_to_vm;
+	TidStore   *dead_items = vacrel->dead_items;
+	VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info;
+	BlockNumber	blkno;
+	Buffer	vmbuffer = InvalidBuffer;
+	bool	scan_done = true;
+
 	while (heap_vac_scan_next_block(vacrel, &blkno, &all_visible_according_to_vm))
 	{
-		Buffer		buf;
-		Page		page;
-		bool		has_lpdead_items;
-		bool		got_cleanup_lock = false;
+		Buffer	buf;
+		Page	page;
+		bool	has_lpdead_items;
+		bool	got_cleanup_lock = false;
 
-		vacrel->scanned_pages++;
+		vacrel->counters->scanned_pages++;
 
 		/* Report as block scanned, update error traceback information */
 		pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
@@ -867,46 +1020,10 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * one-pass strategy, and the two-pass strategy with the index_cleanup
 		 * param set to 'off'.
 		 */
-		if (vacrel->scanned_pages % FAILSAFE_EVERY_PAGES == 0)
+		if (!IsParallelWorker() &&
+			vacrel->counters->scanned_pages % FAILSAFE_EVERY_PAGES == 0)
 			lazy_check_wraparound_failsafe(vacrel);
 
-		/*
-		 * Consider if we definitely have enough space to process TIDs on page
-		 * already.  If we are close to overrunning the available space for
-		 * dead_items TIDs, pause and do a cycle of vacuuming before we tackle
-		 * this page.
-		 */
-		if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes)
-		{
-			/*
-			 * Before beginning index vacuuming, we release any pin we may
-			 * hold on the visibility map page.  This isn't necessary for
-			 * correctness, but we do it anyway to avoid holding the pin
-			 * across a lengthy, unrelated operation.
-			 */
-			if (BufferIsValid(vmbuffer))
-			{
-				ReleaseBuffer(vmbuffer);
-				vmbuffer = InvalidBuffer;
-			}
-
-			/* Perform a round of index and heap vacuuming */
-			vacrel->consider_bypass_optimization = false;
-			lazy_vacuum(vacrel);
-
-			/*
-			 * Vacuum the Free Space Map to make newly-freed space visible on
-			 * upper-level FSM pages.  Note we have not yet processed blkno.
-			 */
-			FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
-									blkno);
-			next_fsm_block_to_vacuum = blkno;
-
-			/* Report that we are once again scanning the heap */
-			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
-		}
-
 		/*
 		 * Pin the visibility map page in case we need to mark the page
 		 * all-visible.  In most cases this will be very cheap, because we'll
@@ -994,10 +1111,14 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * also be no opportunity to update the FSM later, because we'll never
 		 * revisit this page. Since updating the FSM is desirable but not
 		 * absolutely required, that's OK.
+		 *
+		 * XXX: in parallel heap scan, some blocks before blkno might not
+		 * have been processed yet. Is it worth vacuuming FSM?
 		 */
-		if (vacrel->nindexes == 0
-			|| !vacrel->do_index_vacuuming
-			|| !has_lpdead_items)
+		if (!IsParallelWorker() &&
+			(vacrel->nindexes == 0
+			 || !vacrel->do_index_vacuuming
+			 || !has_lpdead_items))
 		{
 			Size		freespace = PageGetHeapFreeSpace(page);
 
@@ -1011,57 +1132,144 @@ lazy_scan_heap(LVRelState *vacrel)
 			 * held the cleanup lock and lazy_scan_prune() was called.
 			 */
 			if (got_cleanup_lock && vacrel->nindexes == 0 && has_lpdead_items &&
-				blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
+				blkno - vacrel->next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
 			{
-				FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
+				FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
 										blkno);
-				next_fsm_block_to_vacuum = blkno;
+				vacrel->next_fsm_block_to_vacuum = blkno;
 			}
 		}
 		else
 			UnlockReleaseBuffer(buf);
+
+		/*
+		 * Consider if we definitely have enough space to process TIDs on page
+		 * already.  If we are close to overrunning the available space for
+		 * dead_items TIDs, pause and do a cycle of vacuuming before we tackle
+		 * this page.
+		 */
+		if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes)
+		{
+			/*
+			 * Before beginning index vacuuming, we release any pin we may
+			 * hold on the visibility map page.  This isn't necessary for
+			 * correctness, but we do it anyway to avoid holding the pin
+			 * across a lengthy, unrelated operation.
+			 */
+			if (BufferIsValid(vmbuffer))
+			{
+				ReleaseBuffer(vmbuffer);
+				vmbuffer = InvalidBuffer;
+			}
+
+			if (ParallelHeapVacuumIsActive(vacrel))
+			{
+				/*
+				 * In parallel heap vacuum case, both the leader process and the
+				 * worker processes have to exit without invoking index and heap
+				 * vacuuming. The leader process will wait for all workers to
+				 * finish and perform index and heap vacuuming.
+				 */
+				scan_done = false;
+				break;
+			}
+
+			/* Perform a round of index and heap vacuuming */
+			vacrel->consider_bypass_optimization = false;
+			lazy_vacuum(vacrel);
+
+			/*
+			 * Vacuum the Free Space Map to make newly-freed space visible on
+			 * upper-level FSM pages.
+			 *
+			 * XXX: in parallel heap scan, some blocks before blkno might not
+			 * have been processed yet. Is it worth vacuuming FSM?
+			 */
+			FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
+									blkno + 1);
+			vacrel->next_fsm_block_to_vacuum = blkno;
+
+			/* Report that we are once again scanning the heap */
+			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+
+			continue;
+		}
 	}
 
-	vacrel->blkno = InvalidBlockNumber;
 	if (BufferIsValid(vmbuffer))
 		ReleaseBuffer(vmbuffer);
 
-	/* report that everything is now scanned */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+	return scan_done;
+}
 
-	/* now we can compute the new value for pg_class.reltuples */
-	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
-													 vacrel->scanned_pages,
-													 vacrel->live_tuples);
+/*
+ * A parallel scan variant of heap_vac_scan_next_block.
+ *
+ * In parallel vacuum scan, we don't use the SKIP_PAGES_THRESHOLD optimization.
+ */
+static bool
+heap_vac_scan_next_block_parallel(LVRelState *vacrel, BlockNumber *blkno,
+								  bool *all_visible_according_to_vm)
+{
+	PHVState	*phvstate = vacrel->phvstate;
+	BlockNumber next_block;
+	Buffer		vmbuffer = InvalidBuffer;
+	uint8		mapbits = 0;
 
-	/*
-	 * Also compute the total number of surviving heap entries.  In the
-	 * (unlikely) scenario that new_live_tuples is -1, take it as zero.
-	 */
-	vacrel->new_rel_tuples =
-		Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples +
-		vacrel->missed_dead_tuples;
+	Assert(ParallelHeapVacuumIsActive(vacrel));
 
-	/*
-	 * Do index vacuuming (call each index's ambulkdelete routine), then do
-	 * related heap vacuuming
-	 */
-	if (dead_items_info->num_items > 0)
-		lazy_vacuum(vacrel);
+	for (;;)
+	{
+		next_block = table_block_parallelscan_nextpage(vacrel->rel,
+													   &(phvstate->myscanstate->state),
+													   phvstate->pscandesc);
 
-	/*
-	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
-	 * not there were indexes, and whether or not we bypassed index vacuuming.
-	 */
-	if (blkno > next_fsm_block_to_vacuum)
-		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, blkno);
+		/* Have we reached the end of the table? */
+		if (!BlockNumberIsValid(next_block) || next_block >= vacrel->rel_pages)
+		{
+			if (BufferIsValid(vmbuffer))
+				ReleaseBuffer(vmbuffer);
 
-	/* report all blocks vacuumed */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
+			*blkno = vacrel->rel_pages;
+			return false;
+		}
 
-	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
-	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
-		lazy_cleanup_all_indexes(vacrel);
+		mapbits = visibilitymap_get_status(vacrel->rel, next_block, &vmbuffer);
+
+		/* A block that is not all-visible must always be scanned */
+		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) == 0)
+			break;
+
+		/* We always treat the last block as unsafe to skip */
+		if (next_block == vacrel->rel_pages - 1)
+			break;
+
+		/* DISABLE_PAGE_SKIPPING makes all skipping unsafe */
+		if (!vacrel->skipwithvm)
+			break;
+
+		/* Aggressive VACUUM can't skip pages that are merely all-visible */
+		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0)
+		{
+			if (vacrel->aggressive)
+				break;
+
+			/*
+			 * All-visible block is safe to skip in non-aggressive case. But
+			 * remember that the final range contains such a block for later.
+			 */
+			vacrel->counters->skippedallvis = true;
+		}
+	}
+
+	if (BufferIsValid(vmbuffer))
+		ReleaseBuffer(vmbuffer);
+
+	*blkno = next_block;
+	*all_visible_according_to_vm = (mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0;
+
+	return true;
 }
 
 /*
@@ -1088,6 +1296,9 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 {
 	BlockNumber next_block;
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		return heap_vac_scan_next_block_parallel(vacrel, blkno, all_visible_according_to_vm);
+
 	/* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
 	next_block = vacrel->current_block + 1;
 
@@ -1137,7 +1348,7 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 		{
 			next_block = vacrel->next_unskippable_block;
 			if (skipsallvis)
-				vacrel->skippedallvis = true;
+				vacrel->counters->skippedallvis = true;
 		}
 	}
 
@@ -1210,7 +1421,7 @@ find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
 
 		/*
 		 * Caller must scan the last page to determine whether it has tuples
-		 * (caller must have the opportunity to set vacrel->nonempty_pages).
+		 * (caller must have the opportunity to set vacrel->counters->nonempty_pages).
 		 * This rule avoids having lazy_truncate_heap() take access-exclusive
 		 * lock on rel to attempt a truncation that fails anyway, just because
 		 * there are tuples on the last page (it is likely that there will be
@@ -1439,10 +1650,10 @@ lazy_scan_prune(LVRelState *vacrel,
 	heap_page_prune_and_freeze(rel, buf, vacrel->vistest, prune_options,
 							   &vacrel->cutoffs, &presult, PRUNE_VACUUM_SCAN,
 							   &vacrel->offnum,
-							   &vacrel->NewRelfrozenXid, &vacrel->NewRelminMxid);
+							   &vacrel->counters->NewRelfrozenXid, &vacrel->counters->NewRelminMxid);
 
-	Assert(MultiXactIdIsValid(vacrel->NewRelminMxid));
-	Assert(TransactionIdIsValid(vacrel->NewRelfrozenXid));
+	Assert(MultiXactIdIsValid(vacrel->counters->NewRelminMxid));
+	Assert(TransactionIdIsValid(vacrel->counters->NewRelfrozenXid));
 
 	if (presult.nfrozen > 0)
 	{
@@ -1451,7 +1662,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		 * nfrozen == 0, since it only counts pages with newly frozen tuples
 		 * (don't confuse that with pages newly set all-frozen in VM).
 		 */
-		vacrel->frozen_pages++;
+		vacrel->counters->frozen_pages++;
 	}
 
 	/*
@@ -1486,7 +1697,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 */
 	if (presult.lpdead_items > 0)
 	{
-		vacrel->lpdead_item_pages++;
+		vacrel->counters->lpdead_item_pages++;
 
 		/*
 		 * deadoffsets are collected incrementally in
@@ -1501,15 +1712,15 @@ lazy_scan_prune(LVRelState *vacrel,
 	}
 
 	/* Finally, add page-local counts to whole-VACUUM counts */
-	vacrel->tuples_deleted += presult.ndeleted;
-	vacrel->tuples_frozen += presult.nfrozen;
-	vacrel->lpdead_items += presult.lpdead_items;
-	vacrel->live_tuples += presult.live_tuples;
-	vacrel->recently_dead_tuples += presult.recently_dead_tuples;
+	vacrel->counters->tuples_deleted += presult.ndeleted;
+	vacrel->counters->tuples_frozen += presult.nfrozen;
+	vacrel->counters->lpdead_items += presult.lpdead_items;
+	vacrel->counters->live_tuples += presult.live_tuples;
+	vacrel->counters->recently_dead_tuples += presult.recently_dead_tuples;
 
 	/* Can't truncate this page */
 	if (presult.hastup)
-		vacrel->nonempty_pages = blkno + 1;
+		vacrel->counters->nonempty_pages = blkno + 1;
 
 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (presult.lpdead_items > 0);
@@ -1659,8 +1870,8 @@ lazy_scan_noprune(LVRelState *vacrel,
 				missed_dead_tuples;
 	bool		hastup;
 	HeapTupleHeader tupleheader;
-	TransactionId NoFreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
-	MultiXactId NoFreezePageRelminMxid = vacrel->NewRelminMxid;
+	TransactionId NoFreezePageRelfrozenXid = vacrel->counters->NewRelfrozenXid;
+	MultiXactId NoFreezePageRelminMxid = vacrel->counters->NewRelminMxid;
 	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 
 	Assert(BufferGetBlockNumber(buf) == blkno);
@@ -1787,8 +1998,8 @@ lazy_scan_noprune(LVRelState *vacrel,
 	 * this particular page until the next VACUUM.  Remember its details now.
 	 * (lazy_scan_prune expects a clean slate, so we have to do this last.)
 	 */
-	vacrel->NewRelfrozenXid = NoFreezePageRelfrozenXid;
-	vacrel->NewRelminMxid = NoFreezePageRelminMxid;
+	vacrel->counters->NewRelfrozenXid = NoFreezePageRelfrozenXid;
+	vacrel->counters->NewRelminMxid = NoFreezePageRelminMxid;
 
 	/* Save any LP_DEAD items found on the page in dead_items */
 	if (vacrel->nindexes == 0)
@@ -1815,25 +2026,25 @@ lazy_scan_noprune(LVRelState *vacrel,
 		 * indexes will be deleted during index vacuuming (and then marked
 		 * LP_UNUSED in the heap)
 		 */
-		vacrel->lpdead_item_pages++;
+		vacrel->counters->lpdead_item_pages++;
 
 		dead_items_add(vacrel, blkno, deadoffsets, lpdead_items);
 
-		vacrel->lpdead_items += lpdead_items;
+		vacrel->counters->lpdead_items += lpdead_items;
 	}
 
 	/*
 	 * Finally, add relevant page-local counts to whole-VACUUM counts
 	 */
-	vacrel->live_tuples += live_tuples;
-	vacrel->recently_dead_tuples += recently_dead_tuples;
-	vacrel->missed_dead_tuples += missed_dead_tuples;
+	vacrel->counters->live_tuples += live_tuples;
+	vacrel->counters->recently_dead_tuples += recently_dead_tuples;
+	vacrel->counters->missed_dead_tuples += missed_dead_tuples;
 	if (missed_dead_tuples > 0)
-		vacrel->missed_dead_pages++;
+		vacrel->counters->missed_dead_pages++;
 
 	/* Can't truncate this page */
 	if (hastup)
-		vacrel->nonempty_pages = blkno + 1;
+		vacrel->counters->nonempty_pages = blkno + 1;
 
 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (lpdead_items > 0);
@@ -1862,7 +2073,7 @@ lazy_vacuum(LVRelState *vacrel)
 
 	/* Should not end up here with no indexes */
 	Assert(vacrel->nindexes > 0);
-	Assert(vacrel->lpdead_item_pages > 0);
+	Assert(vacrel->counters->lpdead_item_pages > 0);
 
 	if (!vacrel->do_index_vacuuming)
 	{
@@ -1896,7 +2107,7 @@ lazy_vacuum(LVRelState *vacrel)
 		BlockNumber threshold;
 
 		Assert(vacrel->num_index_scans == 0);
-		Assert(vacrel->lpdead_items == vacrel->dead_items_info->num_items);
+		Assert(vacrel->counters->lpdead_items == vacrel->dead_items_info->num_items);
 		Assert(vacrel->do_index_vacuuming);
 		Assert(vacrel->do_index_cleanup);
 
@@ -1923,7 +2134,7 @@ lazy_vacuum(LVRelState *vacrel)
 		 * cases then this may need to be reconsidered.
 		 */
 		threshold = (double) vacrel->rel_pages * BYPASS_THRESHOLD_PAGES;
-		bypass = (vacrel->lpdead_item_pages < threshold &&
+		bypass = (vacrel->counters->lpdead_item_pages < threshold &&
 				  (TidStoreMemoryUsage(vacrel->dead_items) < (32L * 1024L * 1024L)));
 	}
 
@@ -2061,7 +2272,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	 * place).
 	 */
 	Assert(vacrel->num_index_scans > 0 ||
-		   vacrel->dead_items_info->num_items == vacrel->lpdead_items);
+		   vacrel->dead_items_info->num_items == vacrel->counters->lpdead_items);
 	Assert(allindexes || VacuumFailsafeActive);
 
 	/*
@@ -2165,8 +2376,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 	 * the second heap pass.  No more, no less.
 	 */
 	Assert(vacrel->num_index_scans > 1 ||
-		   (vacrel->dead_items_info->num_items == vacrel->lpdead_items &&
-			vacuumed_pages == vacrel->lpdead_item_pages));
+		   (vacrel->dead_items_info->num_items == vacrel->counters->lpdead_items &&
+			vacuumed_pages == vacrel->counters->lpdead_item_pages));
 
 	ereport(DEBUG2,
 			(errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
@@ -2347,7 +2558,7 @@ static void
 lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
 	double		reltuples = vacrel->new_rel_tuples;
-	bool		estimated_count = vacrel->scanned_pages < vacrel->rel_pages;
+	bool		estimated_count = vacrel->counters->scanned_pages < vacrel->rel_pages;
 	const int	progress_start_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_INDEXES_TOTAL
@@ -2528,7 +2739,7 @@ should_attempt_truncation(LVRelState *vacrel)
 	if (!vacrel->do_rel_truncate || VacuumFailsafeActive)
 		return false;
 
-	possibly_freeable = vacrel->rel_pages - vacrel->nonempty_pages;
+	possibly_freeable = vacrel->rel_pages - vacrel->counters->nonempty_pages;
 	if (possibly_freeable > 0 &&
 		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
 		 possibly_freeable >= vacrel->rel_pages / REL_TRUNCATE_FRACTION))
@@ -2554,7 +2765,7 @@ lazy_truncate_heap(LVRelState *vacrel)
 
 	/* Update error traceback information one last time */
 	update_vacuum_error_info(vacrel, NULL, VACUUM_ERRCB_PHASE_TRUNCATE,
-							 vacrel->nonempty_pages, InvalidOffsetNumber);
+							 vacrel->counters->nonempty_pages, InvalidOffsetNumber);
 
 	/*
 	 * Loop until no more truncating can be done.
@@ -2655,7 +2866,7 @@ lazy_truncate_heap(LVRelState *vacrel)
 		 * without also touching reltuples, since the tuple count wasn't
 		 * changed by the truncation.
 		 */
-		vacrel->removed_pages += orig_rel_pages - new_rel_pages;
+		vacrel->counters->removed_pages += orig_rel_pages - new_rel_pages;
 		vacrel->rel_pages = new_rel_pages;
 
 		ereport(vacrel->verbose ? INFO : DEBUG2,
@@ -2663,7 +2874,7 @@ lazy_truncate_heap(LVRelState *vacrel)
 						vacrel->relname,
 						orig_rel_pages, new_rel_pages)));
 		orig_rel_pages = new_rel_pages;
-	} while (new_rel_pages > vacrel->nonempty_pages && lock_waiter_detected);
+	} while (new_rel_pages > vacrel->counters->nonempty_pages && lock_waiter_detected);
 }
 
 /*
@@ -2691,7 +2902,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected)
 	StaticAssertStmt((PREFETCH_SIZE & (PREFETCH_SIZE - 1)) == 0,
 					 "prefetch size must be power of 2");
 	prefetchedUntil = InvalidBlockNumber;
-	while (blkno > vacrel->nonempty_pages)
+	while (blkno > vacrel->counters->nonempty_pages)
 	{
 		Buffer		buf;
 		Page		page;
@@ -2803,7 +3014,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected)
 	 * pages still are; we need not bother to look at the last known-nonempty
 	 * page.
 	 */
-	return vacrel->nonempty_pages;
+	return vacrel->counters->nonempty_pages;
 }
 
 /*
@@ -2821,12 +3032,8 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 		autovacuum_work_mem != -1 ?
 		autovacuum_work_mem : maintenance_work_mem;
 
-	/*
-	 * Initialize state for a parallel vacuum.  As of now, only one worker can
-	 * be used for an index, so we invoke parallelism only if there are at
-	 * least two indexes on a table.
-	 */
-	if (nworkers >= 0 && vacrel->nindexes > 1 && vacrel->do_index_vacuuming)
+	/* Initialize state for a parallel vacuum */
+	if (nworkers >= 0)
 	{
 		/*
 		 * Since parallel workers cannot access data in temporary tables, we
@@ -2844,11 +3051,18 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 								vacrel->relname)));
 		}
 		else
+		{
+			/*
+			 * For parallel index vacuuming, only one worker can be used per
+			 * index, so we invoke parallelism only if there are at least two
+			 * indexes on a table.
+			 */
 			vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
 											   vacrel->nindexes, nworkers,
 											   vac_work_mem,
 											   vacrel->verbose ? INFO : DEBUG2,
-											   vacrel->bstrategy);
+											   vacrel->bstrategy, (void *) vacrel);
+		}
 
 		/*
 		 * If parallel mode started, dead_items and dead_items_info spaces are
@@ -2889,9 +3103,19 @@ dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
 	};
 	int64		prog_val[2];
 
+	/*
+	 * Protect both dead_items and dead_items_info from concurrent updates
+	 * in parallel heap scan cases.
+	 */
+	if (ParallelHeapVacuumIsActive(vacrel))
+		TidStoreLockExclusive(dead_items);
+
 	TidStoreSetBlockOffsets(dead_items, blkno, offsets, num_offsets);
 	vacrel->dead_items_info->num_items += num_offsets;
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		TidStoreUnlock(dead_items);
+
 	/* update the progress information */
 	prog_val[0] = vacrel->dead_items_info->num_items;
 	prog_val[1] = TidStoreMemoryUsage(dead_items);
@@ -3093,6 +3317,359 @@ update_relstats_all_indexes(LVRelState *vacrel)
 	}
 }
 
+/*
+ * Compute the number of parallel workers for parallel vacuum heap scan.
+ *
+ * The calculation logic is borrowed from compute_parallel_worker().
+ */
+int
+heap_parallel_vacuum_compute_workers(Relation rel, int nrequested)
+{
+	int parallel_workers = 0;
+	int heap_parallel_threshold;
+	BlockNumber heap_pages;
+
+	if (nrequested == 0)
+	{
+		/*
+		 * Select the number of workers based on the log of the size of
+		 * the relation.  This probably needs to be a good deal more
+		 * sophisticated, but we need something here for now.  Note that
+		 * the upper limit of the min_parallel_table_scan_size GUC is
+		 * chosen to prevent overflow here.
+		 */
+		heap_parallel_threshold = Max(min_parallel_table_scan_size, 1);
+		heap_pages = RelationGetNumberOfBlocks(rel);
+		while (heap_pages >= (BlockNumber) (heap_parallel_threshold * 3))
+		{
+			parallel_workers++;
+			heap_parallel_threshold *= 3;
+			if (heap_parallel_threshold > INT_MAX / 3)
+				break;
+		}
+	}
+	else
+		parallel_workers = nrequested;
+
+	return parallel_workers;
+}
+
+/*
+ * Compute the amount of space we'll need in the parallel heap vacuum
+ * DSM, and inform pcxt->estimator about our needs.
+ *
+ * nworkers is the number of workers for the table vacuum. Note that it could
+ * differ from pcxt->nworkers, which is the maximum of the number of workers
+ * for table vacuum and for index vacuum.
+ */
+void
+heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt,
+							  int nworkers, void *state)
+{
+	Size	size = 0;
+	LVRelState *vacrel = (LVRelState *) state;
+
+	/* space for PHVShared */
+	size = add_size(size, SizeOfPHVShared);
+	size = add_size(size, mul_size(sizeof(LVRelCounters), nworkers));
+	vacrel->shared_len = size;
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* space for ParallelBlockTableScanDesc */
+	vacrel->pscan_len = table_block_parallelscan_estimate(rel);
+	shm_toc_estimate_chunk(&pcxt->estimator, vacrel->pscan_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* space for per-worker scan state, PHVScanWorkerState */
+	vacrel->pscanwork_len = mul_size(sizeof(PHVScanWorkerState), nworkers);
+	shm_toc_estimate_chunk(&pcxt->estimator, vacrel->pscanwork_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up shared memory for parallel heap vacuum.
+ */
+void
+heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt,
+								int nworkers, void *state)
+{
+	LVRelState *vacrel = (LVRelState *) state;
+	ParallelBlockTableScanDesc pscan;
+	PHVScanWorkerState *pscanwork;
+	PHVShared 	*shared;
+	PHVState	*phvstate;
+
+	phvstate = (PHVState *) palloc(sizeof(PHVState));
+
+	shared = shm_toc_allocate(pcxt->toc, vacrel->shared_len);
+
+	/* Prepare the shared information */
+
+	MemSet(shared, 0, vacrel->shared_len);
+	shared->aggressive = vacrel->aggressive;
+	shared->skipwithvm = vacrel->skipwithvm;
+	shared->cutoffs = vacrel->cutoffs;
+	shared->NewRelfrozenXid = vacrel->counters->NewRelfrozenXid;
+	shared->NewRelminMxid = vacrel->counters->NewRelminMxid;
+	shared->skippedallvis = vacrel->counters->skippedallvis;
+
+	/*
+	 * XXX: we copy the contents of vistest to the shared area, but in order to do
+	 * that, we need to either expose GlobalVisTest or provide functions to copy
+	 * the contents of GlobalVisTest elsewhere. Currently we do the former, but
+	 * it's not clear that it's the best choice.
+	 *
+	 * An alternative idea is to have each worker determine the cutoffs and have
+	 * its own vistest. But we need to consider that carefully, since parallel
+	 * workers would end up having different cutoffs and horizons.
+	 */
+	shared->vistest = *vacrel->vistest;
+
+	shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_SHARED, shared);
+
+	phvstate->shared = shared;
+
+	/* prepare the parallel block table scan description */
+	pscan = shm_toc_allocate(pcxt->toc, vacrel->pscan_len);
+	shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_DESC, pscan);
+
+	/* initialize parallel scan description */
+	table_block_parallelscan_initialize(rel, (ParallelTableScanDesc) pscan);
+	phvstate->pscandesc = pscan;
+
+	/* prepare the workers' parallel block table scan state */
+	pscanwork = shm_toc_allocate(pcxt->toc, vacrel->pscanwork_len);
+	MemSet(pscanwork, 0, vacrel->pscanwork_len);
+	shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_DESC_WORKER, pscanwork);
+	phvstate->scanstates = pscanwork;
+
+	vacrel->phvstate = phvstate;
+}
+
+/*
+ * Main function for parallel heap vacuum workers.
+ */
+void
+heap_parallel_vacuum_scan_worker(Relation rel, ParallelVacuumState *pvs,
+								 ParallelWorkerContext *pwcxt)
+{
+	LVRelState	vacrel = {0};
+	PHVState	*phvstate;
+	PHVShared	*shared;
+	ParallelBlockTableScanDesc pscandesc;
+	PHVScanWorkerState	*scanstate;
+	LVRelCounters *counters;
+	bool		scan_done;
+
+	phvstate = palloc(sizeof(PHVState));
+
+	pscandesc = (ParallelBlockTableScanDesc) shm_toc_lookup(pwcxt->toc,
+														   LV_PARALLEL_SCAN_DESC,
+														   false);
+	phvstate->pscandesc = pscandesc;
+
+	shared = (PHVShared *) shm_toc_lookup(pwcxt->toc, LV_PARALLEL_SCAN_SHARED,
+										  false);
+	phvstate->shared = shared;
+
+	scanstate = (PHVScanWorkerState *) shm_toc_lookup(pwcxt->toc,
+													  LV_PARALLEL_SCAN_DESC_WORKER,
+													  false);
+
+	phvstate->myscanstate = &(scanstate[ParallelWorkerNumber]);
+	counters = &(shared->worker_relcnts[ParallelWorkerNumber]);
+
+	/* Prepare LVRelState */
+	vacrel.rel = rel;
+	vacrel.indrels = parallel_vacuum_get_table_indexes(pvs, &vacrel.nindexes);
+	vacrel.pvs = pvs;
+	vacrel.phvstate = phvstate;
+	vacrel.aggressive = shared->aggressive;
+	vacrel.skipwithvm = shared->skipwithvm;
+	vacrel.cutoffs = shared->cutoffs;
+	vacrel.vistest = &(shared->vistest);
+	vacrel.dead_items = parallel_vacuum_get_dead_items(pvs,
+													   &vacrel.dead_items_info);
+	vacrel.rel_pages = RelationGetNumberOfBlocks(rel);
+	vacrel.counters = counters;
+
+	/* initialize per-worker relation statistics */
+	MemSet(counters, 0, sizeof(LVRelCounters));
+
+	vacrel.counters->NewRelfrozenXid = shared->NewRelfrozenXid;
+	vacrel.counters->NewRelminMxid = shared->NewRelminMxid;
+	vacrel.counters->skippedallvis = shared->skippedallvis;
+
+	/*
+	 * XXX: the following fields are not set yet:
+	 * - index vacuum related fields such as consider_bypass_optimization,
+	 * 	 do_index_vacuuming etc.
+	 * - error reporting state.
+	 * - statistics such as scanned_pages etc.
+	 * - oldest extant XID/MXID.
+	 * - states maintained by heap_vac_scan_next_block()
+	 */
+
+	/* Initialize the start block if not yet */
+	if (!phvstate->myscanstate->maybe_have_blocks)
+	{
+		table_block_parallelscan_startblock_init(rel,
+												 &(phvstate->myscanstate->state),
+												 phvstate->pscandesc);
+
+		phvstate->myscanstate->maybe_have_blocks = false;
+	}
+
+	/*
+	 * XXX: if we want to support parallel heap *vacuum*, we need to allow
+	 * workers to call different functions based on the shared information.
+	 */
+	scan_done = do_lazy_scan_heap(&vacrel);
+
+	phvstate->myscanstate->maybe_have_blocks = !scan_done;
+}
+
+/*
+ * Complete parallel heap scans that have remaining blocks in their
+ * chunks.
+ */
+static void
+parallel_heap_complete_unfinised_scan(LVRelState *vacrel)
+{
+	int nworkers;
+
+	Assert(!IsParallelWorker());
+
+	nworkers = parallel_vacuum_get_nworkers_table(vacrel->pvs);
+
+	for (int i = 0; i < nworkers; i++)
+	{
+		PHVScanWorkerState *wstate = &(vacrel->phvstate->scanstates[i]);
+		bool	scan_done PG_USED_FOR_ASSERTS_ONLY;
+
+		if (!wstate->maybe_have_blocks)
+			continue;
+
+		vacrel->phvstate->myscanstate = wstate;
+
+		scan_done = do_lazy_scan_heap(vacrel);
+
+		Assert(scan_done);
+	}
+}
+
+/*
+ * Accumulate relation counters that parallel workers collected into the
+ * leader's counters.
+ */
+static void
+parallel_heap_vacuum_gather_scan_stats(LVRelState *vacrel)
+{
+	PHVState *phvstate = vacrel->phvstate;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+
+	for (int i = 0; i < phvstate->nworkers_launched; i++)
+	{
+		LVRelCounters *counters = &(phvstate->shared->worker_relcnts[i]);
+
+#define LV_ACCUM_ITEM(item) (vacrel)->counters->item += (counters)->item
+		LV_ACCUM_ITEM(scanned_pages);
+		LV_ACCUM_ITEM(removed_pages);
+		LV_ACCUM_ITEM(frozen_pages);
+		LV_ACCUM_ITEM(lpdead_item_pages);
+		LV_ACCUM_ITEM(missed_dead_pages);
+		LV_ACCUM_ITEM(tuples_deleted);
+		LV_ACCUM_ITEM(tuples_frozen);
+		LV_ACCUM_ITEM(lpdead_items);
+		LV_ACCUM_ITEM(live_tuples);
+		LV_ACCUM_ITEM(recently_dead_tuples);
+		LV_ACCUM_ITEM(missed_dead_tuples);
+#undef LV_ACCUM_ITEM
+
+		/* nonempty_pages is a high-water mark: take the maximum, not the sum */
+		vacrel->counters->nonempty_pages = Max(vacrel->counters->nonempty_pages, counters->nonempty_pages);
+
+		if (TransactionIdPrecedes(counters->NewRelfrozenXid, vacrel->counters->NewRelfrozenXid))
+			vacrel->counters->NewRelfrozenXid = counters->NewRelfrozenXid;
+
+		if (MultiXactIdPrecedesOrEquals(counters->NewRelminMxid, vacrel->counters->NewRelminMxid))
+			vacrel->counters->NewRelminMxid = counters->NewRelminMxid;
+
+		if (!vacrel->counters->skippedallvis && counters->skippedallvis)
+			vacrel->counters->skippedallvis = true;
+	}
+}
+
+/*
+ * A parallel variant of do_lazy_scan_heap(). The leader process launches parallel
+ * workers to scan the heap in parallel.
+ */
+static void
+do_parallel_lazy_scan_heap(LVRelState *vacrel)
+{
+	PHVScanWorkerState *scanstate;
+	TidStore   *dead_items = vacrel->dead_items;
+	VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+	Assert(!IsParallelWorker());
+
+	/* launch workers */
+	vacrel->phvstate->nworkers_launched = parallel_vacuum_table_scan_begin(vacrel->pvs);
+
+	/* initialize parallel scan description to join as a worker */
+	scanstate = palloc(sizeof(PHVScanWorkerState));
+	table_block_parallelscan_startblock_init(vacrel->rel, &(scanstate->state),
+											 vacrel->phvstate->pscandesc);
+	vacrel->phvstate->myscanstate = scanstate;
+
+	for (;;)
+	{
+		bool scan_done PG_USED_FOR_ASSERTS_ONLY;
+
+		/*
+		 * Scan the table until either we are close to overrunning the available
+		 * space for dead_items TIDs or we reach the end of the table.
+		 */
+		scan_done = do_lazy_scan_heap(vacrel);
+
+		/* stop parallel workers and gather the collected stats */
+		parallel_vacuum_table_scan_end(vacrel->pvs);
+		parallel_heap_vacuum_gather_scan_stats(vacrel);
+
+		/*
+		 * If we are close to overrunning the available space for dead_items
+		 * TIDs, the scan stopped before reaching the end of the table.  Do a
+		 * cycle of index and heap vacuuming, then relaunch the workers to
+		 * resume the scan.
+		 */
+		if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes)
+		{
+			/* Perform a round of index and heap vacuuming */
+			vacrel->consider_bypass_optimization = false;
+			lazy_vacuum(vacrel);
+
+			/* Report that we are once again scanning the heap */
+			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+
+			/* relaunch workers */
+			vacrel->phvstate->nworkers_launched =
+				parallel_vacuum_table_scan_begin(vacrel->pvs);
+
+			continue;
+		}
+
+		/* We have reached the end of the table */
+		Assert(scan_done);
+		break;
+	}
+
+	parallel_heap_complete_unfinised_scan(vacrel);
+}
+
 /*
  * Error context callback for errors occurring during vacuum.  The error
  * context messages for index phases should match the messages set in parallel
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index f26070bff2..968addf94f 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -28,6 +28,7 @@
 
 #include "access/amapi.h"
 #include "access/table.h"
+#include "access/tableam.h"
 #include "access/xact.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
@@ -64,6 +65,12 @@ typedef struct PVShared
 	Oid			relid;
 	int			elevel;
 
+	/*
+	 * True if the caller wants parallel workers to invoke vacuum table scan
+	 * callback.
+	 */
+	bool		do_vacuum_table_scan;
+
 	/*
 	 * Fields for both index vacuum and cleanup.
 	 *
@@ -163,6 +170,9 @@ struct ParallelVacuumState
 	/* NULL for worker processes */
 	ParallelContext *pcxt;
 
+	/* Passed to parallel table scan workers. NULL for leader process */
+	ParallelWorkerContext *pwcxt;
+
 	/* Parent Heap Relation */
 	Relation	heaprel;
 
@@ -192,6 +202,16 @@ struct ParallelVacuumState
 	/* Points to WAL usage area in DSM */
 	WalUsage   *wal_usage;
 
+	/*
+	 * The number of workers for parallel table scan/vacuuming and index vacuuming,
+	 * respectively.
+	 */
+	int			nworkers_for_table;
+	int			nworkers_for_index;
+
+	/* How many times has the parallel table vacuum scan been called? */
+	int			num_table_scans;
+
 	/*
 	 * False if the index is totally unsuitable target for all parallel
 	 * processing. For example, the index could be <
@@ -220,8 +240,9 @@ struct ParallelVacuumState
 	PVIndVacStatus status;
 };
 
-static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
-											bool *will_parallel_vacuum);
+static void	parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes,
+											int nrequested, int *nworkers_table,
+											int *nworkers_index, bool *will_parallel_vacuum);
 static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
 												bool vacuum);
 static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
@@ -241,7 +262,7 @@ static void parallel_vacuum_error_callback(void *arg);
 ParallelVacuumState *
 parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 					 int nrequested_workers, int vac_work_mem,
-					 int elevel, BufferAccessStrategy bstrategy)
+					 int elevel, BufferAccessStrategy bstrategy, void *state)
 {
 	ParallelVacuumState *pvs;
 	ParallelContext *pcxt;
@@ -255,6 +276,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	Size		est_shared_len;
 	int			nindexes_mwm = 0;
 	int			parallel_workers = 0;
+	int			nworkers_table;
+	int			nworkers_index;
 	int			querylen;
 
 	/*
@@ -262,15 +285,17 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	 * relation
 	 */
 	Assert(nrequested_workers >= 0);
-	Assert(nindexes > 0);
 
 	/*
 	 * Compute the number of parallel vacuum workers to launch
 	 */
 	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
-	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
-													   nrequested_workers,
-													   will_parallel_vacuum);
+	parallel_vacuum_compute_workers(rel, indrels, nindexes, nrequested_workers,
+									&nworkers_table, &nworkers_index,
+									will_parallel_vacuum);
+
+	parallel_workers = Max(nworkers_table, nworkers_index);
+
 	if (parallel_workers <= 0)
 	{
 		/* Can't perform vacuum in parallel -- return NULL */
@@ -284,6 +309,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	pvs->will_parallel_vacuum = will_parallel_vacuum;
 	pvs->bstrategy = bstrategy;
 	pvs->heaprel = rel;
+	pvs->nworkers_for_table = nworkers_table;
+	pvs->nworkers_for_index = nworkers_index;
 
 	EnterParallelMode();
 	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
@@ -326,6 +353,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	else
 		querylen = 0;			/* keep compiler quiet */
 
+	/* Estimate AM-specific space for parallel table vacuum */
+	if (nworkers_table > 0)
+		table_parallel_vacuum_estimate(rel, pcxt, nworkers_table, state);
+
 	InitializeParallelDSM(pcxt);
 
 	/* Prepare index vacuum stats */
@@ -417,6 +448,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
 	}
 
+	/* Prepare AM-specific DSM for parallel table vacuum */
+	if (nworkers_table > 0)
+		table_parallel_vacuum_initialize(rel, pcxt, nworkers_table, state);
+
 	/* Success -- return parallel vacuum state */
 	return pvs;
 }
@@ -538,27 +573,41 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup
  * min_parallel_index_scan_size as invoking workers for very small indexes
  * can hurt performance.
  *
+ * XXX: need to mention the number of workers for the table.
+ *
  * nrequested is the number of parallel workers that user requested.  If
  * nrequested is 0, we compute the parallel degree based on nindexes, that is
  * the number of indexes that support parallel vacuum.  This function also
  * sets will_parallel_vacuum to remember indexes that participate in parallel
  * vacuum.
  */
-static int
-parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
-								bool *will_parallel_vacuum)
+static void
+parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes,
+								int nrequested, int *nworkers_table,
+								int *nworkers_index, bool *will_parallel_vacuum)
 {
 	int			nindexes_parallel = 0;
 	int			nindexes_parallel_bulkdel = 0;
 	int			nindexes_parallel_cleanup = 0;
-	int			parallel_workers;
+	int			parallel_workers_table = 0;
+	int			parallel_workers_index = 0;
+
+	*nworkers_table = 0;
+	*nworkers_index = 0;
 
 	/*
 	 * We don't allow performing parallel operation in standalone backend or
 	 * when parallelism is disabled.
 	 */
 	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
-		return 0;
+		return;
+
+	/*
+	 * Compute the number of workers for parallel table scan. Cap by
+	 * max_parallel_maintenance_workers.
+	 */
+	parallel_workers_table = Min(table_paralle_vacuum_compute_workers(rel, nrequested),
+								 max_parallel_maintenance_workers);
 
 	/*
 	 * Compute the number of indexes that can participate in parallel vacuum.
@@ -589,17 +638,18 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
 	nindexes_parallel--;
 
 	/* No index supports parallel vacuum */
-	if (nindexes_parallel <= 0)
-		return 0;
-
-	/* Compute the parallel degree */
-	parallel_workers = (nrequested > 0) ?
-		Min(nrequested, nindexes_parallel) : nindexes_parallel;
+	if (nindexes_parallel > 0)
+	{
+		/* Compute the parallel degree for parallel index vacuum */
+		parallel_workers_index = (nrequested > 0) ?
+			Min(nrequested, nindexes_parallel) : nindexes_parallel;
 
-	/* Cap by max_parallel_maintenance_workers */
-	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+		/* Cap by max_parallel_maintenance_workers */
+		parallel_workers_index = Min(parallel_workers_index, max_parallel_maintenance_workers);
+	}
 
-	return parallel_workers;
+	*nworkers_table = parallel_workers_table;
+	*nworkers_index = parallel_workers_index;
 }
 
 /*
@@ -669,7 +719,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
 	if (nworkers > 0)
 	{
 		/* Reinitialize parallel context to relaunch parallel workers */
-		if (num_index_scans > 0)
+		if (num_index_scans > 0 || pvs->num_table_scans > 0)
 			ReinitializeParallelDSM(pvs->pcxt);
 
 		/*
@@ -978,6 +1028,120 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
 	return true;
 }
 
+/*
+ * A parallel worker invokes the table-AM-specific vacuum scan callback.
+ */
+static void
+parallel_vacuum_process_table(ParallelVacuumState *pvs)
+{
+	/*
+	 * Increment the active worker count if we are able to launch any worker.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+	/* Do table vacuum scan */
+	table_parallel_vacuum_scan(pvs->heaprel, pvs, pvs->pwcxt);
+
+	/*
+	 * We have completed the table vacuum so decrement the active worker
+	 * count.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Prepare DSM and vacuum delay, and launch parallel workers for parallel
+ * table vacuum scan.
+ */
+int
+parallel_vacuum_table_scan_begin(ParallelVacuumState *pvs)
+{
+	Assert(!IsParallelWorker());
+
+	if (pvs->nworkers_for_table == 0)
+		return 0;
+
+	pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
+	pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
+
+	pvs->shared->do_vacuum_table_scan = true;
+
+	if (pvs->num_table_scans > 0)
+		ReinitializeParallelDSM(pvs->pcxt);
+
+	ReinitializeParallelWorkers(pvs->pcxt, pvs->nworkers_for_table);
+
+	LaunchParallelWorkers(pvs->pcxt);
+
+	if (pvs->pcxt->nworkers_launched > 0)
+	{
+		/*
+		 * Reset the local cost values for leader backend as we have
+		 * already accumulated the remaining balance of heap.
+		 */
+		VacuumCostBalance = 0;
+		VacuumCostBalanceLocal = 0;
+
+		/* Enable shared cost balance for leader backend */
+		VacuumSharedCostBalance = &(pvs->shared->cost_balance);
+		VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
+	}
+
+	ereport(pvs->shared->elevel,
+			(errmsg(ngettext("launched %d parallel vacuum worker for table scanning (planned: %d)",
+							 "launched %d parallel vacuum workers for table scanning (planned: %d)",
+							 pvs->pcxt->nworkers_launched),
+					pvs->pcxt->nworkers_launched, pvs->nworkers_for_table)));
+
+	return pvs->pcxt->nworkers_launched;
+}
+
+/*
+ * Wait for all workers for parallel table vacuum scan, and gather statistics.
+ */
+void
+parallel_vacuum_table_scan_end(ParallelVacuumState *pvs)
+{
+	Assert(!IsParallelWorker());
+
+	if (pvs->nworkers_for_table == 0)
+		return;
+
+	WaitForParallelWorkersToFinish(pvs->pcxt);
+
+	for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
+
+	/*
+	 * Carry the shared balance value to heap scan and disable shared costing
+	 */
+	if (VacuumSharedCostBalance)
+	{
+		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+		VacuumSharedCostBalance = NULL;
+		VacuumActiveNWorkers = NULL;
+	}
+
+	pvs->shared->do_vacuum_table_scan = false;
+	pvs->num_table_scans++;
+}
+
+Relation *
+parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes)
+{
+	*nindexes = pvs->nindexes;
+
+	return pvs->indrels;
+}
+
+int
+parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs)
+{
+	return pvs->nworkers_for_table;
+}
+
 /*
  * Perform work within a launched parallel process.
  *
@@ -1026,7 +1190,6 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	 * matched to the leader's one.
 	 */
 	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
-	Assert(nindexes > 0);
 
 	if (shared->maintenance_work_mem_worker > 0)
 		maintenance_work_mem = shared->maintenance_work_mem_worker;
@@ -1060,6 +1223,10 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	pvs.relname = pstrdup(RelationGetRelationName(rel));
 	pvs.heaprel = rel;
 
+	pvs.pwcxt = palloc(sizeof(ParallelWorkerContext));
+	pvs.pwcxt->toc = toc;
+	pvs.pwcxt->seg = seg;
+
 	/* These fields will be filled during index vacuum or cleanup */
 	pvs.indname = NULL;
 	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
@@ -1077,8 +1244,15 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	/* Prepare to track buffer usage during parallel execution */
 	InstrStartParallelQuery();
 
-	/* Process indexes to perform vacuum/cleanup */
-	parallel_vacuum_process_safe_indexes(&pvs);
+	if (pvs.shared->do_vacuum_table_scan)
+	{
+		parallel_vacuum_process_table(&pvs);
+	}
+	else
+	{
+		/* Process indexes to perform vacuum/cleanup */
+		parallel_vacuum_process_safe_indexes(&pvs);
+	}
 
 	/* Report buffer/WAL usage during parallel execution */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index d5165aa0d9..37035cc186 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -164,15 +164,6 @@ typedef struct ProcArrayStruct
  *
  * The typedef is in the header.
  */
-struct GlobalVisState
-{
-	/* XIDs >= are considered running by some backend */
-	FullTransactionId definitely_needed;
-
-	/* XIDs < are not considered to be running by any backend */
-	FullTransactionId maybe_needed;
-};
-
 /*
  * Result of ComputeXidHorizons().
  */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec88a6..6c5e48e478 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -20,6 +20,7 @@
 #include "access/skey.h"
 #include "access/table.h"		/* for backward compatibility */
 #include "access/tableam.h"
+#include "commands/vacuum.h"
 #include "nodes/lockoptions.h"
 #include "nodes/primnodes.h"
 #include "storage/bufpage.h"
@@ -393,6 +394,13 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation rel,
 							struct VacuumParams *params, BufferAccessStrategy bstrategy);
+extern int heap_parallel_vacuum_compute_workers(Relation rel, int requested);
+extern void heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt,
+										  int nworkers, void *state);
+extern void heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt,
+											int nworkers, void *state);
+extern void heap_parallel_vacuum_scan_worker(Relation rel, ParallelVacuumState *pvs,
+											 ParallelWorkerContext *pwcxt);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd..b10b047ca1 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -20,6 +20,7 @@
 #include "access/relscan.h"
 #include "access/sdir.h"
 #include "access/xact.h"
+#include "commands/vacuum.h"
 #include "executor/tuptable.h"
 #include "storage/read_stream.h"
 #include "utils/rel.h"
@@ -655,6 +656,46 @@ typedef struct TableAmRoutine
 									struct VacuumParams *params,
 									BufferAccessStrategy bstrategy);
 
+	/* ------------------------------------------------------------------------
+	 * Callbacks for parallel table vacuum.
+	 * ------------------------------------------------------------------------
+	 */
+
+	/*
+	 * Compute the number of parallel workers for parallel table vacuum.
+	 * The function must return 0 to disable parallel table vacuum.
+	 */
+	int			(*parallel_vacuum_compute_workers) (Relation rel, int requested);
+
+	/*
+	 * Compute the amount of DSM space the AM needs for parallel table vacuum.
+	 *
+	 * Not called if parallel table vacuum is disabled.
+	 */
+	void		(*parallel_vacuum_estimate) (Relation rel,
+											 ParallelContext *pcxt,
+											 int nworkers,
+											 void *state);
+
+	/*
+	 * Initialize DSM space for parallel table vacuum.
+	 *
+	 * Not called if parallel table vacuum is disabled.
+	 */
+	void		(*parallel_vacuum_initialize) (Relation rel,
+											   ParallelContext *pcxt,
+											   int nworkers,
+											   void *state);
+
+	/*
+	 * This callback is called for parallel table vacuum workers.
+	 *
+	 * Not called if parallel table vacuum is disabled.
+	 */
+	void		(*parallel_vacuum_scan_worker) (Relation rel,
+												ParallelVacuumState *pvs,
+												ParallelWorkerContext *pwcxt);
+
 	/*
 	 * Prepare to analyze the next block in the read stream.  Returns false if
 	 * the stream is exhausted and true otherwise. The scan must have been
@@ -1720,6 +1761,33 @@ table_relation_vacuum(Relation rel, struct VacuumParams *params,
 	rel->rd_tableam->relation_vacuum(rel, params, bstrategy);
 }
 
+static inline int
+table_paralle_vacuum_compute_workers(Relation rel, int requested)
+{
+	return rel->rd_tableam->parallel_vacuum_compute_workers(rel, requested);
+}
+
+static inline void
+table_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
+							   void *state)
+{
+	rel->rd_tableam->parallel_vacuum_estimate(rel, pcxt, nworkers, state);
+}
+
+static inline void
+table_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, int nworkers,
+								 void *state)
+{
+	rel->rd_tableam->parallel_vacuum_initialize(rel, pcxt, nworkers, state);
+}
+
+static inline void
+table_parallel_vacuum_scan(Relation rel, ParallelVacuumState *pvs,
+						   ParallelWorkerContext *pwcxt)
+{
+	rel->rd_tableam->parallel_vacuum_scan_worker(rel, pvs, pwcxt);
+}
+
 /*
  * Prepare to analyze the next block in the read stream. The scan needs to
  * have been  started with table_beginscan_analyze().  Note that this routine
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 759f9a87d3..598bb5218f 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -360,7 +360,8 @@ extern void VacuumUpdateCosts(void);
 extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels,
 												 int nindexes, int nrequested_workers,
 												 int vac_work_mem, int elevel,
-												 BufferAccessStrategy bstrategy);
+												 BufferAccessStrategy bstrategy,
+												 void *state);
 extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats);
 extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs,
 												VacDeadItemsInfo **dead_items_info_p);
@@ -372,6 +373,10 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs,
 												long num_table_tuples,
 												int num_index_scans,
 												bool estimated_count);
+extern int parallel_vacuum_table_scan_begin(ParallelVacuumState *pvs);
+extern void parallel_vacuum_table_scan_end(ParallelVacuumState *pvs);
+extern int parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs);
+extern Relation *parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes);
 extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in commands/analyze.c */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 9398a84051..6ccb19a29f 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -102,8 +102,20 @@ extern char *ExportSnapshot(Snapshot snapshot);
 /*
  * These live in procarray.c because they're intimately linked to the
  * procarray contents, but thematically they better fit into snapmgr.h.
+ *
+ * XXX the struct definition is temporarily moved from procarray.c for
+ * parallel table vacuum development. We need to find a suitable way for
+ * parallel table vacuum workers to share the GlobalVisState.
  */
-typedef struct GlobalVisState GlobalVisState;
+typedef struct GlobalVisState
+{
+	/* XIDs >= are considered running by some backend */
+	FullTransactionId definitely_needed;
+
+	/* XIDs < are not considered to be running by any backend */
+	FullTransactionId maybe_needed;
+} GlobalVisState;
+
 extern GlobalVisState *GlobalVisTestFor(Relation rel);
 extern bool GlobalVisTestIsRemovableXid(GlobalVisState *state, TransactionId xid);
 extern bool GlobalVisTestIsRemovableFullXid(GlobalVisState *state, FullTransactionId fxid);
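
For anyone skimming the patch, the worker-count logic in the revised
parallel_vacuum_compute_workers() can be summarized with the standalone
sketch below. It is not code from the patch: WorkerPlan and
sketch_compute_workers() are hypothetical names, am_table_workers stands in
for whatever the table AM's parallel_vacuum_compute_workers callback
returns, max_workers stands in for max_parallel_maintenance_workers, and
nindexes_parallel is assumed to be the value the function has already
derived before the "No index supports parallel vacuum" check. The
IsUnderPostmaster test is left out.

```c
#include <assert.h>

#define Min(a, b) ((a) < (b) ? (a) : (b))

/* Hypothetical result type: one worker count per vacuum phase. */
typedef struct WorkerPlan
{
	int			table;		/* workers for the parallel table scan */
	int			index;		/* workers for parallel index vacuum */
} WorkerPlan;

/*
 * Sketch of the two independent computations. All parameters are
 * stand-ins for values the real function obtains from the relation,
 * the indexes, and the GUC.
 */
static WorkerPlan
sketch_compute_workers(int am_table_workers, int nindexes_parallel,
					   int nrequested, int max_workers)
{
	WorkerPlan	plan = {0, 0};

	assert(am_table_workers >= 0 && max_workers >= 0);

	/* Parallelism disabled: neither phase gets workers. */
	if (max_workers == 0)
		return plan;

	/* Table scan: whatever the AM asks for, capped by the GUC. */
	plan.table = Min(am_table_workers, max_workers);

	/* Index vacuum: only if at least one index can participate. */
	if (nindexes_parallel > 0)
	{
		plan.index = (nrequested > 0) ?
			Min(nrequested, nindexes_parallel) : nindexes_parallel;
		plan.index = Min(plan.index, max_workers);
	}

	return plan;
}
```

The point of the sketch is that the two counts no longer depend on each
other: a table with no parallel-capable indexes can still get table-scan
workers, which the old early "return 0" prevented.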
