On Fri, Oct 25, 2019 at 2:06 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Fri, Oct 25, 2019 at 10:22 AM Masahiko Sawada <sawada.m...@gmail.com> 
> wrote:
> >
> > For more detail of my idea it is that the first worker who entered to
> > vacuum_delay_point adds its local value to shared value and reset the
> > local value to 0. And then the worker sleeps if it exceeds
> > VacuumCostLimit but before sleeping it can subtract VacuumCostLimit
> > from the shared value. Since vacuum_delay_point are typically called
> > per page processed I expect there will not such problem. Thoughts?
>
> Oh right, I assumed that when the local balance is exceeding the
> VacuumCostLimit that time you are adding it to the shared value but
> you are adding it to to shared value every time in vacuum_delay_point.
> So I think your idea is correct.

I've attached the updated patch set.

The first three patches add new fields and a callback to the index AM API.

The next two patches are the main part of parallel vacuum support. I've
incorporated all review comments I have received so far. The memory layout
of the variable-length index statistics might be a bit complex. It is
similar to the heap tuple header format in that it has a null bitmap,
followed by the size of the index statistics and the actual data for each index.

The last patch is a PoC patch that implements the shared vacuum cost
balance. For now it's separate, but after testing both approaches it
will be merged into the 0004 patch. I'll test both next week.

This patch set can be applied on top of the patch[1] that improves
gist index bulk-deletion, so amcanparallelvacuum is true for gist indexes.

[1] 
https://www.postgresql.org/message-id/CAFiTN-uQY%2BB%2BCLb8W3YYdb7XmB9hyYFXkAy3C7RY%3D-YSWRV1DA%40mail.gmail.com

Regards,

--
Masahiko Sawada
From bf8e8ae5ded91327d504a19227c96378d3d0b513 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Thu, 24 Oct 2019 14:51:16 +0900
Subject: [PATCH v31 2/6] Add an index AM callback to estimate DSM for parallel
 vacuum

---
 contrib/bloom/blutils.c                       |  1 +
 doc/src/sgml/indexam.sgml                     | 17 +++++++++++
 src/backend/access/brin/brin.c                |  1 +
 src/backend/access/gin/ginutil.c              |  1 +
 src/backend/access/gist/gist.c                |  1 +
 src/backend/access/hash/hash.c                |  1 +
 src/backend/access/index/indexam.c            | 29 +++++++++++++++++++
 src/backend/access/nbtree/nbtree.c            |  1 +
 src/backend/access/spgist/spgutils.c          |  1 +
 src/include/access/amapi.h                    |  9 ++++++
 src/include/access/genam.h                    |  1 +
 .../modules/dummy_index_am/dummy_index_am.c   |  1 +
 12 files changed, 64 insertions(+)

diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c
index 98163c81bd..9ef14a47f3 100644
--- a/contrib/bloom/blutils.c
+++ b/contrib/bloom/blutils.c
@@ -145,6 +145,7 @@ blhandler(PG_FUNCTION_ARGS)
 	amroutine->amestimateparallelscan = NULL;
 	amroutine->aminitparallelscan = NULL;
 	amroutine->amparallelrescan = NULL;
+	amroutine->amestimateparallelvacuum = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index fa5682db04..c3d2352d0f 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -151,6 +151,9 @@ typedef struct IndexAmRoutine
     amestimateparallelscan_function amestimateparallelscan;    /* can be NULL */
     aminitparallelscan_function aminitparallelscan;    /* can be NULL */
     amparallelrescan_function amparallelrescan;    /* can be NULL */
+
+    /* interface functions to support parallel vacuum */
+    amestimateparallelvacuum_function amestimateparallelvacuum; /* can be NULL */
 } IndexAmRoutine;
 </programlisting>
   </para>
@@ -733,6 +736,20 @@ amparallelrescan (IndexScanDesc scan);
    the beginning.
   </para>
 
+  <para>
+<programlisting>
+Size
+amestimateparallelvacuum (void);
+</programlisting>
+   Estimate and return the number of bytes of dynamic shared memory that the
+   access method needs in order to store its index statistics.
+  </para>
+
+  <para>
+   It is not necessary to implement this function for access methods that
+   do not support parallel vacuum, or that need no more space than the size of
+   <structname>IndexBulkDeleteResult</structname>.
+  </para>
  </sect1>
 
  <sect1 id="index-scanning">
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 6ea48fb555..4045f5eacf 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -125,6 +125,7 @@ brinhandler(PG_FUNCTION_ARGS)
 	amroutine->amestimateparallelscan = NULL;
 	amroutine->aminitparallelscan = NULL;
 	amroutine->amparallelrescan = NULL;
+	amroutine->amestimateparallelvacuum = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index 0c33809c83..9832f651ef 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -77,6 +77,7 @@ ginhandler(PG_FUNCTION_ARGS)
 	amroutine->amestimateparallelscan = NULL;
 	amroutine->aminitparallelscan = NULL;
 	amroutine->amparallelrescan = NULL;
+	amroutine->amestimateparallelvacuum = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 0363bf814a..88b1e839b3 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -99,6 +99,7 @@ gisthandler(PG_FUNCTION_ARGS)
 	amroutine->amestimateparallelscan = NULL;
 	amroutine->aminitparallelscan = NULL;
 	amroutine->amparallelrescan = NULL;
+	amroutine->amestimateparallelvacuum = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index f21d9ac78f..3666318064 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -98,6 +98,7 @@ hashhandler(PG_FUNCTION_ARGS)
 	amroutine->amestimateparallelscan = NULL;
 	amroutine->aminitparallelscan = NULL;
 	amroutine->amparallelrescan = NULL;
+	amroutine->amestimateparallelvacuum = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 9dfa0ddfbb..5238b9d38f 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -711,6 +711,35 @@ index_vacuum_cleanup(IndexVacuumInfo *info,
 	return indexRelation->rd_indam->amvacuumcleanup(info, stats);
 }
 
+/*
+ * index_parallelvacuum_estimate - estimate DSM space for parallel vacuum
+ *
+ * Returns the number of bytes needed to store this index's statistics.  No
+ * information is passed to the AM-specific estimator yet, so it can probably
+ * only return a constant; we might need to pass more in the future.
+ */
+Size
+index_parallelvacuum_estimate(Relation indexRelation)
+{
+	Size		nbytes;
+
+	RELATION_CHECKS;
+
+	/*
+	 * Without an AM estimator, assume only IndexBulkDeleteResult is needed;
+	 * an AM-supplied estimate is asserted to be at least that large.
+	 */
+	if (indexRelation->rd_indam->amestimateparallelvacuum != NULL)
+	{
+		nbytes = indexRelation->rd_indam->amestimateparallelvacuum();
+		Assert(nbytes >= MAXALIGN(sizeof(IndexBulkDeleteResult)));
+	}
+	else
+		nbytes = MAXALIGN(sizeof(IndexBulkDeleteResult));
+
+	return nbytes;
+}
+
 /* ----------------
  *		index_can_return
  *
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index e885aadc21..f1db77886c 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -147,6 +147,7 @@ bthandler(PG_FUNCTION_ARGS)
 	amroutine->amestimateparallelscan = btestimateparallelscan;
 	amroutine->aminitparallelscan = btinitparallelscan;
 	amroutine->amparallelrescan = btparallelrescan;
+	amroutine->amestimateparallelvacuum = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 0c86b63f65..ff66c3ac6c 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -80,6 +80,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amestimateparallelscan = NULL;
 	amroutine->aminitparallelscan = NULL;
 	amroutine->amparallelrescan = NULL;
+	amroutine->amestimateparallelvacuum = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index f7d2a1b7e3..549912c1c9 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -156,6 +156,12 @@ typedef void (*aminitparallelscan_function) (void *target);
 /* (re)start parallel index scan */
 typedef void (*amparallelrescan_function) (IndexScanDesc scan);
 
+/*
+ * Callback function signatures - for parallel index vacuuming.
+ */
+/* estimate bytes of DSM needed to hold the index's vacuum statistics */
+typedef Size (*amestimateparallelvacuum_function) (void);
+
 /*
  * API struct for an index AM.  Note this must be stored in a single palloc'd
  * chunk of memory.
@@ -232,6 +238,9 @@ typedef struct IndexAmRoutine
 	amestimateparallelscan_function amestimateparallelscan; /* can be NULL */
 	aminitparallelscan_function aminitparallelscan; /* can be NULL */
 	amparallelrescan_function amparallelrescan; /* can be NULL */
+
+	/* interface functions to support parallel vacuum */
+	amestimateparallelvacuum_function amestimateparallelvacuum; /* can be NULL */
 } IndexAmRoutine;
 
 
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index a813b004be..48ed5bbac7 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -179,6 +179,7 @@ extern IndexBulkDeleteResult *index_bulk_delete(IndexVacuumInfo *info,
 												void *callback_state);
 extern IndexBulkDeleteResult *index_vacuum_cleanup(IndexVacuumInfo *info,
 												   IndexBulkDeleteResult *stats);
+extern Size index_parallelvacuum_estimate(Relation indexRelation);
 extern bool index_can_return(Relation indexRelation, int attno);
 extern RegProcedure index_getprocid(Relation irel, AttrNumber attnum,
 									uint16 procnum);
diff --git a/src/test/modules/dummy_index_am/dummy_index_am.c b/src/test/modules/dummy_index_am/dummy_index_am.c
index f12eefbb24..c90405a23b 100644
--- a/src/test/modules/dummy_index_am/dummy_index_am.c
+++ b/src/test/modules/dummy_index_am/dummy_index_am.c
@@ -325,6 +325,7 @@ dihandler(PG_FUNCTION_ARGS)
 	amroutine->amestimateparallelscan = NULL;
 	amroutine->aminitparallelscan = NULL;
 	amroutine->amparallelrescan = NULL;
+	amroutine->amestimateparallelvacuum = NULL;
 
 	PG_RETURN_POINTER(amroutine);
 }
-- 
2.22.0

From a1fa1f544ecbdc529254c021f139776f6cacbae0 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Thu, 24 Oct 2019 16:30:02 +0900
Subject: [PATCH v31 3/6] Add an index AM field to indicate whether it uses
 maintenance_work_mem

---
 contrib/bloom/blutils.c                          | 1 +
 doc/src/sgml/indexam.sgml                        | 2 ++
 src/backend/access/brin/brin.c                   | 1 +
 src/backend/access/gin/ginutil.c                 | 1 +
 src/backend/access/gist/gist.c                   | 1 +
 src/backend/access/hash/hash.c                   | 1 +
 src/backend/access/nbtree/nbtree.c               | 1 +
 src/backend/access/spgist/spgutils.c             | 1 +
 src/include/access/amapi.h                       | 2 ++
 src/test/modules/dummy_index_am/dummy_index_am.c | 1 +
 10 files changed, 12 insertions(+)

diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c
index 9ef14a47f3..d50122d9e2 100644
--- a/contrib/bloom/blutils.c
+++ b/contrib/bloom/blutils.c
@@ -122,6 +122,7 @@ blhandler(PG_FUNCTION_ARGS)
 	amroutine->amcanparallel = false;
 	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
+	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = blbuild;
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index c3d2352d0f..df4cad11b3 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -124,6 +124,8 @@ typedef struct IndexAmRoutine
     bool        amcanparallelvacuum;
     /* does AM support columns included with clause INCLUDE? */
     bool        amcaninclude;
+    /* does AM use maintenance_work_mem? */
+    bool        amusemaintenanceworkmem;
     /* type of data stored in index, or InvalidOid if variable */
     Oid         amkeytype;
 
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 4045f5eacf..4a3286bfde 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -102,6 +102,7 @@ brinhandler(PG_FUNCTION_ARGS)
 	amroutine->amcanparallel = false;
 	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
+	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = brinbuild;
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index 9832f651ef..a28a71999d 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -54,6 +54,7 @@ ginhandler(PG_FUNCTION_ARGS)
 	amroutine->amcanparallel = false;
 	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
+	amroutine->amusemaintenanceworkmem = true;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = ginbuild;
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 88b1e839b3..752b5bc88c 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -76,6 +76,7 @@ gisthandler(PG_FUNCTION_ARGS)
 	amroutine->amcanparallel = false;
 	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = true;
+	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = gistbuild;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 3666318064..dc0dc312ef 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -75,6 +75,7 @@ hashhandler(PG_FUNCTION_ARGS)
 	amroutine->amcanparallel = false;
 	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
+	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amkeytype = INT4OID;
 
 	amroutine->ambuild = hashbuild;
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index f1db77886c..1ea2ba3fe0 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -124,6 +124,7 @@ bthandler(PG_FUNCTION_ARGS)
 	amroutine->amcanparallel = true;
 	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = true;
+	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = btbuild;
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index ff66c3ac6c..4a1689859a 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -57,6 +57,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amcanparallel = false;
 	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
+	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = spgbuild;
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 549912c1c9..d166350bbe 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -205,6 +205,8 @@ typedef struct IndexAmRoutine
 	bool		amcanparallelvacuum;
 	/* does AM support columns included with clause INCLUDE? */
 	bool		amcaninclude;
+	/* does AM use maintenance_work_mem? */
+	bool		amusemaintenanceworkmem;
 	/* type of data stored in index, or InvalidOid if variable */
 	Oid			amkeytype;
 
diff --git a/src/test/modules/dummy_index_am/dummy_index_am.c b/src/test/modules/dummy_index_am/dummy_index_am.c
index c90405a23b..374d545f0d 100644
--- a/src/test/modules/dummy_index_am/dummy_index_am.c
+++ b/src/test/modules/dummy_index_am/dummy_index_am.c
@@ -302,6 +302,7 @@ dihandler(PG_FUNCTION_ARGS)
 	amroutine->amcanparallel = false;
 	amroutine->amcanparallelvacuum = false;
 	amroutine->amcaninclude = false;
+	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = dibuild;
-- 
2.22.0

From 0eedb72e6fd99e4dee95b3fb43430823f313ca97 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Wed, 23 Jan 2019 16:07:53 +0900
Subject: [PATCH v31 5/6] Add --parallel, -P option to vacuumdb command

---
 doc/src/sgml/ref/vacuumdb.sgml    | 16 +++++++++++
 src/bin/scripts/t/100_vacuumdb.pl | 10 ++++++-
 src/bin/scripts/vacuumdb.c        | 48 ++++++++++++++++++++++++++++++-
 3 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/ref/vacuumdb.sgml b/doc/src/sgml/ref/vacuumdb.sgml
index 47d93456f8..f6ac0c6e5a 100644
--- a/doc/src/sgml/ref/vacuumdb.sgml
+++ b/doc/src/sgml/ref/vacuumdb.sgml
@@ -226,6 +226,22 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-P[<replaceable class="parameter">workers</replaceable>]</option></term>
+      <term><option>--parallel[=<replaceable class="parameter">workers</replaceable>]</option></term>
+      <listitem>
+       <para>
+        Execute parallel vacuum using <replaceable class="parameter">workers</replaceable>
+        background workers.  If the number is omitted, the server chooses it.
+       </para>
+       <para>
+        This option requires background workers, so make sure your
+        <xref linkend="guc-max-parallel-workers-maintenance"/> setting is
+        greater than zero.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-q</option></term>
       <term><option>--quiet</option></term>
diff --git a/src/bin/scripts/t/100_vacuumdb.pl b/src/bin/scripts/t/100_vacuumdb.pl
index b685b35282..8fe80719e8 100644
--- a/src/bin/scripts/t/100_vacuumdb.pl
+++ b/src/bin/scripts/t/100_vacuumdb.pl
@@ -3,7 +3,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 44;
+use Test::More tests => 48;
 
 program_help_ok('vacuumdb');
 program_version_ok('vacuumdb');
@@ -48,6 +48,14 @@ $node->issues_sql_like(
 $node->command_fails(
 	[ 'vacuumdb', '--analyze-only', '--disable-page-skipping', 'postgres' ],
 	'--analyze-only and --disable-page-skipping specified together');
+$node->issues_sql_like(
+	[ 'vacuumdb', '-P2', 'postgres' ],
+	qr/statement: VACUUM \(PARALLEL 2\).*;/,
+	'vacuumdb -P2');
+$node->issues_sql_like(
+	[ 'vacuumdb', '-P', 'postgres' ],
+	qr/statement: VACUUM \(PARALLEL\).*;/,
+	'vacuumdb -P');
 $node->command_ok([qw(vacuumdb -Z --table=pg_am dbname=template1)],
 	'vacuumdb with connection string');
 
diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c
index 2c7219239f..63bf66a70b 100644
--- a/src/bin/scripts/vacuumdb.c
+++ b/src/bin/scripts/vacuumdb.c
@@ -34,6 +34,8 @@ typedef struct vacuumingOptions
 	bool		skip_locked;
 	int			min_xid_age;
 	int			min_mxid_age;
+	int			parallel_workers;	/* -1 disables, 0 for choosing based on the
+									 * number of indexes */
 } vacuumingOptions;
 
 
@@ -86,6 +88,7 @@ main(int argc, char *argv[])
 		{"full", no_argument, NULL, 'f'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"jobs", required_argument, NULL, 'j'},
+		{"parallel", optional_argument, NULL, 'P'},
 		{"maintenance-db", required_argument, NULL, 2},
 		{"analyze-in-stages", no_argument, NULL, 3},
 		{"disable-page-skipping", no_argument, NULL, 4},
@@ -115,6 +118,7 @@ main(int argc, char *argv[])
 
 	/* initialize options to all false */
 	memset(&vacopts, 0, sizeof(vacopts));
+	vacopts.parallel_workers = -1;
 
 	pg_logging_init(argv[0]);
 	progname = get_progname(argv[0]);
@@ -122,7 +126,7 @@ main(int argc, char *argv[])
 
 	handle_help_version_opts(argc, argv, "vacuumdb", help);
 
-	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "h:p:P::U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -182,6 +186,24 @@ main(int argc, char *argv[])
 					exit(1);
 				}
 				break;
+			case 'P':
+				{
+					int parallel_workers = 0;
+
+					if (optarg != NULL)
+					{
+						parallel_workers = atoi(optarg);
+						if (parallel_workers <= 0)
+						{
+							pg_log_error("number of parallel workers must be at least 1");
+							exit(1);
+						}
+					}
+
+					/* allow to set 0, meaning PARALLEL without the parallel degree */
+					vacopts.parallel_workers = parallel_workers;
+					break;
+				}
 			case 2:
 				maintenance_db = pg_strdup(optarg);
 				break;
@@ -254,9 +276,22 @@ main(int argc, char *argv[])
 						 "disable-page-skipping");
 			exit(1);
 		}
+		if (vacopts.parallel_workers >= 0)
+		{
+			pg_log_error("cannot use the \"%s\" option when performing only analyze",
+						 "parallel");
+			exit(1);
+		}
 		/* allow 'and_analyze' with 'analyze_only' */
 	}
 
+	if (vacopts.full && vacopts.parallel_workers >= 0)
+	{
+		pg_log_error("cannot use the \"%s\" option with \"%s\" option",
+					 "full", "parallel");
+		exit(1);
+	}
+
 	setup_cancel_handler();
 
 	/* Avoid opening extra connections. */
@@ -822,6 +857,16 @@ prepare_vacuum_command(PQExpBuffer sql, int serverVersion,
 				appendPQExpBuffer(sql, "%sANALYZE", sep);
 				sep = comma;
 			}
+			if (vacopts->parallel_workers > 0)
+			{
+				appendPQExpBuffer(sql, "%sPARALLEL %d", sep, vacopts->parallel_workers);
+				sep = comma;
+			}
+			if (vacopts->parallel_workers == 0)
+			{
+				appendPQExpBuffer(sql, "%sPARALLEL", sep);
+				sep = comma;
+			}
 			if (sep != paren)
 				appendPQExpBufferChar(sql, ')');
 		}
@@ -885,6 +930,7 @@ help(const char *progname)
 	printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
 	printf(_("      --min-mxid-age=MXID_AGE     minimum multixact ID age of tables to vacuum\n"));
 	printf(_("      --min-xid-age=XID_AGE       minimum transaction ID age of tables to vacuum\n"));
+	printf(_("  -P, --parallel[=NUM]            do parallel vacuuming\n"));
 	printf(_("  -q, --quiet                     don't write any messages\n"));
 	printf(_("      --skip-locked               skip relations that cannot be immediately locked\n"));
 	printf(_("  -t, --table='TABLE[(COLUMNS)]'  vacuum specific table(s) only\n"));
-- 
2.22.0

From 9dcbbbcfaafeef51ff5ea069124265dd9ad572e6 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Fri, 25 Oct 2019 22:47:41 +0900
Subject: [PATCH v31 4/6] Add parallel option to VACUUM command

This change adds a PARALLEL option to the VACUUM command that enables
index vacuuming and index cleanup to be performed by background
workers. Each index is processed by a single vacuum process.
Therefore parallel vacuum can be used only when the table has at
least two indexes, and the parallel degree cannot exceed the number
of indexes that the table has.

The parallel degree is either specified by the user or determined
based on the number of indexes that the table has, and is further
limited by max_parallel_maintenance_workers. The table size and index
size don't affect it.
---
 doc/src/sgml/config.sgml              |   14 +-
 doc/src/sgml/ref/vacuum.sgml          |   45 ++
 src/backend/access/heap/vacuumlazy.c  | 1049 ++++++++++++++++++++++---
 src/backend/access/transam/parallel.c |    4 +
 src/backend/commands/vacuum.c         |   45 ++
 src/backend/postmaster/autovacuum.c   |    2 +
 src/bin/psql/tab-complete.c           |    2 +-
 src/include/access/heapam.h           |    3 +
 src/include/commands/vacuum.h         |    5 +
 src/test/regress/expected/vacuum.out  |   14 +
 src/test/regress/sql/vacuum.sql       |   10 +
 11 files changed, 1084 insertions(+), 109 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 886632ff43..335a0ec752 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2265,13 +2265,13 @@ include_dir 'conf.d'
        <listitem>
         <para>
          Sets the maximum number of parallel workers that can be
-         started by a single utility command.  Currently, the only
-         parallel utility command that supports the use of parallel
-         workers is <command>CREATE INDEX</command>, and only when
-         building a B-tree index.  Parallel workers are taken from the
-         pool of processes established by <xref
-         linkend="guc-max-worker-processes"/>, limited by <xref
-         linkend="guc-max-parallel-workers"/>.  Note that the requested
+         started by a single utility command.  Currently, the parallel
+         utility commands that support the use of parallel workers are
+         <command>CREATE INDEX</command> (only when building a B-tree index)
+         and <command>VACUUM</command> without the <literal>FULL</literal>
+         option.  Parallel workers are taken from the pool of processes
+         established by <xref linkend="guc-max-worker-processes"/>, limited
+         by <xref linkend="guc-max-parallel-workers"/>.  Note that the requested
          number of workers may not actually be available at run time.
          If this occurs, the utility operation will run with fewer
          workers than expected.  The default value is 2.  Setting this
diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index f9b0fb8794..ae086b976b 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -34,6 +34,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
     SKIP_LOCKED [ <replaceable class="parameter">boolean</replaceable> ]
     INDEX_CLEANUP [ <replaceable class="parameter">boolean</replaceable> ]
     TRUNCATE [ <replaceable class="parameter">boolean</replaceable> ]
+    PARALLEL [ <replaceable class="parameter">integer</replaceable> ]
 
 <phrase>and <replaceable class="parameter">table_and_columns</replaceable> is:</phrase>
 
@@ -223,6 +224,32 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>PARALLEL</literal></term>
+    <listitem>
+     <para>
+      Perform the index vacuum and index cleanup phases of <command>VACUUM</command>
+      in parallel using <replaceable class="parameter">integer</replaceable>
+      background workers (for details of each vacuum phase, please
+      refer to <xref linkend="vacuum-phases"/>). If the parallel degree
+      <replaceable class="parameter">integer</replaceable> is omitted,
+      then <command>VACUUM</command> decides the number of workers based
+      on the number of indexes on the relation that support parallel vacuum,
+      further limited by
+      <xref linkend="guc-max-parallel-workers-maintenance"/>. Please note
+      that it is not guaranteed that the number of parallel workers specified
+      in <replaceable class="parameter">integer</replaceable> will be used
+      during execution. It is possible for a vacuum to run with fewer workers
+      than specified, or even with no workers at all. Only one worker can
+      be used per index, so parallel workers are launched only when there
+      are at least <literal>2</literal> indexes in the table. Workers for
+      vacuum are launched before the start of each phase and exit at the end
+      of the phase. These behaviors might change in a future release. This
+      option cannot be used with the <literal>FULL</literal> option.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">boolean</replaceable></term>
     <listitem>
@@ -237,6 +264,18 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><replaceable class="parameter">integer</replaceable></term>
+    <listitem>
+     <para>
+      Specifies a positive integer value passed to the selected option.
+      The <replaceable class="parameter">integer</replaceable> value can
+      also be omitted, in which case the default value of the selected
+      option is used.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">table_name</replaceable></term>
     <listitem>
@@ -316,6 +355,12 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
     more than a plain <command>VACUUM</command> would.
    </para>
 
+   <para>
+     The <option>PARALLEL</option> option is used only for vacuum purposes.
+     If this option is specified together with the <option>ANALYZE</option>
+     option, it does not affect <option>ANALYZE</option>.
+   </para>
+
    <para>
     <command>VACUUM</command> causes a substantial increase in I/O traffic,
     which might cause poor performance for other active sessions.  Therefore,
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index a3c4a1df3b..02040c837e 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -22,6 +22,21 @@
  * of index scans performed.  So we don't use maintenance_work_mem memory for
  * the TID array, just enough to hold as many heap tuples as fit on one page.
  *
+ * Lazy vacuum supports parallel execution with parallel worker processes.  In
+ * parallel lazy vacuum, we perform both index vacuuming and index cleanup with
+ * parallel worker processes.  Each index is processed by a single vacuum
+ * process.  At the beginning of lazy vacuum (at lazy_scan_heap) we prepare the
+ * parallel context and initialize the DSM segment that contains shared information
+ * as well as the memory space for storing dead tuples.  When starting either
+ * index vacuuming or index cleanup, we launch parallel worker processes.  Once
+ * all indexes are processed, the parallel worker processes exit.  Then the
+ * leader process re-initializes the parallel context while keeping the recorded
+ * dead tuples, so that the leader can launch parallel workers again next
+ * time.  Note that parallel workers live only for the duration of index
+ * vacuuming or index cleanup, but the leader process neither exits parallel
+ * mode nor destroys the parallel context.  Since no updates are allowed
+ * during parallel mode, we update the index statistics after exiting
+ * parallel mode.
  *
  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -36,13 +51,16 @@
 
 #include <math.h>
 
+#include "access/amapi.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/storage.h"
 #include "commands/dbcommands.h"
@@ -55,6 +73,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "tcop/tcopprot.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
@@ -110,6 +129,139 @@
  */
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
+/*
+ * DSM keys for parallel lazy vacuum.  Unlike other parallel execution
+ * code, we don't need to worry about DSM keys conflicting with
+ * plan_node_id, so we can use small integers.
+ */
+#define PARALLEL_VACUUM_KEY_SHARED			1
+#define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
+
+/*
+ * PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION disables the leader's
+ * participation in parallel lazy vacuum.  This may be useful as a debugging
+ * aid.
+#undef PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION
+ */
+
+/*
+ * Macro to check if we are in a parallel lazy vacuum.  If true, we are
+ * in parallel mode and the DSM segment has been prepared.
+ */
+#define ParallelVacuumIsActive(lps) (((LVParallelState *) (lps)) != NULL)
+
+/*
+ * LVDeadTuples stores the dead tuple TIDs collected during heap scan.
+ * This is allocated in the DSM segment in parallel lazy vacuum mode,
+ * otherwise it is allocated in local memory.
+ */
+typedef struct LVDeadTuples
+{
+	int			max_tuples;	/* # slots allocated in array */
+	int			num_tuples;	/* current # of entries */
+	/* List of TIDs of tuples we intend to delete */
+	/* NB: this list is ordered by TID address */
+	ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER];	/* array of ItemPointerData */
+} LVDeadTuples;
+#define SizeOfLVDeadTuples (offsetof(LVDeadTuples, itemptrs) + sizeof(ItemPointerData))
+
+/*
+ * Shared information among parallel workers.  This is allocated in the
+ * DSM segment.
+ */
+typedef struct LVShared
+{
+	/*
+	 * Target table relid and log level.  These fields are not modified
+	 * during the lazy vacuum.
+	 */
+	Oid		relid;
+	int		elevel;
+
+	/*
+	 * Tells vacuum workers whether to do index vacuuming (false) or index
+	 * cleanup (true).
+	 */
+	bool	for_cleanup;
+
+	/*
+	 * Fields for both index vacuuming and index cleanup.
+	 *
+	 * reltuples is the total number of input heap tuples.  We set either
+	 * the old live tuples in the index vacuuming case or the new live
+	 * tuples in the index cleanup case.
+	 *
+	 * estimated_count is true if reltuples is an estimated value.
+	 */
+	double	reltuples;
+	bool	estimated_count;
+
+
+	/*
+	 * In single process lazy vacuum we could consume more memory during
+	 * index vacuuming or cleanup apart from the memory for heap scanning
+	 * if an index consumes memory during ambulkdelete and amvacuumcleanup.
+	 * In parallel index vacuuming, since individual vacuum workers
+	 * consume memory, we set a new maintenance_work_mem for each worker
+	 * so as not to consume more memory than single process lazy vacuum.
+	 */
+	int		maintenance_work_mem_worker;
+
+	/* The number of indexes that do NOT support parallel index vacuuming */
+	int		nindexes_nonparallel;
+
+	/*
+	 * Variables to control parallel index vacuuming.  Index statistics
+	 * returned from ambulkdelete and amvacuumcleanup are nullable and
+	 * variable length.  'bitmap' is the NULL bitmap: a 0 bit indicates a
+	 * null, while 1 indicates non-null.  'offset' locates the index
+	 * statistics, which follow at the end of the struct.
+	 */
+	pg_atomic_uint32	nprocessed;	/* counter for vacuuming and clean up */
+	uint32				offset;		/* sizeof header incl. bitmap */
+	bits8				bitmap[FLEXIBLE_ARRAY_MEMBER];	 /* bit map of NULLs */
+
+	/* Shared index statistics data follows at end of struct */
+} LVShared;
+#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
+#define GetSharedIndStats(s) \
+	((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
+#define IndStatsIsNull(s, i) \
+	(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
+
+/*
+ * Struct for an index bulk-deletion statistic used for parallel lazy
+ * vacuum.  This is allocated in the DSM segment.  IndexBulkDeleteResult
+ * follows at end of struct.
+ */
+typedef struct LVSharedIndStats
+{
+	Size	size;		/* size of the bulk-deletion result that follows */
+	bool	updated;	/* has a result been copied into this slot yet? */
+
+	/* Index bulk-deletion result data follows at end of struct */
+} LVSharedIndStats;
+#define SizeOfSharedIndStats(s) \
+	(sizeof(LVSharedIndStats) + ((LVSharedIndStats *)(s))->size)
+#define GetIndexBulkDeleteResult(s) \
+	((IndexBulkDeleteResult *)((char *)(s) + sizeof(LVSharedIndStats)))
+
+/* Struct for parallel lazy vacuum */
+typedef struct LVParallelState
+{
+	ParallelContext	*pcxt;	/* used to launch and wait for vacuum workers */
+
+	/* Shared information among parallel vacuum workers */
+	LVShared		*lvshared;
+
+	/*
+	 * Always true except for a debugging case where
+	 * PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION is defined.
+	 */
+	bool			leaderparticipates;
+} LVParallelState;
+
 typedef struct LVRelStats
 {
 	/* useindex = true means two-pass strategy; false means one-pass */
@@ -128,17 +280,12 @@ typedef struct LVRelStats
 	BlockNumber pages_removed;
 	double		tuples_deleted;
 	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
-	/* List of TIDs of tuples we intend to delete */
-	/* NB: this list is ordered by TID address */
-	int			num_dead_tuples;	/* current # of entries */
-	int			max_dead_tuples;	/* # slots allocated in array */
-	ItemPointer dead_tuples;	/* array of ItemPointerData */
+	LVDeadTuples *dead_tuples;
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
 	bool		lock_waiter_detected;
 } LVRelStats;
 
-
 /* A few variables that don't seem worth passing around as parameters */
 static int	elevel = -1;
 
@@ -155,12 +302,11 @@ static void lazy_scan_heap(Relation onerel, VacuumParams *params,
 						   bool aggressive);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
-static void lazy_vacuum_index(Relation indrel,
-							  IndexBulkDeleteResult **stats,
-							  LVRelStats *vacrelstats);
+static void lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
+							  LVDeadTuples *dead_tuples, double reltuples);
 static void lazy_cleanup_index(Relation indrel,
-							   IndexBulkDeleteResult *stats,
-							   LVRelStats *vacrelstats);
+							   IndexBulkDeleteResult **stats,
+							   double reltuples, bool estimated_count);
 static int	lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 							 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
 static bool should_attempt_truncation(VacuumParams *params,
@@ -169,12 +315,36 @@ static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 											LVRelStats *vacrelstats);
 static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
-static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
+static void lazy_record_dead_tuple(LVDeadTuples *dead_tuples,
 								   ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
+static LVParallelState *begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid,
+											  BlockNumber nblocks, Relation *Irel,
+											  int nindexes, int nrequested);
+static void end_parallel_vacuum(LVParallelState *lps, Relation *Irel, int nindexes,
+								IndexBulkDeleteResult **stats);
+static void prepare_index_statistics(LVShared *lvshared, Relation *Irel, int nindexes);
+static void lazy_vacuum_indexes(LVRelStats *vacrelstats, Relation *Irel,
+								int nindexes, IndexBulkDeleteResult **stats,
+								LVParallelState *lps);
+static void lazy_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
+								 int nindexes, IndexBulkDeleteResult **stats,
+								 LVParallelState *lps);
+static void lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
+													int nindexes, IndexBulkDeleteResult **stats,
+													LVParallelState *lps);
+static void vacuum_or_cleanup_indexes_worker(Relation *Irel, int nindexes,
+											 IndexBulkDeleteResult **stats,
+											 LVShared *lvshared,
+											 LVDeadTuples *dead_tuples);
+static void update_index_statistics(Relation *Irel, IndexBulkDeleteResult **stats,
+									int nindexes);
+static LVSharedIndStats *get_indstats(LVShared *lvshared, int n);
+static int compute_parallel_workers(Relation *Irel, int nindexes, int nrequested);
+static long compute_max_dead_tuples(BlockNumber relblocks, bool hasindex);
 
 
 /*
@@ -488,6 +658,18 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
  *		dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
  *		to reclaim dead line pointers.
  *
+ *		If the table has at least two indexes and parallel lazy vacuum is
+ *		requested, we execute both index vacuuming and index cleanup with
+ *		parallel workers. In parallel lazy vacuum, we enter parallel mode and
+ *		then create both the parallel context and the DSM segment before starting
+ *		the heap scan so that we can record dead tuples in the DSM segment. All
+ *		parallel workers are launched at the beginning of index vacuuming and
+ *		index cleanup, and they exit once done with all indexes. At the end of
+ *		this function we exit parallel mode. Index bulk-deletion results are
+ *		stored in the DSM segment, and the index statistics are updated as a
+ *		whole after exiting parallel mode, since no writes are allowed during
+ *		parallel mode.
+ *
  *		If there are no indexes then we can reclaim line pointers on the fly;
  *		dead line pointers need only be retained until all index pointers that
  *		reference them have been killed.
@@ -496,6 +678,8 @@ static void
 lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			   Relation *Irel, int nindexes, bool aggressive)
 {
+	LVParallelState *lps = NULL;
+	LVDeadTuples *dead_tuples;
 	BlockNumber nblocks,
 				blkno;
 	HeapTupleData tuple;
@@ -518,6 +702,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	bool		skipping_blocks;
 	xl_heap_freeze_tuple *frozen;
 	StringInfoData buf;
+	int			parallel_workers = 0;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -553,13 +738,41 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	vacrelstats->nonempty_pages = 0;
 	vacrelstats->latestRemovedXid = InvalidTransactionId;
 
-	lazy_space_alloc(vacrelstats, nblocks);
+	/*
+	 * If parallel lazy vacuum is requested and we vacuum indexes, compute
+	 * the number of parallel vacuum worker to launch.
+	 */
+	if (params->nworkers >= 0 && vacrelstats->useindex)
+		parallel_workers = compute_parallel_workers(Irel, nindexes,
+													params->nworkers);
+
+	if (parallel_workers > 0)
+	{
+		/*
+		 * Enter parallel mode, create the parallel context and allocate the
+		 * DSM segment.
+		 */
+		lps = begin_parallel_vacuum(vacrelstats,
+									RelationGetRelid(onerel),
+									nblocks, Irel, nindexes,
+									parallel_workers);
+	}
+	else
+	{
+		/*
+		 * Use single process vacuum. We allocate the memory space for dead
+		 * tuples locally.
+		 */
+		lazy_space_alloc(vacrelstats, nblocks);
+	}
+
+	dead_tuples = vacrelstats->dead_tuples;
 	frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
 
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
 	initprog_val[1] = nblocks;
-	initprog_val[2] = vacrelstats->max_dead_tuples;
+	initprog_val[2] = dead_tuples->max_tuples;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
 	/*
@@ -737,8 +950,8 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		 * If we are close to overrunning the available space for dead-tuple
 		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
 		 */
-		if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
-			vacrelstats->num_dead_tuples > 0)
+		if ((dead_tuples->max_tuples - dead_tuples->num_tuples) < MaxHeapTuplesPerPage &&
+			dead_tuples->num_tuples > 0)
 		{
 			const int	hvp_index[] = {
 				PROGRESS_VACUUM_PHASE,
@@ -766,10 +979,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 										 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
 
 			/* Remove index entries */
-			for (i = 0; i < nindexes; i++)
-				lazy_vacuum_index(Irel[i],
-								  &indstats[i],
-								  vacrelstats);
+			lazy_vacuum_indexes(vacrelstats, Irel, nindexes, indstats, lps);
 
 			/*
 			 * Report that we are now vacuuming the heap.  We also increase
@@ -789,7 +999,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			 * not to reset latestRemovedXid since we want that value to be
 			 * valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
+			dead_tuples->num_tuples = 0;
 			vacrelstats->num_index_scans++;
 
 			/*
@@ -985,7 +1195,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		has_dead_tuples = false;
 		nfrozen = 0;
 		hastup = false;
-		prev_dead_count = vacrelstats->num_dead_tuples;
+		prev_dead_count = dead_tuples->num_tuples;
 		maxoff = PageGetMaxOffsetNumber(page);
 
 		/*
@@ -1024,7 +1234,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			 */
 			if (ItemIdIsDead(itemid))
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
 				all_visible = false;
 				continue;
 			}
@@ -1170,7 +1380,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 
 			if (tupgone)
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
 				HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
 													   &vacrelstats->latestRemovedXid);
 				tups_vacuumed += 1;
@@ -1240,7 +1450,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		 * doing a second scan. Also we don't do that but forget dead tuples
 		 * when index cleanup is disabled.
 		 */
-		if (!vacrelstats->useindex && vacrelstats->num_dead_tuples > 0)
+		if (!vacrelstats->useindex && dead_tuples->num_tuples > 0)
 		{
 			if (nindexes == 0)
 			{
@@ -1269,7 +1479,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			 * not to reset latestRemovedXid since we want that value to be
 			 * valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
+			dead_tuples->num_tuples = 0;
 
 			/*
 			 * Periodically do incremental FSM vacuuming to make newly-freed
@@ -1384,7 +1594,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		 * page, so remember its free space as-is.  (This path will always be
 		 * taken if there are no indexes.)
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
+		if (dead_tuples->num_tuples == prev_dead_count)
 			RecordPageWithFreeSpace(onerel, blkno, freespace);
 	}
 
@@ -1418,7 +1628,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 
 	/* If any tuples need to be deleted, perform final vacuum cycle */
 	/* XXX put a threshold on min number of tuples here? */
-	if (vacrelstats->num_dead_tuples > 0)
+	if (dead_tuples->num_tuples > 0)
 	{
 		const int	hvp_index[] = {
 			PROGRESS_VACUUM_PHASE,
@@ -1434,10 +1644,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 									 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
 
 		/* Remove index entries */
-		for (i = 0; i < nindexes; i++)
-			lazy_vacuum_index(Irel[i],
-							  &indstats[i],
-							  vacrelstats);
+		lazy_vacuum_indexes(vacrelstats, Irel, nindexes, indstats, lps);
 
 		/* Report that we are now vacuuming the heap */
 		hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
@@ -1463,12 +1670,19 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
-	/* Do post-vacuum cleanup and statistics update for each index */
+	/* Do post-vacuum cleanup */
 	if (vacrelstats->useindex)
-	{
-		for (i = 0; i < nindexes; i++)
-			lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
-	}
+		lazy_cleanup_indexes(vacrelstats, Irel, nindexes, indstats, lps);
+
+	/*
+	 * End parallel mode before updating index statistics as we cannot write
+	 * during parallel mode.
+	 */
+	if (ParallelVacuumIsActive(lps))
+		end_parallel_vacuum(lps, Irel, nindexes, indstats);
+
+	/* Update index statistics */
+	 update_index_statistics(Irel, indstats, nindexes);
 
 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
 	if (vacuumed_pages)
@@ -1534,7 +1748,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 	npages = 0;
 
 	tupindex = 0;
-	while (tupindex < vacrelstats->num_dead_tuples)
+	while (tupindex < vacrelstats->dead_tuples->num_tuples)
 	{
 		BlockNumber tblk;
 		Buffer		buf;
@@ -1543,7 +1757,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 
 		vacuum_delay_point();
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples->itemptrs[tupindex]);
 		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
 								 vac_strategy);
 		if (!ConditionalLockBufferForCleanup(buf))
@@ -1591,6 +1805,7 @@ static int
 lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
 {
+	LVDeadTuples	*dead_tuples = vacrelstats->dead_tuples;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxOffsetNumber];
 	int			uncnt = 0;
@@ -1601,16 +1816,16 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 
 	START_CRIT_SECTION();
 
-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+	for (; tupindex < dead_tuples->num_tuples; tupindex++)
 	{
 		BlockNumber tblk;
 		OffsetNumber toff;
 		ItemId		itemid;
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&dead_tuples->itemptrs[tupindex]);
 		if (tblk != blkno)
 			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+		toff = ItemPointerGetOffsetNumber(&dead_tuples->itemptrs[tupindex]);
 		itemid = PageGetItemId(page, toff);
 		ItemIdSetUnused(itemid);
 		unused[uncnt++] = toff;
@@ -1731,19 +1946,290 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
 	return false;
 }
 
+/*
+ * Perform index vacuuming or index cleanup with parallel workers. This function
+ * must be used by the parallel vacuum leader process. The caller must set
+ * lps->lvshared->for_cleanup to indicate whether to vacuum or clean up.
+ */
+static void
+lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
+										int nindexes, IndexBulkDeleteResult **stats,
+										LVParallelState *lps)
+{
+	Assert(!IsParallelWorker());
+	Assert(ParallelVacuumIsActive(lps));
+	Assert(nindexes > 0);
+
+	LaunchParallelWorkers(lps->pcxt);
+
+	if (lps->lvshared->for_cleanup)
+		ereport(elevel,
+				(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+								 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+								 lps->pcxt->nworkers_launched),
+						lps->pcxt->nworkers_launched, lps->pcxt->nworkers)));
+	else
+		ereport(elevel,
+				(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+								 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+								 lps->pcxt->nworkers_launched),
+						lps->pcxt->nworkers_launched, lps->pcxt->nworkers)));
+
+	/*
+	 * Join as a parallel worker.  The leader process alone processes all
+	 * the indexes in the case where no workers are launched.
+	 */
+	if (lps->leaderparticipates || lps->pcxt->nworkers_launched == 0)
+		vacuum_or_cleanup_indexes_worker(Irel, nindexes, stats, lps->lvshared,
+										 vacrelstats->dead_tuples);
+
+	/* Wait for all vacuum workers to finish */
+	WaitForParallelWorkersToFinish(lps->pcxt);
+
+	/*
+	 * Unless this was index cleanup, after which no more index vacuuming
+	 * or cleanup is performed, prepare for the next parallel pass.
+	 */
+	if (!lps->lvshared->for_cleanup)
+	{
+		/* Reset the processing count */
+		pg_atomic_write_u32(&(lps->lvshared->nprocessed), 0);
+
+		/*
+		 * Reinitialize the parallel context to relaunch parallel workers
+		 * for the next execution.
+		 */
+		ReinitializeParallelDSM(lps->pcxt);
+	}
+}
+
+/*
+ * Index vacuuming and index cleanup routine used by parallel vacuum
+ * worker processes including the leader process.  After finishing each
+ * index, the first locally allocated result returned by ambulkdelete or
+ * amvacuumcleanup is copied to the DSM segment.
+ */
+static void
+vacuum_or_cleanup_indexes_worker(Relation *Irel, int nindexes,
+								 IndexBulkDeleteResult **stats,
+								 LVShared *lvshared,
+								 LVDeadTuples *dead_tuples)
+{
+	/* Loop until all indexes are processed */
+	for (;;)
+	{
+		int idx;
+		LVSharedIndStats *shared_indstats;
+		IndexBulkDeleteResult *bulkdelete_res;
+
+		/* Claim the next index number to process */
+		idx = pg_atomic_fetch_add_u32(&(lvshared->nprocessed), 1);
+
+		/* Done for all indexes? */
+		if (idx >= nindexes)
+			break;
+
+		/* Get index statistics struct of this index */
+		shared_indstats = get_indstats(lvshared, idx);
+
+		/* Skip if this index doesn't support parallel index vacuuming */
+		if (shared_indstats == NULL)
+			continue;
+
+		/* Get the space for IndexBulkDeleteResult */
+		bulkdelete_res = GetIndexBulkDeleteResult(shared_indstats);
+
+		/*
+		 * Point stats[idx] at the shared bulk-deletion result if some
+		 * process has already stored one in the DSM segment.
+		 */
+		if (shared_indstats->updated && stats[idx] == NULL)
+			stats[idx] = bulkdelete_res;
+
+		/* Vacuum or clean up one index */
+		if (lvshared->for_cleanup)
+			lazy_cleanup_index(Irel[idx], &(stats[idx]), lvshared->reltuples,
+							   lvshared->estimated_count);
+		else
+			lazy_vacuum_index(Irel[idx], &(stats[idx]), dead_tuples,
+							  lvshared->reltuples);
+
+		/*
+		 * Copy the index bulk-deletion result returned from ambulkdelete
+		 * or amvacuumcleanup to the DSM segment the first time we get it,
+		 * because they allocate it locally and a different vacuum process
+		 * may process this index next time.  Copying the result normally
+		 * happens only after the first round of index vacuuming.  From the
+		 * second round on, we pass the result in the DSM segment so that
+		 * the index AM updates it directly.
+		 *
+		 * Since each index is claimed by exactly one process per round,
+		 * all vacuum workers write the bulk-deletion results to different
+		 * slots, so we can write them without locking.
+		 */
+		if (!shared_indstats->updated && stats[idx] != NULL)
+		{
+			memcpy(bulkdelete_res, stats[idx], shared_indstats->size);
+			shared_indstats->updated = true;
+
+			/*
+			 * We no longer need the locally allocated result; stats[idx]
+			 * now points to the DSM segment.
+			 */
+			pfree(stats[idx]);
+			stats[idx] = bulkdelete_res;
+		}
+	}
+}
+
+/*
+ * Clean up all indexes.  In the parallel vacuum case, this function must
+ * be called by the parallel vacuum leader process.
+ */
+static void
+lazy_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
+					int nindexes, IndexBulkDeleteResult **stats,
+					LVParallelState *lps)
+{
+	int		idx;
+
+	Assert(!IsParallelWorker());
+	Assert(nindexes > 0);
+
+	/*
+	 * If parallel vacuum is active we perform index cleanup with parallel
+	 * workers.
+	 */
+	if (ParallelVacuumIsActive(lps))
+	{
+		/* Tell parallel workers to do index cleanup */
+		lps->lvshared->for_cleanup = true;
+
+		/*
+		 * Now we can provide a better estimate of total number of
+		 * surviving tuples (we assume indexes are more interested in that
+		 * than in the number of nominally live tuples).
+		 */
+		lps->lvshared->reltuples = vacrelstats->new_rel_tuples;
+		lps->lvshared->estimated_count =
+			(vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+
+		/*
+		 * Generally index cleanup does not scan the index when index
+		 * vacuuming (ambulkdelete) was already performed.  So we perform
+		 * index cleanup with parallel workers only if we have not
+		 * performed index vacuuming yet.  Otherwise, we do it in the
+		 * leader process alone.
+		 */
+		if (vacrelstats->num_index_scans == 0)
+			lazy_parallel_vacuum_or_cleanup_indexes(vacrelstats, Irel, nindexes,
+													stats, lps);
+		else
+		{
+			/*
+			 * Do cleanup by the leader process alone.  Since we need to
+			 * copy the index statistics to the DSM segment we cannot use
+			 * lazy_cleanup_index instead.
+			 */
+			vacuum_or_cleanup_indexes_worker(Irel, nindexes, stats,
+											 lps->lvshared,
+											 vacrelstats->dead_tuples);
+		}
+
+		/*
+		 * Done if there are no indexes that do not support parallel index
+		 * vacuuming.  Otherwise fall through to clean up such indexes in
+		 * the leader process alone.
+		 */
+		if (lps->lvshared->nindexes_nonparallel == 0)
+			return;
+	}
+
+	for (idx = 0; idx < nindexes; idx++)
+	{
+		/*
+		 * Skip indexes that we have already cleaned up during parallel
+		 * index cleanup.
+		 */
+		if (ParallelVacuumIsActive(lps) && !IndStatsIsNull(lps->lvshared, idx))
+			continue;
+
+		lazy_cleanup_index(Irel[idx], &stats[idx],
+						   vacrelstats->new_rel_tuples,
+						   vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+	}
+}
+
+/*
+ * Vacuum all indexes.  In the parallel vacuum case, this function must be
+ * called by the parallel vacuum leader process.
+ */
+static void
+lazy_vacuum_indexes(LVRelStats *vacrelstats, Relation *Irel,
+					int nindexes, IndexBulkDeleteResult **stats,
+					LVParallelState *lps)
+{
+	int		idx;
+
+	Assert(!IsParallelWorker());
+	Assert(nindexes > 0);
+
+	/*
+	 * If parallel vacuum is active we perform index vacuuming with
+	 * parallel workers.
+	 */
+	if (ParallelVacuumIsActive(lps))
+	{
+		/* Tell parallel workers to do index vacuuming */
+		lps->lvshared->for_cleanup = false;
+
+		/*
+		 * We can only provide an approximate value of num_heap_tuples in
+		 * vacuum cases.
+		 */
+		lps->lvshared->reltuples = vacrelstats->old_live_tuples;
+		lps->lvshared->estimated_count = true;
+
+		lazy_parallel_vacuum_or_cleanup_indexes(vacrelstats, Irel, nindexes,
+												stats, lps);
+
+		/*
+		 * Done if there are no indexes that do not support parallel index
+		 * vacuuming.  Otherwise fall through to vacuum such indexes in
+		 * the leader process alone.
+		 */
+		if (lps->lvshared->nindexes_nonparallel == 0)
+			return;
+	}
+
+	for (idx = 0; idx < nindexes; idx++)
+	{
+		/*
+		 * Skip indexes that we have already vacuumed during parallel index
+		 * vacuuming.
+		 */
+		if (ParallelVacuumIsActive(lps) && !IndStatsIsNull(lps->lvshared, idx))
+			continue;
+
+		lazy_vacuum_index(Irel[idx], &stats[idx], vacrelstats->dead_tuples,
+						  vacrelstats->old_live_tuples);
+	}
+}
 
 /*
  *	lazy_vacuum_index() -- vacuum one index relation.
  *
  *		Delete all the index entries pointing to tuples listed in
  *		vacrelstats->dead_tuples, and update running statistics.
+ *		reltuples is the number of heap tuples to be passed to the
+ *		bulk delete callback.
  */
 static void
-lazy_vacuum_index(Relation indrel,
-				  IndexBulkDeleteResult **stats,
-				  LVRelStats *vacrelstats)
+lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
+				  LVDeadTuples *dead_tuples, double reltuples)
 {
 	IndexVacuumInfo ivinfo;
+	char		*msgfmt;
 	PGRUsage	ru0;
 
 	pg_rusage_init(&ru0);
@@ -1753,30 +2239,38 @@ lazy_vacuum_index(Relation indrel,
 	ivinfo.report_progress = false;
 	ivinfo.estimated_count = true;
 	ivinfo.message_level = elevel;
-	/* We can only provide an approximate value of num_heap_tuples here */
-	ivinfo.num_heap_tuples = vacrelstats->old_live_tuples;
+	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vac_strategy;
 
 	/* Do bulk deletion */
 	*stats = index_bulk_delete(&ivinfo, *stats,
-							   lazy_tid_reaped, (void *) vacrelstats);
+							   lazy_tid_reaped, (void *) dead_tuples);
+
+	if (IsParallelWorker())
+		msgfmt = gettext_noop("scanned index \"%s\" to remove %d row versions by parallel vacuum worker");
+	else
+		msgfmt = gettext_noop("scanned index \"%s\" to remove %d row versions");
 
 	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
+			(errmsg(msgfmt,
 					RelationGetRelationName(indrel),
-					vacrelstats->num_dead_tuples),
+					dead_tuples->num_tuples),
 			 errdetail_internal("%s", pg_rusage_show(&ru0))));
 }
 
 /*
  *	lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
+ *
+ *		reltuples is the number of heap tuples and estimated_count is true
+ *		if the reltuples is an estimated value.
  */
 static void
 lazy_cleanup_index(Relation indrel,
-				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats)
+				   IndexBulkDeleteResult **stats,
+				   double reltuples, bool estimated_count)
 {
 	IndexVacuumInfo ivinfo;
+	char		*msgfmt;
 	PGRUsage	ru0;
 
 	pg_rusage_init(&ru0);
@@ -1784,49 +2278,62 @@ lazy_cleanup_index(Relation indrel,
 	ivinfo.index = indrel;
 	ivinfo.analyze_only = false;
 	ivinfo.report_progress = false;
-	ivinfo.estimated_count = (vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+	ivinfo.estimated_count = estimated_count;
 	ivinfo.message_level = elevel;
 
-	/*
-	 * Now we can provide a better estimate of total number of surviving
-	 * tuples (we assume indexes are more interested in that than in the
-	 * number of nominally live tuples).
-	 */
-	ivinfo.num_heap_tuples = vacrelstats->new_rel_tuples;
+	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vac_strategy;
 
-	stats = index_vacuum_cleanup(&ivinfo, stats);
+	*stats = index_vacuum_cleanup(&ivinfo, *stats);
 
-	if (!stats)
+	if (!(*stats))
 		return;
 
-	/*
-	 * Now update statistics in pg_class, but only if the index says the count
-	 * is accurate.
-	 */
-	if (!stats->estimated_count)
-		vac_update_relstats(indrel,
-							stats->num_pages,
-							stats->num_index_tuples,
-							0,
-							false,
-							InvalidTransactionId,
-							InvalidMultiXactId,
-							false);
+	if (IsParallelWorker())
+		msgfmt = gettext_noop("index \"%s\" now contains %.0f row versions in %u pages, reported by parallel vacuum worker");
+	else
+		msgfmt = gettext_noop("index \"%s\" now contains %.0f row versions in %u pages");
 
 	ereport(elevel,
-			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
+			(errmsg(msgfmt,
 					RelationGetRelationName(indrel),
-					stats->num_index_tuples,
-					stats->num_pages),
+					(*stats)->num_index_tuples,
+					(*stats)->num_pages),
 			 errdetail("%.0f index row versions were removed.\n"
 					   "%u index pages have been deleted, %u are currently reusable.\n"
 					   "%s.",
-					   stats->tuples_removed,
-					   stats->pages_deleted, stats->pages_free,
+					   (*stats)->tuples_removed,
+					   (*stats)->pages_deleted, (*stats)->pages_free,
 					   pg_rusage_show(&ru0))));
+}
+
+/*
+ * Update index statistics in pg_class if accurate, then free each result.
+ */
+static void
+update_index_statistics(Relation *Irel, IndexBulkDeleteResult **stats,
+						int nindexes)
+{
+	int i;
+
+	Assert(!IsInParallelMode());
 
-	pfree(stats);
+	for (i = 0; i < nindexes; i++)
+	{
+		if (stats[i] == NULL)
+			continue;
+		/* Update pg_class only if the index says the count is accurate */
+		if (!stats[i]->estimated_count)
+			vac_update_relstats(Irel[i], stats[i]->num_pages,
+								stats[i]->num_index_tuples,
+								0,
+								false,
+								InvalidTransactionId,
+								InvalidMultiXactId,
+								false);
+		/* Free the result even when it was not used to update pg_class */
+		pfree(stats[i]);
+	}
 }
 
 /*
@@ -2134,19 +2641,17 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 }
 
 /*
- * lazy_space_alloc - space allocation decisions for lazy vacuum
- *
- * See the comments at the head of this file for rationale.
+ * Return the maximum number of dead tuples we can record.
+ *
+ * When useindex is true, the limit is derived from the vacuum work memory
+ * (autovacuum_work_mem in an autovacuum worker when it is not -1, otherwise
+ * maintenance_work_mem) divided by sizeof(ItemPointerData), capped at
+ * INT_MAX and never less than MaxHeapTuplesPerPage.  Otherwise the limit is
+ * MaxHeapTuplesPerPage.  relblocks is the heap size in blocks
+ * (NOTE(review): presumably used to shrink the limit for small tables;
+ * confirm against the full body).
  */
-static void
-lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+static long
+compute_max_dead_tuples(BlockNumber relblocks, bool useindex)
 {
 	long		maxtuples;
 	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
 	autovacuum_work_mem != -1 ?
 	autovacuum_work_mem : maintenance_work_mem;
 
-	if (vacrelstats->useindex)
+	if (useindex)
 	{
 		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
 		maxtuples = Min(maxtuples, INT_MAX);
@@ -2160,34 +2665,49 @@ lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
 		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
 	}
 	else
-	{
 		maxtuples = MaxHeapTuplesPerPage;
-	}
 
-	vacrelstats->num_dead_tuples = 0;
-	vacrelstats->max_dead_tuples = (int) maxtuples;
-	vacrelstats->dead_tuples = (ItemPointer)
-		palloc(maxtuples * sizeof(ItemPointerData));
+	return maxtuples;
+}
+
+/*
+ * lazy_space_alloc - space allocation decisions for lazy vacuum
+ *
+ * Allocate a local LVDeadTuples array sized by compute_max_dead_tuples()
+ * and attach it, empty, to vacrelstats->dead_tuples.
+ *
+ * See the comments at the head of this file for rationale.
+ */
+static void
+lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+{
+	LVDeadTuples *dead_tuples;
+	long		maxtuples;
+	long		maxtuples_palloc;
+
+	maxtuples = compute_max_dead_tuples(relblocks, vacrelstats->useindex);
+
+	/*
+	 * The LVDeadTuples header shares one palloc chunk with the itemptrs
+	 * array, so cap the array such that header plus array never exceeds
+	 * MaxAllocSize; otherwise palloc() rejects the request when the work
+	 * memory is large.
+	 */
+	maxtuples_palloc = (long) ((MaxAllocSize - SizeOfLVDeadTuples) /
+							   sizeof(ItemPointerData));
+	maxtuples = Min(maxtuples, maxtuples_palloc);
+
+	dead_tuples = (LVDeadTuples *)
+		palloc(SizeOfLVDeadTuples + maxtuples * sizeof(ItemPointerData));
+	dead_tuples->num_tuples = 0;
+	dead_tuples->max_tuples = (int) maxtuples;
+
+	vacrelstats->dead_tuples = dead_tuples;
 }
 
 /*
  * lazy_record_dead_tuple - remember one deletable tuple
+ *
+ * Append *itemptr to dead_tuples->itemptrs and publish the new count as
+ * PROGRESS_VACUUM_NUM_DEAD_TUPLES.  If num_tuples has already reached
+ * max_tuples, the tuple is dropped, as explained below.
  */
 static void
-lazy_record_dead_tuple(LVRelStats *vacrelstats,
-					   ItemPointer itemptr)
+lazy_record_dead_tuple(LVDeadTuples *dead_tuples, ItemPointer itemptr)
 {
 	/*
 	 * The array shouldn't overflow under normal behavior, but perhaps it
 	 * could if we are given a really small maintenance_work_mem. In that
 	 * case, just forget the last few tuples (we'll get 'em next time).
 	 */
-	if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
+	if (dead_tuples->num_tuples < dead_tuples->max_tuples)
 	{
-		vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
-		vacrelstats->num_dead_tuples++;
+		dead_tuples->itemptrs[dead_tuples->num_tuples] = *itemptr;
+		dead_tuples->num_tuples++;
 		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-									 vacrelstats->num_dead_tuples);
+									 dead_tuples->num_tuples);
 	}
 }
 
@@ -2201,12 +2721,12 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
 static bool
 lazy_tid_reaped(ItemPointer itemptr, void *state)
 {
-	LVRelStats *vacrelstats = (LVRelStats *) state;
+	LVDeadTuples	*dead_tuples = (LVDeadTuples *) state;
 	ItemPointer res;
 
 	res = (ItemPointer) bsearch((void *) itemptr,
-								(void *) vacrelstats->dead_tuples,
-								vacrelstats->num_dead_tuples,
+								(void *) dead_tuples->itemptrs,
+								dead_tuples->num_tuples,
 								sizeof(ItemPointerData),
 								vac_cmp_itemptr);
 
@@ -2354,3 +2874,330 @@ heap_page_is_all_visible(Relation rel, Buffer buf,
 
 	return all_visible;
 }
+
+/*
+ * Compute the number of parallel worker processes to request.  Both index
+ * vacuuming and index cleanup can be executed together with parallel
+ * workers.  The relation sizes of the table and indexes don't affect the
+ * parallel degree for now.
+ *
+ * Irel is the array of nindexes open indexes.  nrequested is the number of
+ * parallel workers the user requested; if it is 0, the degree is based on
+ * the number of indexes that support parallel index vacuuming.  The result
+ * is capped by max_parallel_maintenance_workers; 0 means no parallelism.
+ */
+static int
+compute_parallel_workers(Relation *Irel, int nindexes, int nrequested)
+{
+	bool	leaderparticipates = true;
+	int		nindexes_to_vacuum = 0;
+	int		parallel_workers;
+	int		i;
+
+	Assert(nrequested >= 0);
+
+	/* Return immediately when parallelism disabled */
+	if (max_parallel_maintenance_workers == 0)
+		return 0;
+
+	/*
+	 * Compute the number of indexes that can participate in parallel index
+	 * vacuuming.  Use the relcache's IndexAmRoutine (rd_indam), as the rest
+	 * of this file does, rather than GetIndexAmRoutine(), which builds a new
+	 * palloc'd routine for every call.
+	 */
+	for (i = 0; i < nindexes; i++)
+	{
+		if (Irel[i]->rd_indam->amcanparallelvacuum)
+			nindexes_to_vacuum++;
+	}
+
+	/* No index supports parallel index vacuuming */
+	if (nindexes_to_vacuum == 0)
+		return 0;
+
+#ifdef PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION
+	leaderparticipates = false;
+#endif
+
+	/* The leader process takes one index */
+	if (leaderparticipates)
+		nindexes_to_vacuum--;
+
+	/* Compute the parallel degree */
+	parallel_workers = (nrequested > 0) ?
+		Min(nrequested, nindexes_to_vacuum) : nindexes_to_vacuum;
+
+	/* Cap by max_parallel_maintenance_workers */
+	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+
+	return parallel_workers;
+}
+
+/*
+ * Enter parallel mode, allocate and initialize the DSM segment.
+ *
+ * Three chunks are stored in the DSM segment's TOC:
+ *   PARALLEL_VACUUM_KEY_SHARED      - LVShared, its NULL bitmap, and the
+ *                                     per-index statistics slots
+ *   PARALLEL_VACUUM_KEY_DEAD_TUPLES - the dead tuple array, which becomes
+ *                                     vacrelstats->dead_tuples
+ *   PARALLEL_VACUUM_KEY_QUERY_TEXT  - the query text reported by workers
+ *
+ * nrequested is the number of workers to request and must be > 0.  Returns
+ * a palloc'd LVParallelState, released by end_parallel_vacuum().
+ */
+static LVParallelState *
+begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks,
+					  Relation *Irel, int nindexes, int nrequested)
+{
+	LVParallelState *lps = (LVParallelState *) palloc(sizeof(LVParallelState));
+	ParallelContext *pcxt;
+	LVShared		*shared;
+	LVDeadTuples	*dead_tuples;
+	long	maxtuples;
+	char	*sharedquery;
+	const char *querytext;
+	Size	shared_offset;
+	Size	est_shared;
+	Size	est_deadtuples;
+	int		querylen;
+	int		i;
+
+	Assert(nrequested > 0);
+	Assert(nindexes > 0);
+
+	lps->leaderparticipates = true;
+
+#ifdef PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION
+	lps->leaderparticipates = false;
+#endif
+
+	EnterParallelMode();
+	pcxt = CreateParallelContext("postgres", "heap_parallel_vacuum_main",
+								 nrequested);
+	lps->pcxt = pcxt;
+	Assert(pcxt->nworkers > 0);
+
+	/*
+	 * Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED.
+	 * The per-index statistics start at a MAXALIGN'd offset past the fixed
+	 * part and the NULL bitmap, so the structs stored there are aligned.
+	 * The same offset is recorded in shared->offset below.
+	 */
+	shared_offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
+	est_shared = shared_offset;
+	for (i = 0; i < nindexes; i++)
+	{
+		if (Irel[i]->rd_indam->amcanparallelvacuum)
+			est_shared = add_size(est_shared,
+									add_size(sizeof(LVSharedIndStats),
+											 index_parallelvacuum_estimate(Irel[i])));
+	}
+	shm_toc_estimate_chunk(&pcxt->estimator, est_shared);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */
+	maxtuples = compute_max_dead_tuples(nblocks, true);
+	est_deadtuples = MAXALIGN(add_size(SizeOfLVDeadTuples,
+									   mul_size(sizeof(ItemPointerData), maxtuples)));
+	shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/*
+	 * Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space.  Workers look
+	 * this key up unconditionally, so ship an empty string when there is no
+	 * query text rather than calling strlen() on NULL.
+	 */
+	querytext = debug_query_string ? debug_query_string : "";
+	querylen = strlen(querytext);
+	shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	InitializeParallelDSM(pcxt);
+
+	/* Prepare shared information */
+	shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared);
+	MemSet(shared, 0, est_shared);
+	shared->relid = relid;
+	shared->elevel = elevel;
+	shared->offset = shared_offset;
+	prepare_index_statistics(shared, Irel, nindexes);
+	pg_atomic_init_u32(&(shared->nprocessed), 0);
+
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
+	lps->lvshared = shared;
+
+	/* Prepare the dead tuple space */
+	dead_tuples = (LVDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples);
+	dead_tuples->max_tuples = (int) maxtuples;
+	dead_tuples->num_tuples = 0;
+	MemSet(dead_tuples->itemptrs, 0, sizeof(ItemPointerData) * maxtuples);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
+	vacrelstats->dead_tuples = dead_tuples;
+
+	/* Store query string (including its terminating NUL) for workers */
+	sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+	memcpy(sharedquery, querytext, querylen + 1);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+
+	return lps;
+}
+
+/*
+ * Initialize the shared index statistics area that follows the NULL bitmap.
+ *
+ * For each index, mark its bit in lvshared->bitmap as present (the index
+ * supports parallel vacuum) or NULL (it does not), and record the slot size
+ * of each present index.  Also count the indexes that do not support
+ * parallel index vacuuming (nindexes_nonparallel), and compute
+ * maintenance_work_mem_worker by dividing the vacuum work memory among the
+ * parallel-capable indexes that use maintenance_work_mem.
+ */
+static void
+prepare_index_statistics(LVShared *lvshared, Relation *Irel, int nindexes)
+{
+	char *p = (char *) GetSharedIndStats(lvshared);
+	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
+		autovacuum_work_mem != -1 ?
+		autovacuum_work_mem : maintenance_work_mem;
+	int nindexes_mwm = 0;
+	int i;
+
+	for (i = 0; i < nindexes; i++)
+	{
+		LVSharedIndStats *indstats;
+
+		if (!Irel[i]->rd_indam->amcanparallelvacuum)
+		{
+			/*
+			 * Mark the slot as NULL since this index does not support
+			 * parallel vacuum.  Clear the bit explicitly; OR-ing in a zero
+			 * would silently rely on the bitmap being zeroed beforehand.
+			 */
+			lvshared->bitmap[i >> 3] &= ~(1 << (i & 0x07));
+			lvshared->nindexes_nonparallel++;
+			continue;
+		}
+
+		if (Irel[i]->rd_indam->amusemaintenanceworkmem)
+			nindexes_mwm++;
+
+		/* Mark the slot as present and set the size for index statistics */
+		indstats = (LVSharedIndStats *) p;
+		lvshared->bitmap[i >> 3] |= 1 << (i & 0x07);
+		indstats->size = index_parallelvacuum_estimate(Irel[i]);
+
+		p += SizeOfSharedIndStats(indstats);
+	}
+
+	/* Compute the new maintenance_work_mem value for index vacuuming */
+	lvshared->maintenance_work_mem_worker =
+		(nindexes_mwm > 0) ? vac_work_mem / nindexes_mwm : vac_work_mem;
+}
+
+/*
+ * Destroy the parallel context, and end parallel mode.
+ *
+ * Writes are not allowed during parallel mode, and it might not be safe to
+ * exit parallel mode while keeping the parallel context.  So we first copy
+ * the updated index statistics from the DSM segment to local memory, and
+ * later use that copy to update the index statistics.
+ *
+ * For each index that has a shared statistics slot, stats[i] becomes a
+ * palloc'd copy if the slot is marked updated, or NULL otherwise.  Entries
+ * of indexes without a shared slot are left untouched.
+ *
+ * lps is freed here; the caller must not use its pointer afterwards.
+ */
+static void
+end_parallel_vacuum(LVParallelState *lps, Relation *Irel, int nindexes,
+					IndexBulkDeleteResult **stats)
+{
+	int i;
+
+	Assert(!IsParallelWorker());
+
+	/* copy the updated statistics */
+	for (i = 0; i < nindexes; i++)
+	{
+		LVSharedIndStats *indstats = get_indstats(lps->lvshared, i);
+
+		/*
+		 * Skip unused slot.  The statistics of this index are already
+		 * stored in local memory.
+		 */
+		if (indstats == NULL)
+			continue;
+
+		if (indstats->updated)
+		{
+			/* Fully overwritten by memcpy, so no need to zero it first */
+			stats[i] = (IndexBulkDeleteResult *) palloc(sizeof(IndexBulkDeleteResult));
+			memcpy(stats[i],
+				   GetIndexBulkDeleteResult(indstats),
+				   sizeof(IndexBulkDeleteResult));
+		}
+		else
+			stats[i] = NULL;
+	}
+
+	DestroyParallelContext(lps->pcxt);
+	ExitParallelMode();
+
+	/*
+	 * Release the parallel vacuum state.  Setting the by-value parameter to
+	 * NULL would not affect the caller, so the caller must drop its pointer.
+	 */
+	pfree(lps);
+}
+
+/*
+ * Return the shared statistics slot of the Nth index, or NULL if that
+ * index's bit in the NULL bitmap is clear (no parallel vacuum support).
+ *
+ * Slots are packed in index order and exist only for non-NULL indexes, so
+ * we walk past the slot of every non-NULL index that precedes n.
+ */
+static LVSharedIndStats *
+get_indstats(LVShared *lvshared, int n)
+{
+	int		i;
+	char	*p;
+
+	if (IndStatsIsNull(lvshared, n))
+		return NULL;
+
+	p = (char *) GetSharedIndStats(lvshared);
+
+	/*
+	 * Skip indexes 0 .. n-1.  The bound must be n, not n - 1; otherwise
+	 * indexes 0 and 1 would share the first slot and every later index
+	 * would get its predecessor's slot.
+	 */
+	for (i = 0; i < n; i++)
+	{
+		if (IndStatsIsNull(lvshared, i))
+			continue;
+
+		p += SizeOfSharedIndStats(p);
+	}
+
+	return (LVSharedIndStats *) p;
+}
+
+/*
+ * heap_parallel_vacuum_main - entry point of a parallel vacuum worker
+ *
+ * Fetch the shared state, the query text and the dead tuple array that the
+ * leader placed in the DSM segment, open the table and its indexes, and run
+ * either index vacuuming or index cleanup (lvshared->for_cleanup selects
+ * which) through vacuum_or_cleanup_indexes_worker().
+ *
+ * Workers only take part in index vacuuming and index cleanup, so they do
+ * not report progress information.
+ */
+void
+heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+	LVShared	*shared;
+	LVDeadTuples	*deadtuples;
+	IndexBulkDeleteResult **idxstats;
+	Relation	heaprel;
+	Relation	*idxrels;
+	char		*querytext;
+	int			nidx;
+
+	/* The shared state also carries the leader's log level */
+	shared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
+										 false);
+	elevel = shared->elevel;
+
+	ereport(DEBUG1,
+			(errmsg("starting parallel lazy vacuum worker for %s",
+					shared->for_cleanup ? "cleanup" : "vacuuming")));
+
+	/* Report the leader's query text as this worker's activity */
+	querytext = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, false);
+	debug_query_string = querytext;
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/*
+	 * Open the table with the leader's lock mode, which does not conflict
+	 * among parallel workers, then its indexes.  The indexes come back
+	 * sorted by OID, matching the order the leader uses.
+	 */
+	heaprel = table_open(shared->relid, ShareUpdateExclusiveLock);
+	vac_open_indexes(heaprel, RowExclusiveLock, &nidx, &idxrels);
+	Assert(nidx > 0);
+
+	/* The dead tuple array lives in the DSM segment shared with the leader */
+	deadtuples = (LVDeadTuples *) shm_toc_lookup(toc,
+												 PARALLEL_VACUUM_KEY_DEAD_TUPLES,
+												 false);
+
+	/* Start cost-based vacuum delay accounting from scratch */
+	VacuumCostActive = (VacuumCostDelay > 0);
+	VacuumCostBalance = 0;
+	VacuumPageHit = 0;
+	VacuumPageMiss = 0;
+	VacuumPageDirty = 0;
+
+	/* Adopt the leader-computed per-worker work memory, if any */
+	if (shared->maintenance_work_mem_worker > 0)
+		maintenance_work_mem = shared->maintenance_work_mem_worker;
+
+	idxstats = (IndexBulkDeleteResult **)
+		palloc0(sizeof(IndexBulkDeleteResult *) * nidx);
+
+	/* Do either vacuuming indexes or cleaning indexes */
+	vacuum_or_cleanup_indexes_worker(idxrels, nidx, idxstats, shared,
+									 deadtuples);
+
+	vac_close_indexes(nidx, idxrels, RowExclusiveLock);
+	table_close(heaprel, ShareUpdateExclusiveLock);
+}
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 55d129a64f..86511b2703 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
 
 #include "postgres.h"
 
+#include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -140,6 +141,9 @@ static const struct
 	},
 	{
 		"_bt_parallel_build_main", _bt_parallel_build_main
+	},
+	{
+		"heap_parallel_vacuum_main", heap_parallel_vacuum_main
 	}
 };
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 4b67b40b28..9ada501709 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -42,6 +42,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
+#include "postmaster/bgworker_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -99,6 +100,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 	/* Set default value */
 	params.index_cleanup = VACOPT_TERNARY_DEFAULT;
 	params.truncate = VACOPT_TERNARY_DEFAULT;
+	params.nworkers = -1;
 
 	/* Parse options list */
 	foreach(lc, vacstmt->options)
@@ -129,6 +131,28 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 			params.index_cleanup = get_vacopt_ternary_value(opt);
 		else if (strcmp(opt->defname, "truncate") == 0)
 			params.truncate = get_vacopt_ternary_value(opt);
+		else if (strcmp(opt->defname, "parallel") == 0)
+		{
+			if (opt->arg == NULL)
+			{
+				/*
+				 * Parallel lazy vacuum is requested but user didn't specify
+				 * the parallel degree. The parallel degree will be determined
+				 * at the start of lazy vacuum.
+				 */
+				params.nworkers = 0;
+			}
+			else
+			{
+				params.nworkers = defGetInt32(opt);
+				if (params.nworkers < 1 || params.nworkers > MAX_PARALLEL_WORKER_LIMIT)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("parallel vacuum degree must be between 1 and %d",
+									MAX_PARALLEL_WORKER_LIMIT),
+							 parser_errposition(pstate, opt->location)));
+			}
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -170,6 +194,11 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 		}
 	}
 
+	if ((params.options & VACOPT_FULL) && params.nworkers >= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot specify FULL option with PARALLEL option")));
+
 	/*
 	 * All freeze ages are zero if the FREEZE option is given; otherwise pass
 	 * them as -1 which means to use the default values.
@@ -1742,6 +1771,22 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
 		return false;
 	}
 
+	/*
+	 * Since parallel workers cannot access data in temporary tables, parallel
+	 * vacuum is not allowed for temporary relation.
+	 */
+	if (RelationUsesLocalBuffers(onerel) && params->nworkers >= 0)
+	{
+		ereport(WARNING,
+				(errmsg("skipping vacuum on \"%s\" --- cannot vacuum temporary tables in parallel",
+						RelationGetRelationName(onerel))));
+		relation_close(onerel, lmode);
+		PopActiveSnapshot();
+		CommitTransactionCommand();
+		/* It's OK to proceed with ANALYZE on this table */
+		return true;
+	}
+
 	/*
 	 * Silently ignore partitioned tables as there is no work to be done.  The
 	 * useful work is on their child partitions, which have been queued up for
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index c1dd8168ca..c3690f9c41 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -2891,6 +2891,8 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 			(!wraparound ? VACOPT_SKIP_LOCKED : 0);
 		tab->at_params.index_cleanup = VACOPT_TERNARY_DEFAULT;
 		tab->at_params.truncate = VACOPT_TERNARY_DEFAULT;
+		/* We don't support parallel vacuum for autovacuum for now */
+		tab->at_params.nworkers = -1;
 		tab->at_params.freeze_min_age = freeze_min_age;
 		tab->at_params.freeze_table_age = freeze_table_age;
 		tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index e00dbab5aa..321a1511a8 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3556,7 +3556,7 @@ psql_completion(const char *text, int start, int end)
 		if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
 			COMPLETE_WITH("FULL", "FREEZE", "ANALYZE", "VERBOSE",
 						  "DISABLE_PAGE_SKIPPING", "SKIP_LOCKED",
-						  "INDEX_CLEANUP", "TRUNCATE");
+						  "INDEX_CLEANUP", "TRUNCATE", "PARALLEL");
 		else if (TailMatches("FULL|FREEZE|ANALYZE|VERBOSE|DISABLE_PAGE_SKIPPING|SKIP_LOCKED|INDEX_CLEANUP|TRUNCATE"))
 			COMPLETE_WITH("ON", "OFF");
 	}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 858bcb6bc9..12065cc038 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -24,6 +24,8 @@
 #include "nodes/primnodes.h"
 #include "storage/bufpage.h"
 #include "storage/lockdefs.h"
+#include "storage/shm_toc.h"
+#include "storage/dsm.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
@@ -193,6 +195,7 @@ extern Size SyncScanShmemSize(void);
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation onerel,
 							struct VacuumParams *params, BufferAccessStrategy bstrategy);
+extern void heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 128f7ae65d..43702f2f86 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -184,6 +184,11 @@ typedef struct VacuumParams
 										 * default value depends on reloptions */
 	VacOptTernaryValue truncate;	/* Truncate empty pages at the end,
 									 * default value depends on reloptions */
+	/*
+	 * The number of parallel vacuum workers. -1 by default for no workers
+	 * and 0 for choosing based on the number of indexes.
+	 */
+	int			nworkers;
 } VacuumParams;
 
 /* GUC parameters */
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index aff0b10a93..91db6a10b0 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -92,6 +92,20 @@ CONTEXT:  SQL function "do_analyze" statement 1
 SQL function "wrap_do_analyze" statement 1
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+-- PARALLEL option
+VACUUM (PARALLEL) vaccluster;
+VACUUM (PARALLEL 2) vaccluster;
+VACUUM (PARALLEL 0) vaccluster; -- error
+ERROR:  parallel vacuum degree must be between 1 and 1024
+LINE 1: VACUUM (PARALLEL 0) vaccluster;
+                ^
+VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) vaccluster;
+VACUUM (PARALLEL 2, FULL TRUE) vaccluster; -- error, cannot use both PARALLEL and FULL
+ERROR:  cannot specify FULL option with PARALLEL option
+CREATE TEMPORARY TABLE tmp (a int PRIMARY KEY);
+CREATE INDEX tmp_idx1 ON tmp (a);
+VACUUM (PARALLEL 1) tmp; -- error, cannot parallel vacuum temporary tables
+WARNING:  skipping vacuum on "tmp" --- cannot vacuum temporary tables in parallel
 -- INDEX_CLEANUP option
 CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
 -- Use uncompressed data stored in toast.
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index f0fee3af2b..66a9b110fe 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -75,6 +75,16 @@ VACUUM FULL vactst;
 
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
 
+-- PARALLEL option
+VACUUM (PARALLEL) vaccluster;
+VACUUM (PARALLEL 2) vaccluster;
+VACUUM (PARALLEL 0) vaccluster; -- error
+VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) vaccluster;
+VACUUM (PARALLEL 2, FULL TRUE) vaccluster; -- error, cannot use both PARALLEL and FULL
+CREATE TEMPORARY TABLE tmp (a int PRIMARY KEY);
+CREATE INDEX tmp_idx1 ON tmp (a);
+VACUUM (PARALLEL 1) tmp; -- error, cannot parallel vacuum temporary tables
+
 -- INDEX_CLEANUP option
 CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
 -- Use uncompressed data stored in toast.
-- 
2.22.0

From a4bce0f6d662e4e42d98d6a9ffe70728e254f64a Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Fri, 25 Oct 2019 21:56:24 +0900
Subject: [PATCH v31 6/6] PoC: shared vacuum cost balance

---
 src/backend/access/heap/vacuumlazy.c | 23 ++++++++-
 src/backend/commands/vacuum.c        | 72 +++++++++++++++++++++++-----
 src/include/access/heapam.h          |  1 +
 3 files changed, 81 insertions(+), 15 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 02040c837e..cf0ccee037 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -211,6 +211,13 @@ typedef struct LVShared
 	/* The number of indexes that do NOT support parallel index vacuuming */
 	int		nindexes_nonparallel;
 
+	/*
+	 * Shared vacuum cost balance.  During parallel index vacuuming
+	 * VacuumSharedCostBalance points to this value and it accumulates the
+	 * balance of each parallel vacuum workers.
+	 */
+	pg_atomic_uint32 cost_balance;
+
 	/*
 	 * Variables to control parallel index vacuuming.  Index statistics
 	 * returned from ambulkdelete and amvacuumcleanup is nullable variable
@@ -230,6 +237,9 @@ typedef struct LVShared
 #define IndStatsIsNull(s, i) \
 	(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
 
+/*
+ * Global variable for shared cost-based vacuum delay.
+ *
+ * When non-NULL, vacuum_delay_point() adds the process-local
+ * VacuumCostBalance into the pointed-to atomic counter.  It then decides
+ * whether to sleep from that shared total instead of the local balance.
+ * vacuum() resets it to NULL.  Per the LVShared comment, it should point at
+ * LVShared->cost_balance during parallel index vacuuming.
+ * NOTE(review): confirm that the leader and workers actually set it there.
+ */
+pg_atomic_uint32	*VacuumSharedCostBalance = NULL;
+
 /*
  * Struct for an index bulk-deletion statistic used for parallel lazy
  * vacuum. This is allocated in the DSM segment.  IndexBulkDeleteResult
@@ -1960,6 +1970,10 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	Assert(ParallelVacuumIsActive(lps));
 	Assert(nindexes > 0);
 
+	/* Move the current balance to the shared value */
+	pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
+	VacuumCostBalance = 0;
+
 	LaunchParallelWorkers(lps->pcxt);
 
 	if (lps->lvshared->for_cleanup)
@@ -1987,11 +2001,15 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	WaitForParallelWorkersToFinish(lps->pcxt);
 
 	/*
-	 * We need to reinitialize the parallel context as no more index vacuuming and
-	 * index cleanup will be performed after that.
+	 * We need neither to reinitialize the parallel context nor to reset vacuum cost
+	 * balance after index cleanup as no more index vacuuming and index cleanup will
+	 * be performed after that.
 	 */
 	if (!lps->lvshared->for_cleanup)
 	{
+		/* Continue to use the shared balance value */
+		VacuumCostBalance = pg_atomic_read_u32(&(lps->lvshared->cost_balance));
+
 		/* Reset the processing count */
 		pg_atomic_write_u32(&(lps->lvshared->nprocessed), 0);
 
@@ -2999,6 +3017,7 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks,
 	shared->offset = add_size(SizeOfLVShared, BITMAPLEN(nindexes));
 	prepare_index_statistics(shared, Irel, nindexes);
 	pg_atomic_init_u32(&(shared->nprocessed), 0);
+	pg_atomic_init_u32(&(shared->cost_balance), 0);
 
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	lps->lvshared = shared;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 9ada501709..7ace51e099 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -412,6 +412,7 @@ vacuum(List *relations, VacuumParams *params,
 		VacuumPageHit = 0;
 		VacuumPageMiss = 0;
 		VacuumPageDirty = 0;
+		VacuumSharedCostBalance = NULL;
 
 		/*
 		 * Loop to process each selected relation.
@@ -1990,28 +1991,73 @@ vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
 void
 vacuum_delay_point(void)
 {
+	bool require_sleep = false;
+
 	/* Always check for interrupts */
 	CHECK_FOR_INTERRUPTS();
 
-	/* Nap if appropriate */
-	if (VacuumCostActive && !InterruptPending &&
-		VacuumCostBalance >= VacuumCostLimit)
+	if (VacuumCostActive && !InterruptPending)
 	{
-		double		msec;
+		/*
+		 * If the vacuum cost balance is shared among parallel workers we
+		 * decide whether to sleep based on that.
+		 */
+		if (VacuumSharedCostBalance != NULL)
+		{
+			while (true)
+			{
+				uint32 shared_balance;
+				uint32 new_balance;
 
-		msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
-		if (msec > VacuumCostDelay * 4)
-			msec = VacuumCostDelay * 4;
+				require_sleep = false;
 
-		pg_usleep((long) (msec * 1000));
+				/* compute new balance by adding the local value */
+				shared_balance = pg_atomic_read_u32(VacuumSharedCostBalance);
+				new_balance = shared_balance + VacuumCostBalance;
 
-		VacuumCostBalance = 0;
+				if (new_balance >= VacuumCostLimit)
+				{
+					require_sleep = true;
+					new_balance -= VacuumCostLimit;
+				}
+
+				if (pg_atomic_compare_exchange_u32(VacuumSharedCostBalance,
+												   &shared_balance,
+												   new_balance))
+					break;
+			}
 
-		/* update balance values for workers */
-		AutoVacuumUpdateDelay();
+			/*
+			 * Reset the local balance as we accumulated it into the shared
+			 * value.
+			 */
+			VacuumCostBalance = 0;
+		}
+		else if (VacuumCostBalance >= VacuumCostLimit)
+		{
+			/* In single process vacuum check only the local balance */
+			require_sleep = true;
+		}
+
+		/* Nap if appropriate */
+		if (require_sleep)
+		{
+			double		msec;
 
-		/* Might have gotten an interrupt while sleeping */
-		CHECK_FOR_INTERRUPTS();
+			msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
+			if (msec > VacuumCostDelay * 4)
+				msec = VacuumCostDelay * 4;
+
+			pg_usleep((long) (msec * 1000));
+
+			VacuumCostBalance = 0;
+
+			/* update balance values for workers */
+			AutoVacuumUpdateDelay();
+
+			/* Might have gotten an interrupt while sleeping */
+			CHECK_FOR_INTERRUPTS();
+		}
 	}
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 12065cc038..ac883f67d1 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -192,6 +192,7 @@ extern void SyncScanShmemInit(void);
 extern Size SyncScanShmemSize(void);
 
 /* in heap/vacuumlazy.c */
+extern pg_atomic_uint32	*VacuumSharedCostBalance;
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation onerel,
 							struct VacuumParams *params, BufferAccessStrategy bstrategy);
-- 
2.22.0

From f8a88f7fcc4a19031fd4c42c9afa247b1655e51a Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Tue, 15 Oct 2019 17:03:22 +0900
Subject: [PATCH v31 1/6] Add an index AM field to check parallel index
 participation

---
 contrib/bloom/blutils.c                          | 1 +
 doc/src/sgml/indexam.sgml                        | 2 ++
 src/backend/access/brin/brin.c                   | 1 +
 src/backend/access/gin/ginutil.c                 | 1 +
 src/backend/access/gist/gist.c                   | 1 +
 src/backend/access/hash/hash.c                   | 1 +
 src/backend/access/nbtree/nbtree.c               | 1 +
 src/backend/access/spgist/spgutils.c             | 1 +
 src/include/access/amapi.h                       | 2 ++
 src/test/modules/dummy_index_am/dummy_index_am.c | 1 +
 10 files changed, 12 insertions(+)

diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c
index 3d44616adc..98163c81bd 100644
--- a/contrib/bloom/blutils.c
+++ b/contrib/bloom/blutils.c
@@ -120,6 +120,7 @@ blhandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
+	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index dd54c68802..fa5682db04 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -120,6 +120,8 @@ typedef struct IndexAmRoutine
     bool        ampredlocks;
     /* does AM support parallel scan? */
     bool        amcanparallel;
+    /* does AM support parallel vacuum? */
+    bool        amcanparallelvacuum;
     /* does AM support columns included with clause INCLUDE? */
     bool        amcaninclude;
     /* type of data stored in index, or InvalidOid if variable */
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index ae7b729edd..6ea48fb555 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -100,6 +100,7 @@ brinhandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
+	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index cf9699ad18..0c33809c83 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -52,6 +52,7 @@ ginhandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = true;
 	amroutine->amcanparallel = false;
+	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 0cc87911d6..0363bf814a 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -74,6 +74,7 @@ gisthandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = true;
 	amroutine->ampredlocks = true;
 	amroutine->amcanparallel = false;
+	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = true;
 	amroutine->amkeytype = InvalidOid;
 
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 5cc30dac42..f21d9ac78f 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -73,6 +73,7 @@ hashhandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = true;
 	amroutine->amcanparallel = false;
+	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
 	amroutine->amkeytype = INT4OID;
 
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 4cfd5289ad..e885aadc21 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -122,6 +122,7 @@ bthandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = true;
 	amroutine->ampredlocks = true;
 	amroutine->amcanparallel = true;
+	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = true;
 	amroutine->amkeytype = InvalidOid;
 
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 45472db147..0c86b63f65 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -55,6 +55,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
+	amroutine->amcanparallelvacuum = true;
 	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 6e3db06eed..f7d2a1b7e3 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -195,6 +195,8 @@ typedef struct IndexAmRoutine
 	bool		ampredlocks;
 	/* does AM support parallel scan? */
 	bool		amcanparallel;
+	/* does AM support parallel vacuum? */
+	bool		amcanparallelvacuum;
 	/* does AM support columns included with clause INCLUDE? */
 	bool		amcaninclude;
 	/* type of data stored in index, or InvalidOid if variable */
diff --git a/src/test/modules/dummy_index_am/dummy_index_am.c b/src/test/modules/dummy_index_am/dummy_index_am.c
index bc68767f3a..f12eefbb24 100644
--- a/src/test/modules/dummy_index_am/dummy_index_am.c
+++ b/src/test/modules/dummy_index_am/dummy_index_am.c
@@ -300,6 +300,7 @@ dihandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
+	amroutine->amcanparallelvacuum = false;
 	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
-- 
2.22.0

Reply via email to