diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index f18180a..8f1dc7b 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-VACUUM [ ( { FULL | FREEZE | VERBOSE | ANALYZE | DISABLE_PAGE_SKIPPING } [, ...] ) ] [ <replaceable class="PARAMETER">table_name</replaceable> [ (<replaceable class="PARAMETER">column_name</replaceable> [, ...] ) ] ]
+VACUUM [ ( { FULL | FREEZE | VERBOSE | ANALYZE | PARALLEL <replaceable class="PARAMETER">N</replaceable> | DISABLE_PAGE_SKIPPING } [, ...] ) ] [ <replaceable class="PARAMETER">table_name</replaceable> [ (<replaceable class="PARAMETER">column_name</replaceable> [, ...] ) ] ]
 VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ <replaceable class="PARAMETER">table_name</replaceable> ]
 VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] ANALYZE [ <replaceable class="PARAMETER">table_name</replaceable> [ (<replaceable class="PARAMETER">column_name</replaceable> [, ...] ) ] ]
 </synopsis>
@@ -130,6 +130,20 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] ANALYZE [ <replaceable class="PARAMETER">
    </varlistentry>
 
    <varlistentry>
+    <term><literal>PARALLEL <replaceable class="PARAMETER">N</replaceable></literal></term>
+    <listitem>
+     <para>
+      Execute <command>VACUUM</command> in parallel with
+      <replaceable class="PARAMETER">N</replaceable> background workers.
+      Garbage collection on the table is processed with block-level
+      parallelism. For tables with indexes, parallel vacuum assigns each
+      index to a particular vacuum worker, and all garbage in an index is
+      processed by that worker. This option cannot be used together with
+      the <literal>FULL</literal> option.
+     </para>
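+     <para>
+      For example, the following command (with an illustrative table name)
+      vacuums the table <structname>accounts</structname> using two
+      background workers:
+<programlisting>
+VACUUM (PARALLEL 2) accounts;
+</programlisting>
+     </para>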
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>DISABLE_PAGE_SKIPPING</literal></term>
     <listitem>
      <para>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1ce42ea..ff3f8d8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -88,7 +88,6 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
 						bool is_bitmapscan,
 						bool is_samplescan,
 						bool temp_snap);
-static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 					TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -1670,7 +1669,7 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
  *		first backend gets an InvalidBlockNumber return.
  * ----------------
  */
-static BlockNumber
+BlockNumber
 heap_parallelscan_nextpage(HeapScanDesc scan)
 {
 	BlockNumber page = InvalidBlockNumber;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 0f72c1c..bfcb77a 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -71,7 +71,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
 				  MultiXactId minMulti,
 				  TransactionId lastSaneFrozenXid,
 				  MultiXactId lastSaneMinMulti);
-static bool vacuum_rel(Oid relid, RangeVar *relation, int options,
+static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumOptions options,
 		   VacuumParams *params);
 
 /*
@@ -86,17 +86,17 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
 	VacuumParams params;
 
 	/* sanity checks on options */
-	Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE));
-	Assert((vacstmt->options & VACOPT_VACUUM) ||
-		   !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
-	Assert((vacstmt->options & VACOPT_ANALYZE) || vacstmt->va_cols == NIL);
-	Assert(!(vacstmt->options & VACOPT_SKIPTOAST));
+	Assert(vacstmt->options.flags & (VACOPT_VACUUM | VACOPT_ANALYZE));
+	Assert((vacstmt->options.flags & VACOPT_VACUUM) ||
+		   !(vacstmt->options.flags & (VACOPT_FULL | VACOPT_FREEZE)));
+	Assert((vacstmt->options.flags & VACOPT_ANALYZE) || vacstmt->va_cols == NIL);
+	Assert(!(vacstmt->options.flags & VACOPT_SKIPTOAST));
 
 	/*
 	 * All freeze ages are zero if the FREEZE option is given; otherwise pass
 	 * them as -1 which means to use the default values.
 	 */
-	if (vacstmt->options & VACOPT_FREEZE)
+	if (vacstmt->options.flags & VACOPT_FREEZE)
 	{
 		params.freeze_min_age = 0;
 		params.freeze_table_age = 0;
@@ -145,7 +145,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
  * memory context that will not disappear at transaction commit.
  */
 void
-vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
+vacuum(VacuumOptions options, RangeVar *relation, Oid relid, VacuumParams *params,
 	   List *va_cols, BufferAccessStrategy bstrategy, bool isTopLevel)
 {
 	const char *stmttype;
@@ -156,7 +156,7 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
 
 	Assert(params != NULL);
 
-	stmttype = (options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
+	stmttype = (options.flags & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
 
 	/*
 	 * We cannot run VACUUM inside a user transaction block; if we were inside
@@ -166,7 +166,7 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
 	 *
 	 * ANALYZE (without VACUUM) can run either way.
 	 */
-	if (options & VACOPT_VACUUM)
+	if (options.flags & VACOPT_VACUUM)
 	{
 		PreventTransactionChain(isTopLevel, stmttype);
 		in_outer_xact = false;
@@ -188,17 +188,26 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
 	/*
 	 * Sanity check DISABLE_PAGE_SKIPPING option.
 	 */
-	if ((options & VACOPT_FULL) != 0 &&
-		(options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
+	if ((options.flags & VACOPT_FULL) != 0 &&
+		(options.flags & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));
 
 	/*
+	 * Sanity check PARALLEL option.
+	 */
+	if ((options.flags & VACOPT_FULL) != 0 &&
+		(options.flags & VACOPT_PARALLEL) != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("VACUUM option PARALLEL cannot be used with FULL")));
+
+	/*
 	 * Send info about dead objects to the statistics collector, unless we are
 	 * in autovacuum --- autovacuum.c does this for itself.
 	 */
-	if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+	if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
 		pgstat_vacuum_stat();
 
 	/*
@@ -244,11 +253,11 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
 	 * transaction block, and also in an autovacuum worker, use own
 	 * transactions so we can release locks sooner.
 	 */
-	if (options & VACOPT_VACUUM)
+	if (options.flags & VACOPT_VACUUM)
 		use_own_xacts = true;
 	else
 	{
-		Assert(options & VACOPT_ANALYZE);
+		Assert(options.flags & VACOPT_ANALYZE);
 		if (IsAutoVacuumWorkerProcess())
 			use_own_xacts = true;
 		else if (in_outer_xact)
@@ -298,13 +307,13 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
 		{
 			Oid			relid = lfirst_oid(cur);
 
-			if (options & VACOPT_VACUUM)
+			if (options.flags & VACOPT_VACUUM)
 			{
 				if (!vacuum_rel(relid, relation, options, params))
 					continue;
 			}
 
-			if (options & VACOPT_ANALYZE)
+			if (options.flags & VACOPT_ANALYZE)
 			{
 				/*
 				 * If using separate xacts, start one for analyze. Otherwise,
@@ -317,7 +326,7 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
 					PushActiveSnapshot(GetTransactionSnapshot());
 				}
 
-				analyze_rel(relid, relation, options, params,
+				analyze_rel(relid, relation, options.flags, params,
 							va_cols, in_outer_xact, vac_strategy);
 
 				if (use_own_xacts)
@@ -353,7 +362,7 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
 		StartTransactionCommand();
 	}
 
-	if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+	if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
 	{
 		/*
 		 * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
@@ -1183,7 +1192,7 @@ vac_truncate_clog(TransactionId frozenXID,
  *		At entry and exit, we are not inside a transaction.
  */
 static bool
-vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
+vacuum_rel(Oid relid, RangeVar *relation, VacuumOptions options, VacuumParams *params)
 {
 	LOCKMODE	lmode;
 	Relation	onerel;
@@ -1204,7 +1213,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 */
 	PushActiveSnapshot(GetTransactionSnapshot());
 
-	if (!(options & VACOPT_FULL))
+	if (!(options.flags & VACOPT_FULL))
 	{
 		/*
 		 * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
@@ -1244,7 +1253,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
 	 * way, we can be sure that no other backend is vacuuming the same table.
 	 */
-	lmode = (options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+	lmode = (options.flags & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
 
 	/*
 	 * Open the relation and get the appropriate lock on it.
@@ -1255,7 +1264,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 * If we've been asked not to wait for the relation lock, acquire it first
 	 * in non-blocking mode, before calling try_relation_open().
 	 */
-	if (!(options & VACOPT_NOWAIT))
+	if (!(options.flags & VACOPT_NOWAIT))
 		onerel = try_relation_open(relid, lmode);
 	else if (ConditionalLockRelationOid(relid, lmode))
 		onerel = try_relation_open(relid, NoLock);
@@ -1359,7 +1368,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 * us to process it.  In VACUUM FULL, though, the toast table is
 	 * automatically rebuilt by cluster_rel so we shouldn't recurse to it.
 	 */
-	if (!(options & VACOPT_SKIPTOAST) && !(options & VACOPT_FULL))
+	if (!(options.flags & VACOPT_SKIPTOAST) && !(options.flags & VACOPT_FULL))
 		toast_relid = onerel->rd_rel->reltoastrelid;
 	else
 		toast_relid = InvalidOid;
@@ -1378,7 +1387,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	/*
 	 * Do the actual work --- either FULL or "lazy" vacuum
 	 */
-	if (options & VACOPT_FULL)
+	if (options.flags & VACOPT_FULL)
 	{
 		/* close relation before vacuuming, but hold lock until commit */
 		relation_close(onerel, NoLock);
@@ -1386,7 +1395,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 
 		/* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
 		cluster_rel(relid, InvalidOid, false,
-					(options & VACOPT_VERBOSE) != 0);
+					(options.flags & VACOPT_VERBOSE) != 0);
 	}
 	else
 		lazy_vacuum_rel(onerel, options, params, vac_strategy);
@@ -1440,8 +1449,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
  * hit dangling index pointers.
  */
 void
-vac_open_indexes(Relation relation, LOCKMODE lockmode,
-				 int *nindexes, Relation **Irel)
+vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
 {
 	List	   *indexoidlist;
 	ListCell   *indexoidscan;
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index a2999b3..b5e6eed 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -23,6 +23,22 @@
  * of index scans performed.  So we don't use maintenance_work_mem memory for
  * the TID array, just enough to hold as many heap tuples as fit on one page.
  *
+ * In PostgreSQL 10, lazy vacuum supports a parallel option. In parallel lazy
+ * vacuum, multiple vacuum worker processes get page numbers in parallel using
+ * a parallel heap scan and process them.  The memory space for each worker's
+ * array of dead tuple TIDs is allocated in dynamic shared memory in advance
+ * by the launcher process.  Each vacuum worker has its own vacuum state and
+ * round.  Since the vacuum state is cyclical, the round value indicates how
+ * many laps the vacuum worker has completed so far.  A vacuum worker
+ * increments its round after finishing the reclaim phase.  For tables with
+ * indexes, each index on the table is assigned to one vacuum worker, so the
+ * number of indexes assigned can differ between vacuum workers.  Because the
+ * dead tuple TIDs need to be shared with all vacuum workers in order to
+ * reclaim index garbage, and must be cleared only after all vacuum workers
+ * have finished the reclaim phase, the vacuum workers synchronize with each
+ * other at two points: before the reclaim phase begins and after it has
+ * finished.  After all vacuum workers have finished their work, the launcher
+ * process gathers the lazy vacuum statistics and updates them.
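+ *
+ * A rough sketch of each worker's state transitions per round, as defined
+ * by the VACSTATE_* flags below:
+ *
+ *   STARTUP -> SCANNING -> VACUUM_PREPARED -> VACUUMING -> VACUUM_FINISHED
+ *     -> (increment round, back to SCANNING) ... -> COMPLETE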
  *
  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -42,8 +58,11 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
@@ -55,6 +74,7 @@
 #include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
@@ -98,10 +118,72 @@
  */
 #define SKIP_PAGES_THRESHOLD	((BlockNumber) 32)
 
+/* DSM key for parallel lazy vacuum */
+#define VACUUM_KEY_PARALLEL_SCAN	50
+#define VACUUM_KEY_VACUUM_STATS		51
+#define VACUUM_KEY_INDEX_STATS		52
+#define VACUUM_KEY_DEAD_TUPLES		53
+#define VACUUM_KEY_VACUUM_TASK		54
+#define VACUUM_KEY_PARALLEL_STATE	55
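+
+/*
+ * Each key above identifies one shm_toc entry: the parallel heap scan
+ * descriptor, the per-worker LVRelStats array, the per-index LVIndStats
+ * array, the per-worker dead tuple space, the shared VacuumTask, and the
+ * shared LVParallelState, respectively.
+ */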
+
+/*
+ * See the note in lazy_scan_heap_get_nextpage about forcing scanning of
+ * the last page.
+ */
+#define FORCE_CHECK_PAGE(blk) \
+	(blkno == ((blk) - 1) && should_attempt_truncation(vacrelstats))
+
+/* Check if given index is assigned to this parallel vacuum worker */
+#define IsAssignedIndex(i_num, nworkers) \
+	(!IsParallelWorker() || \
+	 ((i_num) % (nworkers) == ParallelWorkerNumber))
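+
+/*
+ * For example, with two parallel workers and five indexes, worker 0 is
+ * assigned indexes 0, 2 and 4 while worker 1 is assigned indexes 1 and 3;
+ * a non-worker (non-parallel) process is assigned every index.
+ */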
+
+/* Data structure for updating index relation statistics */
+typedef struct LVIndStats
+{
+	bool		do_update;	/* should the launcher process update the stats? */
+	BlockNumber	rel_pages;	/* # of index pages */
+	double		rel_tuples;	/* # of index tuples */
+} LVIndStats;
+
+/* Vacuum worker state for parallel lazy vacuum */
+#define VACSTATE_STARTUP			0x01	/* startup state */
+#define VACSTATE_SCANNING			0x02	/* heap scan phase */
+#define VACSTATE_VACUUM_PREPARED	0x04	/* finished scanning the heap */
+#define VACSTATE_VACUUMING			0x08	/* vacuuming heap and indexes */
+#define VACSTATE_VACUUM_FINISHED	0x10	/* finished vacuuming */
+#define VACSTATE_COMPLETE			0x20	/* whole vacuum is complete */
+
+/* Vacuum phase for parallel lazy vacuum */
+#define VACPHASE_SCAN \
+	(VACSTATE_SCANNING | VACSTATE_VACUUM_PREPARED)
+#define VACPHASE_VACUUM \
+	(VACSTATE_VACUUM_PREPARED | VACSTATE_VACUUMING | VACSTATE_VACUUM_FINISHED)
+
+typedef struct VacWorker
+{
+	uint8 state;	/* current state of vacuum worker */
+	uint32 round;	/* current lap count */
+} VacWorker;
+
+typedef struct LVParallelState
+{
+	int nworkers;			/* # of parallel vacuum workers */
+	ConditionVariable cv;	/* condition variable for synchronization points */
+	slock_t	mutex;			/* mutex protecting the vacworkers states */
+	VacWorker vacworkers[FLEXIBLE_ARRAY_MEMBER];
+	/* each vacuum worker's state follows */
+} LVParallelState;
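+
+/*
+ * A sketch of the intended synchronization protocol (see lazy_set_my_vacstate
+ * and the lazy_check_vacstate_* routines): a worker updates its own VacWorker
+ * state while holding the mutex and, when requested, broadcasts on the
+ * condition variable; workers sleeping at a synchronization point re-check
+ * the other workers' states each time they are woken.
+ */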
+
+typedef struct LVDeadTuple
+{
+	int n_dt; 				/* # of dead tuples */
+	ItemPointer dt_array;	/* NB: Each list is ordered by TID address */
+} LVDeadTuple;
+
 typedef struct LVRelStats
 {
-	/* hasindex = true means two-pass strategy; false means one-pass */
-	bool		hasindex;
+	int			nindexes; /* > 0 means two-pass strategy; = 0 means one-pass */
 	/* Overall statistics about rel */
 	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
 	BlockNumber rel_pages;		/* total number of pages */
@@ -116,15 +198,42 @@ typedef struct LVRelStats
 	double		tuples_deleted;
 	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
 	/* List of TIDs of tuples we intend to delete */
-	/* NB: this list is ordered by TID address */
-	int			num_dead_tuples;	/* current # of entries */
+	LVDeadTuple *dead_tuples;
 	int			max_dead_tuples;	/* # slots allocated in array */
-	ItemPointer dead_tuples;	/* array of ItemPointerData */
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
 	bool		lock_waiter_detected;
+	/* Fields for parallel lazy vacuum */
+	LVIndStats	*vacindstats;
+	LVParallelState *pstate;
 } LVRelStats;
 
+/*
+ * Scan description data for lazy vacuum. In parallel lazy vacuum,
+ * only the heapscan field is used.
+ */
+typedef struct LVScanDescData
+{
+	BlockNumber lv_cblock;					/* current scanning block number */
+	BlockNumber lv_next_unskippable_block;	/* next block number we cannot skip */
+	BlockNumber lv_nblocks;					/* number of blocks in the relation */
+	HeapScanDesc heapscan;					/* field for parallel lazy vacuum */
+} LVScanDescData;
+typedef struct LVScanDescData *LVScanDesc;
+
+/*
+ * Vacuum-relevant options and thresholds we need to share with parallel
+ * vacuum workers.
+ */
+typedef struct VacuumTask
+{
+	int				options;	/* VACUUM options */
+	bool			aggressive;	/* does each worker need to vacuum aggressively? */
+	TransactionId	oldestxmin;
+	TransactionId	freezelimit;
+	MultiXactId		multixactcutoff;
+	int				elevel;
+} VacuumTask;
 
 /* A few variables that don't seem worth passing around as parameters */
 static int	elevel = -1;
@@ -135,11 +244,10 @@ static MultiXactId MultiXactCutoff;
 
 static BufferAccessStrategy vac_strategy;
 
+static LVDeadTuple *MyDeadTuple = NULL; /* pointer to my dead tuple space */
+static VacWorker *MyVacWorker = NULL;	/* pointer to my vacuum worker state */
 
 /* non-export function prototypes */
-static void lazy_scan_heap(Relation onerel, int options,
-			   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
-			   bool aggressive);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
 static void lazy_vacuum_index(Relation indrel,
@@ -147,21 +255,54 @@ static void lazy_vacuum_index(Relation indrel,
 				  LVRelStats *vacrelstats);
 static void lazy_cleanup_index(Relation indrel,
 				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats);
+				   LVRelStats *vacrelstats,
+				   LVIndStats *vacindstats);
 static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
+							int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
 static bool should_attempt_truncation(LVRelStats *vacrelstats);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
 static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
-static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
-					   ItemPointer itemptr);
+static void lazy_record_dead_tuple(LVRelStats *vacrelstats, ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
 					 TransactionId *visibility_cutoff_xid, bool *all_frozen);
-
+static void lazy_scan_heap(LVRelStats *vacrelstats, Relation onerel,
+						   Relation *Irel, int nindexes, ParallelHeapScanDesc pscan,
+						   int options, bool aggressive);
+
+/* function prototypes for parallel vacuum */
+static void parallel_lazy_scan_heap(Relation rel, LVRelStats *vacrelstats,
+									int options, bool aggressive, int wnum);
+static void lazy_vacuum_worker(dsm_segment *seg, shm_toc *toc);
+static void lazy_gather_vacuum_stats(ParallelContext *pcxt,
+									 LVRelStats *vacrelstats,
+									 LVIndStats *vacindstats);
+static void lazy_update_index_stats(Relation onerel, LVIndStats *vacindstats);
+static void lazy_estimate_dsm(ParallelContext *pcxt, long maxtuples, int nindexes);
+static void lazy_initialize_dsm(ParallelContext *pcxt, Relation onrel,
+								LVRelStats *vacrelstats, int options,
+								bool aggressive);
+static void lazy_initialize_worker(shm_toc *toc, ParallelHeapScanDesc *pscan,
+								   LVRelStats **vacrelstats, int *options,
+								   bool *aggressive);
+static void lazy_clear_dead_tuple(LVRelStats *vacrelstats);
+static LVScanDesc lv_beginscan(LVRelStats *vacrelstats, ParallelHeapScanDesc pscan,
+							   Relation onerel);
+static void lv_endscan(LVScanDesc lvscan);
+static BlockNumber lazy_scan_heap_get_nextpage(Relation onerel, LVRelStats *vacrelstats,
+											   LVScanDesc lvscan, bool *all_visible_according_to_vm,
+											   Buffer *vmbuffer, int options, bool aggressive);
+static void lazy_set_vacstate_and_wait_prepared(LVParallelState *pstate);
+static void lazy_set_vacstate_and_wait_finished(LVRelStats *vacrelstats);
+static bool lazy_check_vacstate_prepared(LVParallelState *pstate, uint32 round);
+static bool lazy_check_vacstate_finished(LVParallelState *pstate, uint32 round);
+static int  lazy_count_vacstate_finished(LVParallelState *pstate, uint32 round, int *n_complete);
+static uint32 lazy_set_my_vacstate(LVParallelState *pstate, uint8 state, bool nextloop,
+								 bool broadcast);
+static long lazy_get_max_dead_tuple(LVRelStats *vacrelstats);
 
 /*
  *	lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
@@ -173,7 +314,7 @@ static bool heap_page_is_all_visible(Relation rel, Buffer buf,
  *		and locked the relation.
  */
 void
-lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
+lazy_vacuum_rel(Relation onerel, VacuumOptions options, VacuumParams *params,
 				BufferAccessStrategy bstrategy)
 {
 	LVRelStats *vacrelstats;
@@ -205,7 +346,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 		starttime = GetCurrentTimestamp();
 	}
 
-	if (options & VACOPT_VERBOSE)
+	if (options.flags & VACOPT_VERBOSE)
 		elevel = INFO;
 	else
 		elevel = DEBUG2;
@@ -233,7 +374,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 											   xidFullScanLimit);
 	aggressive |= MultiXactIdPrecedesOrEquals(onerel->rd_rel->relminmxid,
 											  mxactFullScanLimit);
-	if (options & VACOPT_DISABLE_PAGE_SKIPPING)
+	if (options.flags & VACOPT_DISABLE_PAGE_SKIPPING)
 		aggressive = true;
 
 	vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));
@@ -244,15 +385,26 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	vacrelstats->pages_removed = 0;
 	vacrelstats->lock_waiter_detected = false;
 
-	/* Open all indexes of the relation */
-	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
-	vacrelstats->hasindex = (nindexes > 0);
+	if (options.nworkers > 1)
+	{
+		vacrelstats->nindexes = list_length(RelationGetIndexList(onerel));
 
-	/* Do the vacuuming */
-	lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive);
+		/* Do the vacuuming in parallel */
+		parallel_lazy_scan_heap(onerel, vacrelstats, options.flags, aggressive,
+								options.nworkers);
+	}
+	else
+	{
+		vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
+		vacrelstats->nindexes = nindexes;
 
-	/* Done with indexes */
-	vac_close_indexes(nindexes, Irel, NoLock);
+		/* Do the vacuuming */
+		lazy_scan_heap(vacrelstats, onerel, Irel, nindexes, NULL,
+					   options.flags, aggressive);
+
+		/* Done with indexes */
+		vac_close_indexes(nindexes, Irel, RowExclusiveLock);
+	}
 
 	/*
 	 * Compute whether we actually scanned the all unfrozen pages. If we did,
@@ -319,7 +471,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 						new_rel_pages,
 						new_rel_tuples,
 						new_rel_allvisible,
-						vacrelstats->hasindex,
+						(vacrelstats->nindexes != 0),
 						new_frozen_xid,
 						new_min_multi,
 						false);
@@ -428,28 +580,121 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 }
 
 /*
+ * Launch the number of parallel vacuum workers specified by wnum and wait
+ * for all of them to finish. Before launching the vacuum workers we
+ * initialize dynamic shared memory and store the relevant data in it. After
+ * all workers have finished, we gather the vacuum statistics from them.
+ */
+static void
+parallel_lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
+						int options, bool aggressive, int wnum)
+{
+	ParallelContext	*pcxt;
+	long maxtuples;
+	LVIndStats *vacindstats;
+
+	vacindstats = (LVIndStats *) palloc(sizeof(LVIndStats) * vacrelstats->nindexes);
+
+	EnterParallelMode();
+
+	/* Create parallel context and initialize it */
+	pcxt = CreateParallelContext(lazy_vacuum_worker, wnum);
+
+	/* Estimate DSM size for parallel vacuum */
+	maxtuples = (int) lazy_get_max_dead_tuple(vacrelstats);
+	vacrelstats->max_dead_tuples = maxtuples;
+	lazy_estimate_dsm(pcxt, maxtuples, vacrelstats->nindexes);
+
+	/* Initialize DSM for parallel vacuum */
+	InitializeParallelDSM(pcxt);
+	lazy_initialize_dsm(pcxt, onerel, vacrelstats, options, aggressive);
+
+	/* Launch workers */
+	LaunchParallelWorkers(pcxt);
+
+	/* Wait for all workers to finish vacuuming */
+	WaitForParallelWorkersToFinish(pcxt);
+
+	/* Gather the vacuum statistics from all workers */
+	lazy_gather_vacuum_stats(pcxt, vacrelstats, vacindstats);
+
+	/* Now we can compute the new value for pg_class.reltuples */
+	vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
+														 vacrelstats->rel_pages,
+														 vacrelstats->scanned_pages,
+														 vacrelstats->scanned_tuples);
+	DestroyParallelContext(pcxt);
+	ExitParallelMode();
+
+	/* After leaving parallel mode, we can update index statistics */
+	lazy_update_index_stats(onerel, vacindstats);
+}
+
+/*
+ * Entry point of parallel vacuum worker.
+ */
+static void
+lazy_vacuum_worker(dsm_segment *seg, shm_toc *toc)
+{
+	ParallelHeapScanDesc pscan;
+	LVRelStats *vacrelstats;
+	int options;
+	bool aggressive;
+	Relation rel;
+	Relation *indrel;
+	int nindexes_worker;
+
+	/* Look up and initialize information and task */
+	lazy_initialize_worker(toc, &pscan, &vacrelstats, &options,
+							   &aggressive);
+
+	rel = relation_open(pscan->phs_relid, ShareUpdateExclusiveLock);
+
+	/* Open all indexes */
+	vac_open_indexes(rel, RowExclusiveLock, &nindexes_worker,
+					 &indrel);
+
+	/* Do lazy vacuum */
+	lazy_scan_heap(vacrelstats, rel, indrel, vacrelstats->nindexes,
+				   pscan, options, aggressive);
+
+	heap_close(rel, ShareUpdateExclusiveLock);
+	vac_close_indexes(vacrelstats->nindexes, indrel, RowExclusiveLock);
+}
+
+/*
  *	lazy_scan_heap() -- scan an open heap relation
  *
  *		This routine prunes each page in the heap, which will among other
  *		things truncate dead tuples to dead line pointers, defragment the
- *		page, and set commit status bits (see heap_page_prune).  It also builds
+ *		page, and set commit status bits (see heap_page_prune).  It also uses
  *		lists of dead tuples and pages with free space, calculates statistics
  *		on the number of live tuples in the heap, and marks pages as
  *		all-visible if appropriate.  When done, or when we run low on space for
- *		dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
- *		to reclaim dead line pointers.
+ *		to reclaim dead line pointers. In parallel vacuum, we need to synchronize
+ *		at the points where the heap scan has finished and where the heap
+ *		vacuuming has finished. The vacuum worker that reaches such a point
+ *		first needs to wait for the other vacuum workers to reach the same
+ *		point.
+ *
+ *		In a parallel lazy scan, pscan is not NULL and we get the next page
+ *		number using the parallel heap scan. We have two synchronization
+ *		points: just before actually reclaiming dead tuples and just after
+ *		having reclaimed them.
  *
  *		If there are no indexes then we can reclaim line pointers on the fly;
  *		dead line pointers need only be retained until all index pointers that
  *		reference them have been killed.
  */
 static void
-lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
-			   Relation *Irel, int nindexes, bool aggressive)
+lazy_scan_heap(LVRelStats *vacrelstats, Relation onerel, Relation *Irel,
+			   int nindexes, ParallelHeapScanDesc pscan, int options,
+			   bool aggressive)
 {
-	BlockNumber nblocks,
-				blkno;
+	BlockNumber blkno;
+	BlockNumber nblocks;
 	HeapTupleData tuple;
+	LVScanDesc lvscan;
+	LVIndStats *vacindstats;
 	char	   *relname;
 	BlockNumber empty_pages,
 				vacuumed_pages;
@@ -461,10 +706,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	int			i;
 	PGRUsage	ru0;
 	Buffer		vmbuffer = InvalidBuffer;
-	BlockNumber next_unskippable_block;
-	bool		skipping_blocks;
 	xl_heap_freeze_tuple *frozen;
 	StringInfoData buf;
+	bool		all_visible_according_to_vm = false;
+
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -482,11 +727,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 	empty_pages = vacuumed_pages = 0;
 	num_tuples = tups_vacuumed = nkeep = nunused = 0;
+	nblocks = RelationGetNumberOfBlocks(onerel);
 
 	indstats = (IndexBulkDeleteResult **)
-		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
+		palloc0(sizeof(IndexBulkDeleteResult *) * nindexes);
 
-	nblocks = RelationGetNumberOfBlocks(onerel);
 	vacrelstats->rel_pages = nblocks;
 	vacrelstats->scanned_pages = 0;
 	vacrelstats->nonempty_pages = 0;
@@ -495,86 +740,24 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	lazy_space_alloc(vacrelstats, nblocks);
 	frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
 
+	/* Array of index vacuum statistics */
+	vacindstats = vacrelstats->vacindstats;
+
+	/* Begin heap scan for vacuum */
+	lvscan = lv_beginscan(vacrelstats, pscan, onerel);
+
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
 	initprog_val[1] = nblocks;
 	initprog_val[2] = vacrelstats->max_dead_tuples;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
-	/*
-	 * Except when aggressive is set, we want to skip pages that are
-	 * all-visible according to the visibility map, but only when we can skip
-	 * at least SKIP_PAGES_THRESHOLD consecutive pages.  Since we're reading
-	 * sequentially, the OS should be doing readahead for us, so there's no
-	 * gain in skipping a page now and then; that's likely to disable
-	 * readahead and so be counterproductive. Also, skipping even a single
-	 * page means that we can't update relfrozenxid, so we only want to do it
-	 * if we can skip a goodly number of pages.
-	 *
-	 * When aggressive is set, we can't skip pages just because they are
-	 * all-visible, but we can still skip pages that are all-frozen, since
-	 * such pages do not need freezing and do not affect the value that we can
-	 * safely set for relfrozenxid or relminmxid.
-	 *
-	 * Before entering the main loop, establish the invariant that
-	 * next_unskippable_block is the next block number >= blkno that's not we
-	 * can't skip based on the visibility map, either all-visible for a
-	 * regular scan or all-frozen for an aggressive scan.  We set it to
-	 * nblocks if there's no such block.  We also set up the skipping_blocks
-	 * flag correctly at this stage.
-	 *
-	 * Note: The value returned by visibilitymap_get_status could be slightly
-	 * out-of-date, since we make this test before reading the corresponding
-	 * heap page or locking the buffer.  This is OK.  If we mistakenly think
-	 * that the page is all-visible or all-frozen when in fact the flag's just
-	 * been cleared, we might fail to vacuum the page.  It's easy to see that
-	 * skipping a page when aggressive is not set is not a very big deal; we
-	 * might leave some dead tuples lying around, but the next vacuum will
-	 * find them.  But even when aggressive *is* set, it's still OK if we miss
-	 * a page whose all-frozen marking has just been cleared.  Any new XIDs
-	 * just added to that page are necessarily newer than the GlobalXmin we
-	 * computed, so they'll have no effect on the value to which we can safely
-	 * set relfrozenxid.  A similar argument applies for MXIDs and relminmxid.
-	 *
-	 * We will scan the table's last page, at least to the extent of
-	 * determining whether it has tuples or not, even if it should be skipped
-	 * according to the above rules; except when we've already determined that
-	 * it's not worth trying to truncate the table.  This avoids having
-	 * lazy_truncate_heap() take access-exclusive lock on the table to attempt
-	 * a truncation that just fails immediately because there are tuples in
-	 * the last page.  This is worth avoiding mainly because such a lock must
-	 * be replayed on any hot standby, where it can be disruptive.
-	 */
-	next_unskippable_block = 0;
-	if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
-	{
-		while (next_unskippable_block < nblocks)
-		{
-			uint8		vmstatus;
+	lazy_set_my_vacstate(vacrelstats->pstate, VACSTATE_SCANNING, false, false);
 
-			vmstatus = visibilitymap_get_status(onerel, next_unskippable_block,
-												&vmbuffer);
-			if (aggressive)
-			{
-				if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
-					break;
-			}
-			else
-			{
-				if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
-					break;
-			}
-			vacuum_delay_point();
-			next_unskippable_block++;
-		}
-	}
-
-	if (next_unskippable_block >= SKIP_PAGES_THRESHOLD)
-		skipping_blocks = true;
-	else
-		skipping_blocks = false;
-
-	for (blkno = 0; blkno < nblocks; blkno++)
+	while ((blkno = lazy_scan_heap_get_nextpage(onerel, vacrelstats, lvscan,
+											   &all_visible_according_to_vm,
+											   &vmbuffer, options, aggressive)) !=
+		  InvalidBlockNumber)
 	{
 		Buffer		buf;
 		Page		page;
@@ -585,100 +768,21 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		int			prev_dead_count;
 		int			nfrozen;
 		Size		freespace;
-		bool		all_visible_according_to_vm = false;
 		bool		all_visible;
 		bool		all_frozen = true;	/* provided all_visible is also true */
 		bool		has_dead_tuples;
 		TransactionId visibility_cutoff_xid = InvalidTransactionId;
 
-		/* see note above about forcing scanning of last page */
-#define FORCE_CHECK_PAGE() \
-		(blkno == nblocks - 1 && should_attempt_truncation(vacrelstats))
-
 		pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
 
-		if (blkno == next_unskippable_block)
-		{
-			/* Time to advance next_unskippable_block */
-			next_unskippable_block++;
-			if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
-			{
-				while (next_unskippable_block < nblocks)
-				{
-					uint8		vmskipflags;
-
-					vmskipflags = visibilitymap_get_status(onerel,
-													  next_unskippable_block,
-														   &vmbuffer);
-					if (aggressive)
-					{
-						if ((vmskipflags & VISIBILITYMAP_ALL_FROZEN) == 0)
-							break;
-					}
-					else
-					{
-						if ((vmskipflags & VISIBILITYMAP_ALL_VISIBLE) == 0)
-							break;
-					}
-					vacuum_delay_point();
-					next_unskippable_block++;
-				}
-			}
-
-			/*
-			 * We know we can't skip the current block.  But set up
-			 * skipping_all_visible_blocks to do the right thing at the
-			 * following blocks.
-			 */
-			if (next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD)
-				skipping_blocks = true;
-			else
-				skipping_blocks = false;
-
-			/*
-			 * Normally, the fact that we can't skip this block must mean that
-			 * it's not all-visible.  But in an aggressive vacuum we know only
-			 * that it's not all-frozen, so it might still be all-visible.
-			 */
-			if (aggressive && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer))
-				all_visible_according_to_vm = true;
-		}
-		else
-		{
-			/*
-			 * The current block is potentially skippable; if we've seen a
-			 * long enough run of skippable blocks to justify skipping it, and
-			 * we're not forced to check it, then go ahead and skip.
-			 * Otherwise, the page must be at least all-visible if not
-			 * all-frozen, so we can set all_visible_according_to_vm = true.
-			 */
-			if (skipping_blocks && !FORCE_CHECK_PAGE())
-			{
-				/*
-				 * Tricky, tricky.  If this is in aggressive vacuum, the page
-				 * must have been all-frozen at the time we checked whether it
-				 * was skippable, but it might not be any more.  We must be
-				 * careful to count it as a skipped all-frozen page in that
-				 * case, or else we'll think we can't update relfrozenxid and
-				 * relminmxid.  If it's not an aggressive vacuum, we don't
-				 * know whether it was all-frozen, so we have to recheck; but
-				 * in this case an approximate answer is OK.
-				 */
-				if (aggressive || VM_ALL_FROZEN(onerel, blkno, &vmbuffer))
-					vacrelstats->frozenskipped_pages++;
-				continue;
-			}
-			all_visible_according_to_vm = true;
-		}
-
 		vacuum_delay_point();
 
 		/*
 		 * If we are close to overrunning the available space for dead-tuple
 		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
 		 */
-		if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
-			vacrelstats->num_dead_tuples > 0)
+		if ((vacrelstats->max_dead_tuples - MyDeadTuple->n_dt) < MaxHeapTuplesPerPage &&
+			MyDeadTuple->n_dt > 0)
 		{
 			const int	hvp_index[] = {
 				PROGRESS_VACUUM_PHASE,
@@ -687,6 +791,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			int64		hvp_val[2];
 
 			/*
+			 * At this point the heap scan is done and we are about to
+			 * actually reclaim dead tuples. Because other vacuum workers
+			 * might not have finished scanning yet, we wait for all of them
+			 * to reach this point.
+			 */
+			lazy_set_vacstate_and_wait_prepared(vacrelstats->pstate);
+
+			/*
 			 * Before beginning index vacuuming, we release any pin we may
 			 * hold on the visibility map page.  This isn't necessary for
 			 * correctness, but we do it anyway to avoid holding the pin
@@ -705,11 +816,12 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 										 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
 
-			/* Remove index entries */
+			/* Remove assigned index entries */
 			for (i = 0; i < nindexes; i++)
-				lazy_vacuum_index(Irel[i],
-								  &indstats[i],
-								  vacrelstats);
+			{
+				if (IsAssignedIndex(i, vacrelstats->pstate->nworkers))
+					lazy_vacuum_index(Irel[i], &indstats[i], vacrelstats);
+			}
 
 			/*
 			 * Report that we are now vacuuming the heap.  We also increase
@@ -724,17 +836,28 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			/* Remove tuples from heap */
 			lazy_vacuum_heap(onerel, vacrelstats);
 
+
+			/* Report that we are once again scanning the heap */
+			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+
 			/*
 			 * Forget the now-vacuumed tuples, and press on, but be careful
 			 * not to reset latestRemovedXid since we want that value to be
-			 * valid.
+			 * valid. In parallel lazy vacuum, this is done later.
 			 */
-			vacrelstats->num_dead_tuples = 0;
-			vacrelstats->num_index_scans++;
+			if (vacrelstats->pstate == NULL)
+				lazy_clear_dead_tuple(vacrelstats);
 
-			/* Report that we are once again scanning the heap */
-			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+			/*
+			 * At this point we've finished vacuuming the heap and the
+			 * assigned indexes, and we are about to begin the next round of
+			 * the heap scan. Wait until all vacuum workers have finished.
+			 * Once they have, the dead tuple arrays are cleared by a single
+			 * process.
+			 */
+			lazy_set_vacstate_and_wait_finished(vacrelstats);
+
+			vacrelstats->num_index_scans++;
 		}
 
 		/*
@@ -760,7 +883,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			 * it's OK to skip vacuuming pages we get a lock conflict on. They
 			 * will be dealt with in some future vacuum.
 			 */
-			if (!aggressive && !FORCE_CHECK_PAGE())
+			if (!aggressive && !FORCE_CHECK_PAGE(nblocks))
 			{
 				ReleaseBuffer(buf);
 				vacrelstats->pinskipped_pages++;
@@ -911,7 +1034,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		has_dead_tuples = false;
 		nfrozen = 0;
 		hastup = false;
-		prev_dead_count = vacrelstats->num_dead_tuples;
+		prev_dead_count = MyDeadTuple->n_dt;
 		maxoff = PageGetMaxOffsetNumber(page);
 
 		/*
@@ -1120,10 +1243,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 		/*
 		 * If there are no indexes then we can vacuum the page right now
-		 * instead of doing a second scan.
+		 * instead of doing a second scan. Because each parallel worker uses
+		 * its own dead tuple area, workers can vacuum independently.
 		 */
-		if (nindexes == 0 &&
-			vacrelstats->num_dead_tuples > 0)
+		if (nindexes == 0 && MyDeadTuple->n_dt > 0)
 		{
 			/* Remove tuples from heap */
 			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
@@ -1134,7 +1257,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			 * not to reset latestRemovedXid since we want that value to be
 			 * valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
+			lazy_clear_dead_tuple(vacrelstats);
 			vacuumed_pages++;
 		}
 
@@ -1237,7 +1360,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		 * page, so remember its free space as-is.  (This path will always be
 		 * taken if there are no indexes.)
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
+		if (MyDeadTuple->n_dt == prev_dead_count)
 			RecordPageWithFreeSpace(onerel, blkno, freespace);
 	}
 
@@ -1252,10 +1375,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	vacrelstats->new_dead_tuples = nkeep;
 
 	/* now we can compute the new value for pg_class.reltuples */
-	vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
-														 nblocks,
-												  vacrelstats->scanned_pages,
-														 num_tuples);
+	if (vacrelstats->pstate == NULL)
+		vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
+															 nblocks,
+															 vacrelstats->scanned_pages,
+															 num_tuples);
 
 	/*
 	 * Release any remaining pin on visibility map page.
@@ -1268,7 +1392,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 	/* If any tuples need to be deleted, perform final vacuum cycle */
 	/* XXX put a threshold on min number of tuples here? */
-	if (vacrelstats->num_dead_tuples > 0)
+	if (MyDeadTuple->n_dt > 0)
 	{
 		const int	hvp_index[] = {
 			PROGRESS_VACUUM_PHASE,
@@ -1276,6 +1400,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		};
 		int64		hvp_val[2];
 
+		/*
+		 * At this point the heap scan is done and we are about to actually
+		 * reclaim dead tuples. Because other vacuum workers might not have
+		 * finished yet, we need to wait for them.
+		 */
+		lazy_set_vacstate_and_wait_prepared(vacrelstats->pstate);
+
 		/* Log cleanup info before we touch indexes */
 		vacuum_log_cleanup_info(onerel, vacrelstats);
 
@@ -1285,9 +1416,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 		/* Remove index entries */
 		for (i = 0; i < nindexes; i++)
-			lazy_vacuum_index(Irel[i],
-							  &indstats[i],
-							  vacrelstats);
+		{
+			if (IsAssignedIndex(i, vacrelstats->pstate->nworkers))
+				lazy_vacuum_index(Irel[i], &indstats[i], vacrelstats);
+		}
 
 		/* Report that we are now vacuuming the heap */
 		hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
@@ -1297,10 +1429,22 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		/* Remove tuples from heap */
 		pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 									 PROGRESS_VACUUM_PHASE_VACUUM_HEAP);
+
 		lazy_vacuum_heap(onerel, vacrelstats);
+
+		/*
+		 * At this point we've finished vacuuming the heap and are about to
+		 * begin the next heap scan. Wait until all vacuum workers finish
+		 * vacuuming. Once they all have, one of the vacuum workers clears
+		 * the dead tuple arrays.
+		 */
+		lazy_set_vacstate_and_wait_finished(vacrelstats);
 		vacrelstats->num_index_scans++;
 	}
 
+	/* Change my vacstate to Complete */
+	lazy_set_my_vacstate(vacrelstats->pstate, VACSTATE_COMPLETE, false, true);
+
 	/* report all blocks vacuumed; and that we're cleaning up */
 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
@@ -1308,7 +1452,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 	/* Do post-vacuum cleanup and statistics update for each index */
 	for (i = 0; i < nindexes; i++)
-		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+	{
+		if (IsAssignedIndex(i, vacrelstats->pstate->nworkers))
+			lazy_cleanup_index(Irel[i], indstats[i], vacrelstats, &vacindstats[i]);
+	}
 
 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
 	if (vacuumed_pages)
@@ -1317,6 +1464,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 						RelationGetRelationName(onerel),
 						tups_vacuumed, vacuumed_pages)));
 
+	lv_endscan(lvscan);
+
 	/*
 	 * This is pretty messy, but we split it up so that we can skip emitting
 	 * individual parts of the message when not applicable.
@@ -1347,6 +1496,81 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	pfree(buf.data);
 }
 
+/*
+ * lazy_gather_vacuum_stats() -- Gather vacuum statistics from workers
+ */
+static void
+lazy_gather_vacuum_stats(ParallelContext *pcxt, LVRelStats *vacrelstats,
+									LVIndStats *vacindstats)
+{
+	int	i;
+	LVRelStats *lvstats_list;
+	LVIndStats *lvindstats_list;
+
+	lvstats_list = (LVRelStats *) shm_toc_lookup(pcxt->toc, VACUUM_KEY_VACUUM_STATS);
+	lvindstats_list = (LVIndStats *) shm_toc_lookup(pcxt->toc, VACUUM_KEY_INDEX_STATS);
+
+	/* Gather each worker stats */
+	for (i = 0; i < pcxt->nworkers; i++)
+	{
+		LVRelStats *wstats = lvstats_list + i;
+
+		vacrelstats->scanned_pages += wstats->scanned_pages;
+		vacrelstats->pinskipped_pages += wstats->pinskipped_pages;
+		vacrelstats->frozenskipped_pages += wstats->frozenskipped_pages;
+		vacrelstats->scanned_tuples += wstats->scanned_tuples;
+		vacrelstats->new_dead_tuples += wstats->new_dead_tuples;
+		vacrelstats->pages_removed += wstats->pages_removed;
+		vacrelstats->tuples_deleted += wstats->tuples_deleted;
+		vacrelstats->nonempty_pages += wstats->nonempty_pages;
+	}
+
+	/* All vacuum workers have the same value of rel_pages */
+	vacrelstats->rel_pages = lvstats_list->rel_pages;
+
+	/* Copy index vacuum statistics from DSM to local memory */
+	memcpy(vacindstats, lvindstats_list, sizeof(LVIndStats) * vacrelstats->nindexes);
+}
+
+/*
+ * lazy_update_index_stats() -- Update index vacuum statistics
+ *
+ * This routine cannot be called in a parallel context.
+ */
+static void
+lazy_update_index_stats(Relation onerel, LVIndStats *vacindstats)
+{
+	List *indexoidlist;
+	ListCell *indexoidscan;
+	int i;
+
+	indexoidlist = RelationGetIndexList(onerel);
+	i = 0;
+
+	foreach(indexoidscan, indexoidlist)
+	{
+		Oid indexoid = lfirst_oid(indexoidscan);
+		Relation indrel;
+
+		/* Update index relation statistics if needed */
+		if (vacindstats[i].do_update)
+		{
+			indrel = index_open(indexoid, RowExclusiveLock);
+			vac_update_relstats(indrel,
+								vacindstats[i].rel_pages,
+								vacindstats[i].rel_tuples,
+								0,
+								false,
+								InvalidTransactionId,
+								InvalidMultiXactId,
+								false);
+			index_close(indrel, RowExclusiveLock);
+		}
+		i++;
+	}
+
+	list_free(indexoidlist);
+}
 
 /*
  *	lazy_vacuum_heap() -- second pass over the heap
@@ -1371,7 +1595,8 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 	npages = 0;
 
 	tupindex = 0;
-	while (tupindex < vacrelstats->num_dead_tuples)
+
+	while (tupindex < MyDeadTuple->n_dt)
 	{
 		BlockNumber tblk;
 		Buffer		buf;
@@ -1380,7 +1605,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 
 		vacuum_delay_point();
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&MyDeadTuple->dt_array[tupindex]);
 		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
 								 vac_strategy);
 		if (!ConditionalLockBufferForCleanup(buf))
@@ -1421,7 +1646,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
  *
  * Caller must hold pin and buffer cleanup lock on the buffer.
  *
- * tupindex is the index in vacrelstats->dead_tuples of the first dead
+ * tupindex is the index in MyDeadTuple->dt_array of the first dead
  * tuple for this page.  We assume the rest follow sequentially.
  * The return value is the first tupindex after the tuples of this page.
  */
@@ -1439,16 +1664,16 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 
 	START_CRIT_SECTION();
 
-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+	for (; tupindex < MyDeadTuple->n_dt; tupindex++)
 	{
 		BlockNumber tblk;
 		OffsetNumber toff;
 		ItemId		itemid;
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&MyDeadTuple->dt_array[tupindex]);
 		if (tblk != blkno)
 			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+		toff = ItemPointerGetOffsetNumber(&MyDeadTuple->dt_array[tupindex]);
 		itemid = PageGetItemId(page, toff);
 		ItemIdSetUnused(itemid);
 		unused[uncnt++] = toff;
@@ -1573,7 +1798,7 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
  *	lazy_vacuum_index() -- vacuum one index relation.
  *
  *		Delete all the index entries pointing to tuples listed in
- *		vacrelstats->dead_tuples, and update running statistics.
+ *		MyDeadTuple->dt_array, and update running statistics.
  */
 static void
 lazy_vacuum_index(Relation indrel,
@@ -1582,6 +1807,7 @@ lazy_vacuum_index(Relation indrel,
 {
 	IndexVacuumInfo ivinfo;
 	PGRUsage	ru0;
+	double total_n_dead_tuples = 0;
 
 	pg_rusage_init(&ru0);
 
@@ -1596,10 +1822,25 @@ lazy_vacuum_index(Relation indrel,
 	*stats = index_bulk_delete(&ivinfo, *stats,
 							   lazy_tid_reaped, (void *) vacrelstats);
 
+	/* Count the total number of dead tuples to be removed from indexes */
+	if (vacrelstats->pstate == NULL)
+		total_n_dead_tuples = MyDeadTuple->n_dt;
+	else
+	{
+		int i;
+
+		/*
+		 * Since no vacuum worker updates the dead tuple arrays during the
+		 * reclaim phase, we can read them without holding a lock.
+		 */
+		for (i = 0; i < vacrelstats->pstate->nworkers; i++)
+			total_n_dead_tuples += (vacrelstats->dead_tuples[i]).n_dt;
+	}
+
 	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
+			(errmsg("scanned index \"%s\" to remove %.0f row versions",
 					RelationGetRelationName(indrel),
-					vacrelstats->num_dead_tuples),
+					total_n_dead_tuples),
 			 errdetail("%s.", pg_rusage_show(&ru0))));
 }
 
@@ -1609,7 +1850,8 @@ lazy_vacuum_index(Relation indrel,
 static void
 lazy_cleanup_index(Relation indrel,
 				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats)
+				   LVRelStats *vacrelstats,
+				   LVIndStats *vacindstats)
 {
 	IndexVacuumInfo ivinfo;
 	PGRUsage	ru0;
@@ -1630,17 +1872,31 @@ lazy_cleanup_index(Relation indrel,
 
 	/*
 	 * Now update statistics in pg_class, but only if the index says the count
-	 * is accurate.
+	 * is accurate. In parallel lazy vacuum, a worker cannot update this
+	 * information by itself, so it saves the values to DSM and the launcher
+	 * process updates them later.
 	 */
 	if (!stats->estimated_count)
-		vac_update_relstats(indrel,
-							stats->num_pages,
-							stats->num_index_tuples,
-							0,
-							false,
-							InvalidTransactionId,
-							InvalidMultiXactId,
-							false);
+	{
+		if (IsParallelWorker())
+		{
+			/* Save to shared memory */
+			vacindstats->do_update = true;
+			vacindstats->rel_pages = stats->num_pages;
+			vacindstats->rel_tuples = stats->num_index_tuples;
+		}
+		else
+		{
+			vac_update_relstats(indrel,
+								stats->num_pages,
+								stats->num_index_tuples,
+								0,
+								false,
+								InvalidTransactionId,
+								InvalidMultiXactId,
+								false);
+		}
+	}
 
 	ereport(elevel,
 			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
@@ -1938,62 +2194,102 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 /*
  * lazy_space_alloc - space allocation decisions for lazy vacuum
  *
+ * In parallel lazy vacuum the space for dead tuple locations is already
+ * allocated in dynamic shared memory, so we allocate space for dead tuple
+ * locations in local memory only when not in parallel lazy vacuum, and set
+ * MyDeadTuple in either case.
+ *
  * See the comments at the head of this file for rationale.
  */
 static void
 lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
 {
-	long		maxtuples;
-	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
-	autovacuum_work_mem != -1 ?
-	autovacuum_work_mem : maintenance_work_mem;
-
-	if (vacrelstats->hasindex)
+	/*
+	 * If not in parallel lazy vacuum, we need to allocate the dead
+	 * tuple array in local memory.
+	 */
+	if (vacrelstats->pstate == NULL)
 	{
-		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
-		maxtuples = Min(maxtuples, INT_MAX);
-		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
-
-		/* curious coding here to ensure the multiplication can't overflow */
-		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks)
-			maxtuples = relblocks * LAZY_ALLOC_TUPLES;
+		long maxtuples = lazy_get_max_dead_tuple(vacrelstats);
 
-		/* stay sane if small maintenance_work_mem */
-		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
+		vacrelstats->dead_tuples = (LVDeadTuple *) palloc(sizeof(LVDeadTuple));
+		MyDeadTuple = vacrelstats->dead_tuples;
+		MyDeadTuple->dt_array = palloc0(sizeof(ItemPointerData) * (int)maxtuples);
+		vacrelstats->max_dead_tuples = maxtuples;
 	}
 	else
 	{
-		maxtuples = MaxHeapTuplesPerPage;
+		/*
+		 * In parallel lazy vacuum, we initialize the dead tuple array here.
+		 * The LVDeadTuple array is placed at the beginning of the dead_tuples
+		 * space, so the remaining space can be used for the per-worker dead
+		 * tuple arrays. The dt_base variable points to the beginning of the
+		 * dead tuple arrays.
+		 */
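+		/*
+		 * A sketch of the layout for nworkers = N, as implied by the offset
+		 * computation below:
+		 *
+		 *   [LVDeadTuple 0] ... [LVDeadTuple N-1]
+		 *   [worker 0 dt_array: max_dead_tuples ItemPointerData]
+		 *   ...
+		 *   [worker N-1 dt_array: max_dead_tuples ItemPointerData]
+		 */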
+
+		char *dt_base = (char *)vacrelstats->dead_tuples;
+		LVDeadTuple *dt = &(vacrelstats->dead_tuples[ParallelWorkerNumber]);
+
+		/* Adjust dt_base to the beginning of dead tuple array */
+		dt_base += sizeof(LVDeadTuple) * vacrelstats->pstate->nworkers;
+		dt->dt_array = (ItemPointer)
+			(dt_base + sizeof(ItemPointerData) * vacrelstats->max_dead_tuples * ParallelWorkerNumber);
+
+		/* set MyDeadTuple */
+		MyDeadTuple = dt;
 	}
 
-	vacrelstats->num_dead_tuples = 0;
-	vacrelstats->max_dead_tuples = (int) maxtuples;
-	vacrelstats->dead_tuples = (ItemPointer)
-		palloc(maxtuples * sizeof(ItemPointerData));
+	MyDeadTuple->n_dt = 0;
 }
 
 /*
  * lazy_record_dead_tuple - remember one deletable tuple
  */
 static void
-lazy_record_dead_tuple(LVRelStats *vacrelstats,
-					   ItemPointer itemptr)
+lazy_record_dead_tuple(LVRelStats *vacrelstats, ItemPointer itemptr)
 {
 	/*
 	 * The array shouldn't overflow under normal behavior, but perhaps it
 	 * could if we are given a really small maintenance_work_mem. In that
 	 * case, just forget the last few tuples (we'll get 'em next time).
 	 */
-	if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
+	if (MyDeadTuple->n_dt < vacrelstats->max_dead_tuples)
 	{
-		vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
-		vacrelstats->num_dead_tuples++;
-		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-									 vacrelstats->num_dead_tuples);
+		/*
+		 * In parallel lazy vacuum, since each parallel vacuum worker has
+		 * its own dead tuple array, we don't need to do this exclusively.
+		 */
+		MyDeadTuple->dt_array[MyDeadTuple->n_dt] = *itemptr;
+		(MyDeadTuple->n_dt)++;
+
+		/* XXX : Update progress information here */
 	}
 }
 
 /*
+ *  lazy_clear_dead_tuple() -- clear dead tuple list
+ */
+static void
+lazy_clear_dead_tuple(LVRelStats *vacrelstats)
+{
+	/*
+	 * In parallel lazy vacuum one of the parallel workers is responsible
+	 * for clearing all dead tuple arrays. Note that we assume only one
+	 * process touches the dead tuple arrays here.
+	 */
+	if (vacrelstats->pstate != NULL && vacrelstats->nindexes != 0)
+	{
+		int i;
+		for (i = 0; i < vacrelstats->pstate->nworkers; i++)
+		{
+			LVDeadTuple *dead_tuples = &(vacrelstats->dead_tuples[i]);
+			dead_tuples->n_dt = 0;
+		}
+	}
+	else
+		MyDeadTuple->n_dt = 0;
+}
+
+/*
  *	lazy_tid_reaped() -- is a particular tid deletable?
  *
  *		This has the right signature to be an IndexBulkDeleteCallback.
@@ -2005,14 +2301,33 @@ lazy_tid_reaped(ItemPointer itemptr, void *state)
 {
 	LVRelStats *vacrelstats = (LVRelStats *) state;
 	ItemPointer res;
+	int i;
+	int num = (vacrelstats->pstate == NULL) ? 1 : vacrelstats->pstate->nworkers;
+
+	/*
+	 * In parallel lazy vacuum, all dead tuple TIDs are stored together in
+	 * dynamic shared memory, and the combined set of arrays is not
+	 * globally ordered.  However, since each worker's own array is
+	 * ordered by TID, we can binary-search each of the 'num' arrays in
+	 * turn.  No writes happen at this point, so vacuum workers can read
+	 * the dead tuple arrays without holding a lock.
+	 */
+	for (i = 0; i < num; i++)
+	{
+		ItemPointer dead_tuples = (vacrelstats->dead_tuples[i]).dt_array;
+		int n_tuples = (vacrelstats->dead_tuples[i]).n_dt;
 
-	res = (ItemPointer) bsearch((void *) itemptr,
-								(void *) vacrelstats->dead_tuples,
-								vacrelstats->num_dead_tuples,
-								sizeof(ItemPointerData),
-								vac_cmp_itemptr);
+		res = (ItemPointer) bsearch((void *) itemptr,
+									(void *) dead_tuples,
+									n_tuples,
+									sizeof(ItemPointerData),
+									vac_cmp_itemptr);
 
-	return (res != NULL);
+		if (res != NULL)
+			return true;
+	}
+
+	return false;
 }
 
 /*
@@ -2156,3 +2471,649 @@ heap_page_is_all_visible(Relation rel, Buffer buf,
 
 	return all_visible;
 }
+
+/*
+ * Return the block number we need to scan next, or InvalidBlockNumber if scan
+ * is done.
+ *
+ * Except when aggressive is set, we want to skip pages that are
+ * all-visible according to the visibility map, but only when we can skip
+ * at least SKIP_PAGES_THRESHOLD consecutive pages, and only if we're not
+ * in parallel mode.  Since we're reading sequentially, the OS should be
+ * doing readahead for us, so there's no gain in skipping a page now and
+ * then; that's likely to disable readahead and so be counterproductive.
+ * Also, skipping even a single page means that we can't update
+ * relfrozenxid, so we only want to do it if we can skip a goodly number
+ * of pages.
+ *
+ * When aggressive is set, we can't skip pages just because they are
+ * all-visible, but we can still skip pages that are all-frozen, since
+ * such pages do not need freezing and do not affect the value that we can
+ * safely set for relfrozenxid or relminmxid.
+ *
+ * In non-parallel mode, before entering the main loop, establish the
+ * invariant that next_unskippable_block is the next block number >= blkno
+ * that we can't skip based on the visibility map, either all-visible
+ * for a regular scan or all-frozen for an aggressive scan.  We set it to
+ * nblocks if there's no such block.  We also set up the skipping_blocks
+ * flag correctly at this stage.
+ *
+ * In parallel mode, vacrelstats->pstate is not NULL.  We scan heap pages
+ * using the parallel heap scan description.  Each worker calls
+ * heap_parallelscan_nextpage() to atomically obtain the next block number
+ * to scan.  If the given block is all-visible according to the visibility
+ * map, we skip it immediately, unlike in the non-parallel lazy scan.
+ *
+ * Note: The value returned by visibilitymap_get_status could be slightly
+ * out-of-date, since we make this test before reading the corresponding
+ * heap page or locking the buffer.  This is OK.  If we mistakenly think
+ * that the page is all-visible or all-frozen when in fact the flag's just
+ * been cleared, we might fail to vacuum the page.  It's easy to see that
+ * skipping a page when aggressive is not set is not a very big deal; we
+ * might leave some dead tuples lying around, but the next vacuum will
+ * find them.  But even when aggressive *is* set, it's still OK if we miss
+ * a page whose all-frozen marking has just been cleared.  Any new XIDs
+ * just added to that page are necessarily newer than the GlobalXmin we
+ * computed, so they'll have no effect on the value to which we can safely
+ * set relfrozenxid.  A similar argument applies for MXIDs and relminmxid.
+ *
+ * We will scan the table's last page, at least to the extent of
+ * determining whether it has tuples or not, even if it should be skipped
+ * according to the above rules; except when we've already determined that
+ * it's not worth trying to truncate the table.  This avoids having
+ * lazy_truncate_heap() take access-exclusive lock on the table to attempt
+ * a truncation that just fails immediately because there are tuples in
+ * the last page.  This is worth avoiding mainly because such a lock must
+ * be replayed on any hot standby, where it can be disruptive.
+ */
+static BlockNumber
+lazy_scan_heap_get_nextpage(Relation onerel, LVRelStats *vacrelstats,
+							LVScanDesc lvscan, bool *all_visible_according_to_vm,
+							Buffer *vmbuffer, int options, bool aggressive)
+{
+	BlockNumber blkno;
+
+	if (vacrelstats->pstate != NULL)
+	{
+		/*
+		 * In parallel lazy vacuum, it's hard to know how many consecutive
+		 * all-visible pages exist in the table, so we skip a heap page
+		 * immediately whenever it is all-visible.
+		 */
+		while ((blkno = heap_parallelscan_nextpage(lvscan->heapscan)) != InvalidBlockNumber)
+		{
+			*all_visible_according_to_vm = false;
+			vacuum_delay_point();
+
+			/* Consider skipping this page according to the visibility map */
+			if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0 &&
+				!FORCE_CHECK_PAGE(blkno))
+			{
+				uint8		vmstatus;
+
+				vmstatus = visibilitymap_get_status(onerel, blkno, vmbuffer);
+
+				if (aggressive)
+				{
+					if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0)
+					{
+						vacrelstats->frozenskipped_pages++;
+						continue;
+					}
+					else if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0)
+						*all_visible_according_to_vm = true;
+				}
+				else
+				{
+					if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0)
+					{
+						if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0)
+							vacrelstats->frozenskipped_pages++;
+						continue;
+					}
+				}
+			}
+
+			/* We need to scan the current block; break */
+			break;
+		}
+	}
+	else
+	{
+		bool skipping_blocks = false;
+
+		/* Initialize lv_next_unskippable_block if needed */
+		if (lvscan->lv_cblock == 0 && (options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
+		{
+			while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks)
+			{
+				uint8		vmstatus;
+
+				vmstatus = visibilitymap_get_status(onerel, lvscan->lv_next_unskippable_block,
+													vmbuffer);
+				if (aggressive)
+				{
+					if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
+						break;
+				}
+				else
+				{
+					if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
+						break;
+				}
+				vacuum_delay_point();
+				lvscan->lv_next_unskippable_block++;
+			}
+
+			if (lvscan->lv_next_unskippable_block >= SKIP_PAGES_THRESHOLD)
+				skipping_blocks = true;
+			else
+				skipping_blocks = false;
+		}
+
+		/* Decide the block number we need to scan */
+		for (blkno = lvscan->lv_cblock; blkno < lvscan->lv_nblocks; blkno++)
+		{
+			if (blkno == lvscan->lv_next_unskippable_block)
+			{
+				/* Time to advance next_unskippable_block */
+				lvscan->lv_next_unskippable_block++;
+				if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
+				{
+					while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks)
+					{
+						uint8		vmstatus;
+
+						vmstatus = visibilitymap_get_status(onerel,
+															lvscan->lv_next_unskippable_block,
+															vmbuffer);
+						if (aggressive)
+						{
+							if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
+								break;
+						}
+						else
+						{
+							if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
+								break;
+						}
+						vacuum_delay_point();
+						lvscan->lv_next_unskippable_block++;
+					}
+				}
+
+				/*
+				 * We know we can't skip the current block.  But set up
+				 * skipping_blocks to do the right thing at the
+				 * following blocks.
+				 */
+				if (lvscan->lv_next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD)
+					skipping_blocks = true;
+				else
+					skipping_blocks = false;
+
+				/*
+				 * Normally, the fact that we can't skip this block must mean that
+				 * it's not all-visible.  But in an aggressive vacuum we know only
+				 * that it's not all-frozen, so it might still be all-visible.
+				 */
+				if (aggressive && VM_ALL_VISIBLE(onerel, blkno, vmbuffer))
+					*all_visible_according_to_vm = true;
+
+				/* Found the next unskippable block number */
+				break;
+			}
+			else
+			{
+				/*
+				 * The current block is potentially skippable; if we've seen a
+				 * long enough run of skippable blocks to justify skipping it, and
+				 * we're not forced to check it, then go ahead and skip.
+				 * Otherwise, the page must be at least all-visible if not
+				 * all-frozen, so we can set all_visible_according_to_vm = true.
+				 */
+				if (skipping_blocks && !FORCE_CHECK_PAGE(blkno))
+				{
+					/*
+					 * Tricky, tricky.  If this is in aggressive vacuum, the page
+					 * must have been all-frozen at the time we checked whether it
+					 * was skippable, but it might not be any more.  We must be
+					 * careful to count it as a skipped all-frozen page in that
+					 * case, or else we'll think we can't update relfrozenxid and
+					 * relminmxid.  If it's not an aggressive vacuum, we don't
+					 * know whether it was all-frozen, so we have to recheck; but
+					 * in this case an approximate answer is OK.
+					 */
+					if (aggressive || VM_ALL_FROZEN(onerel, blkno, vmbuffer))
+						vacrelstats->frozenskipped_pages++;
+					continue;
+				}
+
+				*all_visible_according_to_vm = true;
+
+				/* We need to scan the current block; break */
+				break;
+			}
+		} /* for */
+
+		/* Advance the current block number for the next scan */
+		lvscan->lv_cblock = blkno + 1;
+	}
+
+	return (blkno == lvscan->lv_nblocks) ? InvalidBlockNumber : blkno;
+}
+
+/*
+ * Begin lazy vacuum scan. lvscan->heapscan is NULL if
+ * we're not in parallel lazy vacuum.
+ */
+static LVScanDesc
+lv_beginscan(LVRelStats *vacrelstats, ParallelHeapScanDesc pscan,
+			 Relation onerel)
+{
+	LVScanDesc lvscan;
+
+	lvscan = (LVScanDesc) palloc(sizeof(LVScanDescData));
+
+	lvscan->lv_cblock = 0;
+	lvscan->lv_next_unskippable_block = 0;
+	lvscan->lv_nblocks = vacrelstats->rel_pages;
+
+	if (pscan != NULL)
+		lvscan->heapscan = heap_beginscan_parallel(onerel, pscan);
+	else
+		lvscan->heapscan = NULL;
+
+	return lvscan;
+}
+
+/*
+ * End lazy vacuum scan.
+ */
+static void
+lv_endscan(LVScanDesc lvscan)
+{
+	if (lvscan->heapscan != NULL)
+		heap_endscan(lvscan->heapscan);
+	pfree(lvscan);
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Lazy Vacuum Support
+ * ----------------------------------------------------------------
+ */
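+
+/*
+ * Rough sketch of the per-worker state machine used below (the VACSTATE_*
+ * and VACPHASE_* constants are defined with the other parallel vacuum
+ * declarations).  Each worker advances in rounds:
+ *
+ *   STARTUP -> SCANNING -> VACUUM_PREPARED -> VACUUMING
+ *     -> VACUUM_FINISHED -> SCANNING (round + 1) -> ... -> COMPLETE
+ *
+ * Workers synchronize at VACUUM_PREPARED (everyone has finished scanning
+ * the heap for the round) and at VACUUM_FINISHED (everyone has finished
+ * vacuuming the table and indexes), using the condition variable and
+ * spinlock in LVParallelState.
+ */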
+
+/*
+ * Estimate storage for parallel lazy vacuum.
+ */
+static void
+lazy_estimate_dsm(ParallelContext *pcxt, long maxtuples, int nindexes)
+{
+	int size = 0;
+	int keys = 0;
+
+	/* Estimate size for parallel heap scan */
+	size += heap_parallelscan_estimate(SnapshotAny);
+	keys++;
+
+	/* Estimate size for vacuum statistics */
+	size += BUFFERALIGN(sizeof(LVRelStats) * pcxt->nworkers);
+	keys++;
+
+	/* Estimate size for index vacuum statistics */
+	size += BUFFERALIGN(sizeof(LVIndStats) * nindexes);
+	keys++;
+
+	/* Estimate size for dead tuple arrays */
+	size += BUFFERALIGN((sizeof(LVDeadTuple) + sizeof(ItemPointerData) * maxtuples) * pcxt->nworkers);
+	keys++;
+
+	/* Estimate size for parallel lazy vacuum state */
+	size += BUFFERALIGN(sizeof(LVParallelState) + sizeof(VacWorker) * pcxt->nworkers);
+	keys++;
+
+	/* Estimate size for vacuum task */
+	size += BUFFERALIGN(sizeof(VacuumTask));
+	keys++;
+
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+}
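+
+/*
+ * Taken together, the estimated chunk size above comes to roughly
+ *
+ *   heap_parallelscan_estimate(SnapshotAny)
+ *   + nworkers * (sizeof(LVRelStats) + sizeof(LVDeadTuple)
+ *                 + maxtuples * sizeof(ItemPointerData) + sizeof(VacWorker))
+ *   + nindexes * sizeof(LVIndStats)
+ *   + sizeof(LVParallelState) + sizeof(VacuumTask)
+ *
+ * plus BUFFERALIGN padding for each component.
+ */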
+
+/*
+ * Initialize dynamic shared memory for parallel lazy vacuum.  We store
+ * the parallel heap scan description, per-worker vacuum statistics, the
+ * dead tuple arrays, index vacuum statistics, and some parameters for
+ * lazy vacuum.
+ */
+static void
+lazy_initialize_dsm(ParallelContext *pcxt, Relation onerel,
+					LVRelStats *vacrelstats, int options,
+					bool aggressive)
+{
+	ParallelHeapScanDesc pscan;
+	LVRelStats *lvrelstats;
+	LVIndStats *lvindstats;
+	LVDeadTuple *dead_tuples;
+	LVParallelState *pstate;
+	VacuumTask	*vacuum_task;
+	int i;
+	int dead_tuples_size;
+	int pstate_size;
+
+	/* Allocate and initialize DSM for parallel scan description */
+	pscan = (ParallelHeapScanDesc) shm_toc_allocate(pcxt->toc,
+													heap_parallelscan_estimate(SnapshotAny));
+
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_PARALLEL_SCAN, pscan);
+	heap_parallelscan_initialize(pscan, onerel, SnapshotAny);
+
+	/* Allocate and initialize DSM for vacuum stats for each worker */
+	lvrelstats = (LVRelStats *)shm_toc_allocate(pcxt->toc,
+											 sizeof(LVRelStats) * pcxt->nworkers);
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_VACUUM_STATS, lvrelstats);
+	for (i = 0; i < pcxt->nworkers; i++)
+	{
+		LVRelStats *stats = lvrelstats + i;
+
+		memcpy(stats, vacrelstats, sizeof(LVRelStats));
+	}
+
+	/* Allocate and initialize DSM for dead tuple array */
+	dead_tuples_size = sizeof(LVDeadTuple) * pcxt->nworkers;
+	dead_tuples_size += sizeof(ItemPointerData) * vacrelstats->max_dead_tuples * pcxt->nworkers;
+	dead_tuples = (LVDeadTuple *) shm_toc_allocate(pcxt->toc, dead_tuples_size);
+	vacrelstats->dead_tuples = dead_tuples;
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_DEAD_TUPLES, dead_tuples);
+
+	/* Allocate DSM for index vacuum statistics */
+	lvindstats = (LVIndStats *) shm_toc_allocate(pcxt->toc,
+												 sizeof(LVIndStats) * vacrelstats->nindexes);
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_INDEX_STATS, lvindstats);
+
+	/* Allocate and initialize DSM for parallel state */
+	pstate_size = sizeof(LVParallelState) + sizeof(VacWorker) * pcxt->nworkers;
+	pstate = (LVParallelState *) shm_toc_allocate(pcxt->toc, pstate_size);
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_PARALLEL_STATE, pstate);
+	pstate->nworkers = pcxt->nworkers;
+	ConditionVariableInit(&(pstate->cv));
+	SpinLockInit(&(pstate->mutex));
+
+	/* Allocate and initialize DSM for vacuum task */
+	vacuum_task = (VacuumTask *) shm_toc_allocate(pcxt->toc, sizeof(VacuumTask));
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_VACUUM_TASK, vacuum_task);
+	vacuum_task->aggressive = aggressive;
+	vacuum_task->options = options;
+	vacuum_task->oldestxmin = OldestXmin;
+	vacuum_task->freezelimit = FreezeLimit;
+	vacuum_task->multixactcutoff = MultiXactCutoff;
+	vacuum_task->elevel = elevel;
+}
+
+/*
+ * Initialize parallel lazy vacuum state for a worker.
+ */
+static void
+lazy_initialize_worker(shm_toc *toc, ParallelHeapScanDesc *pscan,
+						   LVRelStats **vacrelstats, int *options,
+						   bool *aggressive)
+{
+	LVRelStats *lvstats;
+	LVIndStats *vacindstats;
+	VacuumTask	*vacuum_task;
+	LVDeadTuple *dead_tuples;
+	LVParallelState *pstate;
+
+	/* Set up parallel heap scan description */
+	*pscan = (ParallelHeapScanDesc) shm_toc_lookup(toc, VACUUM_KEY_PARALLEL_SCAN);
+
+	/* Set up vacuum stats */
+	lvstats = (LVRelStats *) shm_toc_lookup(toc, VACUUM_KEY_VACUUM_STATS);
+	*vacrelstats = lvstats + ParallelWorkerNumber;
+
+	/* Set up vacuum index statistics */
+	vacindstats = (LVIndStats *) shm_toc_lookup(toc, VACUUM_KEY_INDEX_STATS);
+	(*vacrelstats)->vacindstats = vacindstats;
+
+	/* Set up dead tuple list */
+	dead_tuples = (LVDeadTuple *) shm_toc_lookup(toc, VACUUM_KEY_DEAD_TUPLES);
+	(*vacrelstats)->dead_tuples = dead_tuples;
+
+	/* Set up vacuum task */
+	vacuum_task = (VacuumTask *) shm_toc_lookup(toc, VACUUM_KEY_VACUUM_TASK);
+
+	/* Set up parallel vacuum state */
+	pstate = (LVParallelState *) shm_toc_lookup(toc, VACUUM_KEY_PARALLEL_STATE);
+	(*vacrelstats)->pstate = pstate;
+	MyVacWorker = &(pstate->vacworkers[ParallelWorkerNumber]);
+	MyVacWorker->state = VACSTATE_STARTUP;
+
+	/* Set up parameters for lazy vacuum */
+	OldestXmin = vacuum_task->oldestxmin;
+	FreezeLimit = vacuum_task->freezelimit;
+	MultiXactCutoff = vacuum_task->multixactcutoff;
+	elevel = vacuum_task->elevel;
+	*options = vacuum_task->options;
+	*aggressive = vacuum_task->aggressive;
+}
+
+/*
+ * Set my vacuum state exclusively and wait until all vacuum workers
+ * finish vacuum.
+ */
+static void
+lazy_set_vacstate_and_wait_finished(LVRelStats *vacrelstats)
+{
+	LVParallelState *pstate = vacrelstats->pstate;
+	uint32 round;
+	int n_count, n_comp;
+
+	/* Quick exit if not in parallel vacuum */
+	if (pstate == NULL)
+		return;
+
+	SpinLockAcquire(&(pstate->mutex));
+
+	/* Change my vacstate */
+	round = MyVacWorker->round;
+	MyVacWorker->state = VACSTATE_VACUUM_FINISHED;
+
+	/* Check all vacuum worker states */
+	n_count = lazy_count_vacstate_finished(pstate, round, &n_comp);
+
+	/*
+	 * If I'm the last running worker to reach here, clear the dead tuple
+	 * arrays.  Note that clearing the dead tuple arrays must be done by
+	 * only one worker, while holding the lock.
+	 */
+	if ((n_count + n_comp) == pstate->nworkers)
+		lazy_clear_dead_tuple(vacrelstats);
+
+	SpinLockRelease(&(pstate->mutex));
+
+	ConditionVariablePrepareToSleep(&(pstate->cv));
+
+	/* Sleep until all vacuum workers have reached here */
+	while (!lazy_check_vacstate_finished(pstate, round))
+		ConditionVariableSleep(&(pstate->cv), WAIT_EVENT_PARALLEL_FINISH);
+
+	ConditionVariableCancelSleep();
+
+	/* For the next scan round, change my state and increment my round number */
+	lazy_set_my_vacstate(pstate, VACSTATE_SCANNING, true, false);
+}
+
+/*
+ * Set my vacuum state exclusively and wait until all vacuum workers
+ * prepared vacuum.
+ */
+static void
+lazy_set_vacstate_and_wait_prepared(LVParallelState *pstate)
+{
+	uint32 round;
+
+	/* Quick exit if not in parallel vacuum */
+	if (pstate == NULL)
+		return;
+
+	/* update my vacstate */
+	round = lazy_set_my_vacstate(pstate, VACSTATE_VACUUM_PREPARED, false, true);
+
+	ConditionVariablePrepareToSleep(&(pstate->cv));
+
+	/* Sleep until all vacuum workers have reached here */
+	while (!lazy_check_vacstate_prepared(pstate, round))
+		ConditionVariableSleep(&(pstate->cv), WAIT_EVENT_PARALLEL_FINISH);
+
+	ConditionVariableCancelSleep();
+
+	/* For the next scan round, change my state */
+	lazy_set_my_vacstate(pstate, VACSTATE_VACUUMING, false, false);
+}
+
+/*
+ * Set my vacstate.  After setting the state we increment my round number
+ * if requested, and notify other waiting processes if requested.  Returns
+ * my round number as it was on entry.
+ */
+static uint32
+lazy_set_my_vacstate(LVParallelState *pstate, uint8 state, bool nextloop,
+					 bool broadcast)
+{
+	uint32 round;
+
+	/* Quick exit if not in parallel vacuum */
+	if (pstate == NULL)
+		return 0;
+
+	Assert(IsParallelWorker());
+
+	SpinLockAcquire(&(pstate->mutex));
+
+	MyVacWorker->state = state;
+	round = MyVacWorker->round;
+
+	/* Increment my round number */
+	if (nextloop)
+		(MyVacWorker->round)++;
+
+	SpinLockRelease(&(pstate->mutex));
+
+	/* Notify other waiting vacuum workers */
+	if (broadcast)
+		ConditionVariableBroadcast(&(pstate->cv));
+
+	return round;
+}
+
+/*
+ * Check whether all vacuum workers have finished scanning the heap and are
+ * prepared to reclaim dead tuples.  Return true if all vacuum workers are
+ * prepared, false otherwise.
+ */
+static bool
+lazy_check_vacstate_prepared(LVParallelState *pstate, uint32 round)
+{
+	int n_count = 0;
+	int n_comp = 0;
+	int i;
+
+	SpinLockAcquire(&(pstate->mutex));
+
+	/*
+	 * Count vacuum workers that are in a vacuum-phase state in the same
+	 * round, plus those that are in VACSTATE_COMPLETE state.
+	 */
+	for (i = 0; i < pstate->nworkers; i++)
+	{
+		VacWorker *vacworker = &(pstate->vacworkers[i]);
+		uint32 w_round = vacworker->round;
+
+		if ((vacworker->state & VACPHASE_VACUUM) != 0 && w_round == round)
+			n_count++;
+		else if (vacworker->state == VACSTATE_COMPLETE)
+			n_comp++;
+	}
+
+	SpinLockRelease(&(pstate->mutex));
+
+	return (n_count + n_comp) == pstate->nworkers;
+}
+
+/*
+ * Check whether all vacuum workers have finished vacuuming the table and
+ * indexes.  Return true if all vacuum workers have finished, false otherwise.
+ */
+static bool
+lazy_check_vacstate_finished(LVParallelState *pstate, uint32 round)
+{
+	int n_count, n_comp;
+
+	SpinLockAcquire(&(pstate->mutex));
+	n_count = lazy_count_vacstate_finished(pstate, round, &n_comp);
+	SpinLockRelease(&(pstate->mutex));
+
+	return (n_count + n_comp) == pstate->nworkers;
+}
+
+/*
+ * When counting the vacuum workers that have finished vacuuming the table
+ * and indexes, some workers could already have proceeded to a subsequent
+ * state in the next round.  We therefore count workers that are either in
+ * the finished state in this round or in a scan-phase state in the next
+ * round.  The caller must hold the mutex lock.
+ */
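+/*
+ * For example, with nworkers = 3 in round r, the worker states
+ * { VACUUM_FINISHED at r, SCANNING (a scan-phase state) at r + 1,
+ * COMPLETE } give n_count = 2 and *n_complete = 1, so
+ * n_count + *n_complete == nworkers and every worker is known to have
+ * passed the end of round r.
+ */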
+static int
+lazy_count_vacstate_finished(LVParallelState *pstate, uint32 round, int *n_complete)
+{
+	int n_count = 0;
+	int n_comp = 0;
+	int i;
+
+	for (i = 0; i < pstate->nworkers; i++)
+	{
+		VacWorker *vacworker = &(pstate->vacworkers[i]);
+		uint32 w_round = vacworker->round;
+
+		if (((vacworker->state & VACSTATE_VACUUM_FINISHED) != 0 && w_round == round) ||
+			((vacworker->state & VACPHASE_SCAN) != 0 && w_round == (round + 1)))
+			n_count++;
+		else if (vacworker->state == VACSTATE_COMPLETE)
+			n_comp++;
+	}
+
+	*n_complete = n_comp;
+
+	return n_count;
+}
+
+/*
+ * Return the maximum number of dead tuples that can be stored, according
+ * to vac_work_mem.
+ */
+static long
+lazy_get_max_dead_tuple(LVRelStats *vacrelstats)
+{
+	long maxtuples;
+	int	vac_work_mem = IsAutoVacuumWorkerProcess() &&
+		autovacuum_work_mem != -1 ?
+		autovacuum_work_mem : maintenance_work_mem;
+
+	if (vacrelstats->nindexes != 0)
+	{
+		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
+		maxtuples = Min(maxtuples, INT_MAX);
+		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
+
+		/* curious coding here to ensure the multiplication can't overflow */
+		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > vacrelstats->old_rel_pages)
+			maxtuples = vacrelstats->old_rel_pages * LAZY_ALLOC_TUPLES;
+
+		/* stay sane if small maintenance_work_mem */
+		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
+	}
+	else
+	{
+		maxtuples = MaxHeapTuplesPerPage;
+	}
+
+	return maxtuples;
+}
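
For reference, lazy_scan_heap() is expected to drive the new scan descriptor
roughly as follows (a simplified sketch, not the patch's verbatim code:
per-page pruning, index vacuuming, vmbuffer release, and progress reporting
are omitted, and pscan/options/aggressive follow their usage elsewhere in
the patch):

    LVScanDesc	lvscan = lv_beginscan(vacrelstats, pscan, onerel);
    BlockNumber	blkno;
    bool		all_visible_according_to_vm;
    Buffer		vmbuffer = InvalidBuffer;

    while ((blkno = lazy_scan_heap_get_nextpage(onerel, vacrelstats, lvscan,
                                                &all_visible_according_to_vm,
                                                &vmbuffer, options,
                                                aggressive)) != InvalidBlockNumber)
    {
        /* read and prune the page, calling lazy_record_dead_tuple() as needed */
    }

    lv_endscan(lvscan);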
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index a27e5ed..c3bf0d9 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1595,7 +1595,12 @@ _equalDropdbStmt(const DropdbStmt *a, const DropdbStmt *b)
 static bool
 _equalVacuumStmt(const VacuumStmt *a, const VacuumStmt *b)
 {
-	COMPARE_SCALAR_FIELD(options);
+	if (a->options.flags != b->options.flags)
+		return false;
+
+	if (a->options.nworkers != b->options.nworkers)
+		return false;
+
 	COMPARE_NODE_FIELD(relation);
 	COMPARE_NODE_FIELD(va_cols);
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 9eef550..dcf0353 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -178,6 +178,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 			   bool *deferrable, bool *initdeferred, bool *not_valid,
 			   bool *no_inherit, core_yyscan_t yyscanner);
 static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
+static VacuumOptions *makeVacOpt(VacuumOption opt, int nworkers);
 
 %}
 
@@ -228,6 +229,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	struct ImportQual	*importqual;
 	InsertStmt			*istmt;
 	VariableSetStmt		*vsetstmt;
+	VacuumOptions		*vacopts;
 	PartitionElem		*partelem;
 	PartitionSpec		*partspec;
 	PartitionRangeDatum	*partrange_datum;
@@ -292,7 +294,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				create_extension_opt_item alter_extension_opt_item
 
 %type <ival>	opt_lock lock_type cast_context
-%type <ival>	vacuum_option_list vacuum_option_elem
+%type <vacopts>	vacuum_option_list vacuum_option_elem
 %type <boolean>	opt_or_replace
 				opt_grant_grant_option opt_grant_admin_option
 				opt_nowait opt_if_exists opt_with_data
@@ -9720,47 +9722,59 @@ cluster_index_specification:
 VacuumStmt: VACUUM opt_full opt_freeze opt_verbose
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_VACUUM;
+					VacuumOptions *vacopts = makeVacOpt(VACOPT_VACUUM, 1);
 					if ($2)
-						n->options |= VACOPT_FULL;
+						vacopts->flags |= VACOPT_FULL;
 					if ($3)
-						n->options |= VACOPT_FREEZE;
+						vacopts->flags |= VACOPT_FREEZE;
 					if ($4)
-						n->options |= VACOPT_VERBOSE;
+						vacopts->flags |= VACOPT_VERBOSE;
+
+					n->options.flags = vacopts->flags;
+					n->options.nworkers = 1;
 					n->relation = NULL;
 					n->va_cols = NIL;
 					$$ = (Node *)n;
+					pfree(vacopts);
 				}
 			| VACUUM opt_full opt_freeze opt_verbose qualified_name
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_VACUUM;
+					VacuumOptions *vacopts = makeVacOpt(VACOPT_VACUUM, 1);
 					if ($2)
-						n->options |= VACOPT_FULL;
+						vacopts->flags |= VACOPT_FULL;
 					if ($3)
-						n->options |= VACOPT_FREEZE;
+						vacopts->flags |= VACOPT_FREEZE;
 					if ($4)
-						n->options |= VACOPT_VERBOSE;
+						vacopts->flags |= VACOPT_VERBOSE;
+
+					n->options.flags = vacopts->flags;
+					n->options.nworkers = 1;
 					n->relation = $5;
 					n->va_cols = NIL;
 					$$ = (Node *)n;
+					pfree(vacopts);
 				}
 			| VACUUM opt_full opt_freeze opt_verbose AnalyzeStmt
 				{
 					VacuumStmt *n = (VacuumStmt *) $5;
-					n->options |= VACOPT_VACUUM;
+					n->options.flags |= VACOPT_VACUUM;
 					if ($2)
-						n->options |= VACOPT_FULL;
+						n->options.flags |= VACOPT_FULL;
 					if ($3)
-						n->options |= VACOPT_FREEZE;
+						n->options.flags |= VACOPT_FREEZE;
 					if ($4)
-						n->options |= VACOPT_VERBOSE;
+						n->options.flags |= VACOPT_VERBOSE;
+					n->options.nworkers = 1;
 					$$ = (Node *)n;
 				}
 			| VACUUM '(' vacuum_option_list ')'
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_VACUUM | $3;
+					VacuumOptions *vacopts = $3;
+
+					n->options.flags = vacopts->flags | VACOPT_VACUUM;
+					n->options.nworkers = vacopts->nworkers;
 					n->relation = NULL;
 					n->va_cols = NIL;
 					$$ = (Node *) n;
@@ -9768,29 +9782,52 @@ VacuumStmt: VACUUM opt_full opt_freeze opt_verbose
 			| VACUUM '(' vacuum_option_list ')' qualified_name opt_name_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_VACUUM | $3;
+					VacuumOptions *vacopts = $3;
+
+					n->options.flags = vacopts->flags | VACOPT_VACUUM;
+					n->options.nworkers = vacopts->nworkers;
 					n->relation = $5;
 					n->va_cols = $6;
 					if (n->va_cols != NIL)	/* implies analyze */
-						n->options |= VACOPT_ANALYZE;
+						n->options.flags |= VACOPT_ANALYZE;
 					$$ = (Node *) n;
 				}
 		;
 
 vacuum_option_list:
 			vacuum_option_elem								{ $$ = $1; }
-			| vacuum_option_list ',' vacuum_option_elem		{ $$ = $1 | $3; }
+			| vacuum_option_list ',' vacuum_option_elem
+			{
+				VacuumOptions *vacopts1 = (VacuumOptions *)$1;
+				VacuumOptions *vacopts2 = (VacuumOptions *)$3;
+
+				vacopts1->flags |= vacopts2->flags;
+				if (vacopts1->nworkers < vacopts2->nworkers)
+					vacopts1->nworkers = vacopts2->nworkers;
+
+				$$ = vacopts1;
+				pfree(vacopts2);
+			}
 		;
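+
+/*
+ * When options are combined, the flag bits are OR'ed and the largest
+ * PARALLEL degree wins; e.g. (PARALLEL 3, VERBOSE, PARALLEL 2) yields
+ * flags = VACOPT_VERBOSE | VACOPT_PARALLEL with nworkers = 3
+ * (VACOPT_VACUUM is added afterwards by the VacuumStmt rule).
+ */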
 
 vacuum_option_elem:
-			analyze_keyword		{ $$ = VACOPT_ANALYZE; }
-			| VERBOSE			{ $$ = VACOPT_VERBOSE; }
-			| FREEZE			{ $$ = VACOPT_FREEZE; }
-			| FULL				{ $$ = VACOPT_FULL; }
+			analyze_keyword		{ $$ = makeVacOpt(VACOPT_ANALYZE, 1); }
+			| VERBOSE			{ $$ = makeVacOpt(VACOPT_VERBOSE, 1); }
+			| FREEZE			{ $$ = makeVacOpt(VACOPT_FREEZE, 1); }
+			| FULL				{ $$ = makeVacOpt(VACOPT_FULL, 1); }
+			| PARALLEL ICONST
+				{
+					if ($2 < 1)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("parallel vacuum degree must be at least 1"),
+								 parser_errposition(@1)));
+					$$ = makeVacOpt(VACOPT_PARALLEL, $2);
+				}
 			| IDENT
 				{
 					if (strcmp($1, "disable_page_skipping") == 0)
-						$$ = VACOPT_DISABLE_PAGE_SKIPPING;
+						$$ = makeVacOpt(VACOPT_DISABLE_PAGE_SKIPPING, 1);
 					else
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
@@ -9798,27 +9835,36 @@ vacuum_option_elem:
 									 parser_errposition(@1)));
 				}
 		;
-
 AnalyzeStmt:
 			analyze_keyword opt_verbose
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_ANALYZE;
+					VacuumOptions *vacopts = makeVacOpt(VACOPT_ANALYZE, 1);
+
 					if ($2)
-						n->options |= VACOPT_VERBOSE;
+						vacopts->flags |= VACOPT_VERBOSE;
+
+					n->options.flags = vacopts->flags;
+					n->options.nworkers = 1;
 					n->relation = NULL;
 					n->va_cols = NIL;
 					$$ = (Node *)n;
+					pfree(vacopts);
 				}
 			| analyze_keyword opt_verbose qualified_name opt_name_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_ANALYZE;
+					VacuumOptions *vacopts = makeVacOpt(VACOPT_ANALYZE, 1);
+
 					if ($2)
-						n->options |= VACOPT_VERBOSE;
+						vacopts->flags |= VACOPT_VERBOSE;
+
+					n->options.flags = vacopts->flags;
+					n->options.nworkers = 1;
 					n->relation = $3;
 					n->va_cols = $4;
 					$$ = (Node *)n;
+					pfree(vacopts);
 				}
 		;
 
@@ -15284,6 +15330,16 @@ makeRecursiveViewSelect(char *relname, List *aliases, Node *query)
 	return (Node *) s;
 }
 
+static VacuumOptions *
+makeVacOpt(VacuumOption opt, int nworkers)
+{
+	VacuumOptions *vacopts = palloc(sizeof(VacuumOptions));
+
+	vacopts->flags = opt;
+	vacopts->nworkers = nworkers;
+	return vacopts;
+}
+
 /* parser_init()
  * Initialize to parse one query string
  */
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 251b9fe..5cc683f 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -186,7 +186,7 @@ typedef struct av_relation
 typedef struct autovac_table
 {
 	Oid			at_relid;
-	int			at_vacoptions;	/* bitmask of VacuumOption */
+	VacuumOptions at_vacoptions;	/* contains bitmask of VacuumOption */
 	VacuumParams at_params;
 	int			at_vacuum_cost_delay;
 	int			at_vacuum_cost_limit;
@@ -2414,7 +2414,7 @@ do_autovacuum(void)
 			 * next table in our list.
 			 */
 			HOLD_INTERRUPTS();
-			if (tab->at_vacoptions & VACOPT_VACUUM)
+			if (tab->at_vacoptions.flags & VACOPT_VACUUM)
 				errcontext("automatic vacuum of table \"%s.%s.%s\"",
 						   tab->at_datname, tab->at_nspname, tab->at_relname);
 			else
@@ -2651,10 +2651,11 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 		tab = palloc(sizeof(autovac_table));
 		tab->at_relid = relid;
 		tab->at_sharedrel = classForm->relisshared;
-		tab->at_vacoptions = VACOPT_SKIPTOAST |
+		tab->at_vacoptions.flags = VACOPT_SKIPTOAST |
 			(dovacuum ? VACOPT_VACUUM : 0) |
 			(doanalyze ? VACOPT_ANALYZE : 0) |
 			(!wraparound ? VACOPT_NOWAIT : 0);
+		tab->at_vacoptions.nworkers = 1;
 		tab->at_params.freeze_min_age = freeze_min_age;
 		tab->at_params.freeze_table_age = freeze_table_age;
 		tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
@@ -2901,10 +2902,10 @@ autovac_report_activity(autovac_table *tab)
 	int			len;
 
 	/* Report the command and possible options */
-	if (tab->at_vacoptions & VACOPT_VACUUM)
+	if (tab->at_vacoptions.flags & VACOPT_VACUUM)
 		snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
 				 "autovacuum: VACUUM%s",
-				 tab->at_vacoptions & VACOPT_ANALYZE ? " ANALYZE" : "");
+				 tab->at_vacoptions.flags & VACOPT_ANALYZE ? " ANALYZE" : "");
 	else
 		snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
 				 "autovacuum: ANALYZE");
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 127dc86..ab435ce 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -654,7 +654,7 @@ standard_ProcessUtility(Node *parsetree,
 				VacuumStmt *stmt = (VacuumStmt *) parsetree;
 
 				/* we choose to allow this during "read only" transactions */
-				PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
+				PreventCommandDuringRecovery((stmt->options.flags & VACOPT_VACUUM) ?
 											 "VACUUM" : "ANALYZE");
 				/* forbidden in parallel mode due to CommandIsReadOnly */
 				ExecVacuum(stmt, isTopLevel);
@@ -2394,7 +2394,7 @@ CreateCommandTag(Node *parsetree)
 			break;
 
 		case T_VacuumStmt:
-			if (((VacuumStmt *) parsetree)->options & VACOPT_VACUUM)
+			if (((VacuumStmt *) parsetree)->options.flags & VACOPT_VACUUM)
 				tag = "VACUUM";
 			else
 				tag = "ANALYZE";
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 92afc32..37d6780 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -1991,7 +1991,6 @@ EstimateSnapshotSpace(Snapshot snap)
 	Size		size;
 
 	Assert(snap != InvalidSnapshot);
-	Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
 
 	/* We allocate any XID arrays needed in the same palloc block. */
 	size = add_size(sizeof(SerializedSnapshotData),
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index ee7e05a..712f70e 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -131,6 +131,7 @@ extern Size heap_parallelscan_estimate(Snapshot snapshot);
 extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
 							 Relation relation, Snapshot snapshot);
 extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
+extern BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 
 extern bool heap_fetch(Relation relation, Snapshot snapshot,
 		   HeapTuple tuple, Buffer *userbuf, bool keep_buf,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 541c2fa..7fecbae 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -15,6 +15,7 @@
 #define VACUUM_H
 
 #include "access/htup.h"
+#include "access/heapam.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
 #include "nodes/parsenodes.h"
@@ -158,7 +159,7 @@ extern int	vacuum_multixact_freeze_table_age;
 
 /* in commands/vacuum.c */
 extern void ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel);
-extern void vacuum(int options, RangeVar *relation, Oid relid,
+extern void vacuum(VacuumOptions options, RangeVar *relation, Oid relid,
 	   VacuumParams *params, List *va_cols,
 	   BufferAccessStrategy bstrategy, bool isTopLevel);
 extern void vac_open_indexes(Relation relation, LOCKMODE lockmode,
@@ -189,7 +190,7 @@ extern void vac_update_datfrozenxid(void);
 extern void vacuum_delay_point(void);
 
 /* in commands/vacuumlazy.c */
-extern void lazy_vacuum_rel(Relation onerel, int options,
+extern void lazy_vacuum_rel(Relation onerel, VacuumOptions options,
 				VacuumParams *params, BufferAccessStrategy bstrategy);
 
 /* in commands/analyze.c */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 7ceaa22..d19dad7 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2936,13 +2936,20 @@ typedef enum VacuumOption
 	VACOPT_FULL = 1 << 4,		/* FULL (non-concurrent) vacuum */
 	VACOPT_NOWAIT = 1 << 5,		/* don't wait to get lock (autovacuum only) */
 	VACOPT_SKIPTOAST = 1 << 6,	/* don't process the TOAST table, if any */
-	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7		/* don't skip any pages */
+	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7,		/* don't skip any pages */
+	VACOPT_PARALLEL = 1 << 8	/* do VACUUM in parallel */
 } VacuumOption;
 
+typedef struct VacuumOptions
+{
+	VacuumOption flags; /* OR of VacuumOption flags */
+	int nworkers; /* # of parallel vacuum workers */
+} VacuumOptions;
+
 typedef struct VacuumStmt
 {
 	NodeTag		type;
-	int			options;		/* OR of VacuumOption flags */
+	VacuumOptions	options;
 	RangeVar   *relation;		/* single table to process, or NULL */
 	List	   *va_cols;		/* list of column names, or NIL for all */
 } VacuumStmt;
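
For example, VACUUM (PARALLEL 2, VERBOSE) vactst now parses into a
VacuumStmt built roughly like this (an illustrative sketch of the result of
the grammar rules above):

    VacuumStmt *n = makeNode(VacuumStmt);

    n->options.flags = VACOPT_VACUUM | VACOPT_VERBOSE | VACOPT_PARALLEL;
    n->options.nworkers = 2;			/* from PARALLEL 2 */
    n->relation = makeRangeVar(NULL, "vactst", -1);
    n->va_cols = NIL;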
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index 9b604be..bc83323 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -80,5 +80,6 @@ CONTEXT:  SQL function "do_analyze" statement 1
 SQL function "wrap_do_analyze" statement 1
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL 2) vactst;
 DROP TABLE vaccluster;
 DROP TABLE vactst;
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 7b819f6..46355ec 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -61,6 +61,7 @@ VACUUM FULL vaccluster;
 VACUUM FULL vactst;
 
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL 2) vactst;
 
 DROP TABLE vaccluster;
 DROP TABLE vactst;
