From c1a20d5c4de27f0059e4928405ed81c298c123d3 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 4 Jan 2021 13:35:35 +0900
Subject: [PATCH 2/3] Choose index vacuum strategy based on amvacuumstrategy
 IndexAM API.

If the index_cleanup option is specified by neither the VACUUM command
nor the storage option, lazy vacuum asks each index for its vacuum
strategy before heap vacuum, and decides whether or not to remove the
collected garbage tuples from the heap based on both the answers from
amvacuumstrategy and how many LP_DEAD items can be accumulated in the
space of a heap page left free by fillfactor.

The decision made by lazy vacuum is passed to ambulkdelete, so each
index can then choose whether or not to skip index bulk-deletion.
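
For illustration, a minimal sketch of an AM-side amvacuumstrategy
implementation, written against the IndexVacuumStrategy and
IndexVacuumInfo additions in this patch series (the "foo" AM and its
always-skip policy are hypothetical):

    #include "access/genam.h"

    /* Hypothetical "foo" AM: report that bulk-deletion is not worthwhile */
    IndexVacuumStrategy
    foovacuumstrategy(IndexVacuumInfo *info)
    {
        /*
         * When INDEX_CLEANUP is left at its default, an answer of
         * INDEX_VACUUM_STRATEGY_NONE from any one index makes lazy vacuum
         * skip heap vacuum for the cycle -- unless the per-page LP_DEAD
         * accumulation exceeds the fillfactor-based limit computed in
         * choose_vacuum_strategy().
         */
        return INDEX_VACUUM_STRATEGY_NONE;
    }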
---
 contrib/bloom/blvacuum.c                |   9 +-
 src/backend/access/brin/brin.c          |  10 +-
 src/backend/access/common/reloptions.c  |  35 ++-
 src/backend/access/gin/ginpostinglist.c |  30 +--
 src/backend/access/gin/ginvacuum.c      |  11 +
 src/backend/access/gist/gistvacuum.c    |  14 +-
 src/backend/access/hash/hash.c          |   7 +
 src/backend/access/heap/vacuumlazy.c    | 273 ++++++++++++++++++------
 src/backend/access/nbtree/nbtree.c      |  19 ++
 src/backend/access/spgist/spgvacuum.c   |  14 +-
 src/backend/catalog/index.c             |   2 +
 src/backend/commands/analyze.c          |   2 +
 src/backend/commands/vacuum.c           |  23 +-
 src/include/access/genam.h              |  15 ++
 src/include/access/htup_details.h       |  17 +-
 src/include/commands/vacuum.h           |  20 +-
 src/include/utils/rel.h                 |  17 +-
 17 files changed, 388 insertions(+), 130 deletions(-)

diff --git a/contrib/bloom/blvacuum.c b/contrib/bloom/blvacuum.c
index 982ebf97e6..9f8bfc2413 100644
--- a/contrib/bloom/blvacuum.c
+++ b/contrib/bloom/blvacuum.c
@@ -54,6 +54,13 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	BloomMetaPageData *metaData;
 	GenericXLogState *gxlogState;
 
+	/*
+	 * Skip deleting index entries if the corresponding heap tuples will
+	 * not be deleted.
+	 */
+	if (!info->will_vacuum_heap)
+		return NULL;
+
 	if (stats == NULL)
 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
 
@@ -181,7 +188,7 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	BlockNumber npages,
 				blkno;
 
-	if (info->analyze_only)
+	if (info->analyze_only || !info->vacuumcleanup_requested)
 		return stats;
 
 	if (stats == NULL)
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 776ac3f64c..a429468c57 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -783,7 +783,8 @@ brinvacuumstrategy(IndexVacuumInfo *info)
 /*
  * brinbulkdelete
  *		Since there are no per-heap-tuple index tuples in BRIN indexes,
- *		there's not a lot we can do here.
+ *		there's not a lot we can do here regardless of
+ *		info->will_vacuum_heap.
  *
  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
  * tuple is deleted), meaning the need to re-run summarization on the affected
@@ -809,8 +810,11 @@ brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 {
 	Relation	heapRel;
 
-	/* No-op in ANALYZE ONLY mode */
-	if (info->analyze_only)
+	/*
+	 * No-op in ANALYZE ONLY mode or when the user requests to disable
+	 * index cleanup.
+	 */
+	if (info->analyze_only || !info->vacuumcleanup_requested)
 		return stats;
 
 	if (!stats)
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index c687d3ee9e..3080cbbc6b 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -27,6 +27,7 @@
 #include "catalog/pg_type.h"
 #include "commands/defrem.h"
 #include "commands/tablespace.h"
+#include "commands/vacuum.h"
 #include "commands/view.h"
 #include "nodes/makefuncs.h"
 #include "postmaster/postmaster.h"
@@ -140,15 +141,6 @@ static relopt_bool boolRelOpts[] =
 		},
 		false
 	},
-	{
-		{
-			"vacuum_index_cleanup",
-			"Enables index vacuuming and index cleanup",
-			RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
-			ShareUpdateExclusiveLock
-		},
-		true
-	},
 	{
 		{
 			"vacuum_truncate",
@@ -492,6 +484,18 @@ relopt_enum_elt_def viewCheckOptValues[] =
 	{(const char *) NULL}		/* list terminator */
 };
 
+/* values from VacOptTernaryValue */
+relopt_enum_elt_def vacOptTernaryOptValues[] =
+{
+	{"default", VACOPT_TERNARY_DEFAULT},
+	{"true", VACOPT_TERNARY_ENABLED},
+	{"false", VACOPT_TERNARY_DISABLED},
+	{"on", VACOPT_TERNARY_ENABLED},
+	{"off", VACOPT_TERNARY_DISABLED},
+	{"1", VACOPT_TERNARY_ENABLED},
+	{"0", VACOPT_TERNARY_DISABLED}
+};
+
 static relopt_enum enumRelOpts[] =
 {
 	{
@@ -516,6 +520,17 @@ static relopt_enum enumRelOpts[] =
 		VIEW_OPTION_CHECK_OPTION_NOT_SET,
 		gettext_noop("Valid values are \"local\" and \"cascaded\".")
 	},
+	{
+		{
+			"vacuum_index_cleanup",
+			"Enables index vacuuming and index cleanup",
+			RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+			ShareUpdateExclusiveLock
+		},
+		vacOptTernaryOptValues,
+		VACOPT_TERNARY_DEFAULT,
+		gettext_noop("Valid values are \"on\", \"off\", and \"default\".")
+	},
 	/* list terminator */
 	{{NULL}}
 };
@@ -1856,7 +1871,7 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
 		offsetof(StdRdOptions, user_catalog_table)},
 		{"parallel_workers", RELOPT_TYPE_INT,
 		offsetof(StdRdOptions, parallel_workers)},
-		{"vacuum_index_cleanup", RELOPT_TYPE_BOOL,
+		{"vacuum_index_cleanup", RELOPT_TYPE_ENUM,
 		offsetof(StdRdOptions, vacuum_index_cleanup)},
 		{"vacuum_truncate", RELOPT_TYPE_BOOL,
 		offsetof(StdRdOptions, vacuum_truncate)}
diff --git a/src/backend/access/gin/ginpostinglist.c b/src/backend/access/gin/ginpostinglist.c
index 216b2b9a2c..e49c94b860 100644
--- a/src/backend/access/gin/ginpostinglist.c
+++ b/src/backend/access/gin/ginpostinglist.c
@@ -22,29 +22,29 @@
 
 /*
  * For encoding purposes, item pointers are represented as 64-bit unsigned
- * integers. The lowest 11 bits represent the offset number, and the next
- * lowest 32 bits are the block number. That leaves 21 bits unused, i.e.
- * only 43 low bits are used.
+ * integers. The lowest 13 bits represent the offset number, and the next
+ * lowest 32 bits are the block number. That leaves 19 bits unused, i.e.
+ * only 45 low bits are used.
  *
- * 11 bits is enough for the offset number, because MaxHeapTuplesPerPage <
- * 2^11 on all supported block sizes. We are frugal with the bits, because
+ * 13 bits is enough for the offset number, because MaxHeapTuplesPerPage <
+ * 2^13 on all supported block sizes. We are frugal with the bits, because
  * smaller integers use fewer bytes in the varbyte encoding, saving disk
  * space. (If we get a new table AM in the future that wants to use the full
  * range of possible offset numbers, we'll need to change this.)
  *
- * These 43-bit integers are encoded using varbyte encoding. In each byte,
+ * These 45-bit integers are encoded using varbyte encoding. In each byte,
  * the 7 low bits contain data, while the highest bit is a continuation bit.
  * When the continuation bit is set, the next byte is part of the same
- * integer, otherwise this is the last byte of this integer. 43 bits need
+ * integer, otherwise this is the last byte of this integer. 45 bits need
  * at most 7 bytes in this encoding:
  *
  * 0XXXXXXX
- * 1XXXXXXX 0XXXXYYY
- * 1XXXXXXX 1XXXXYYY 0YYYYYYY
- * 1XXXXXXX 1XXXXYYY 1YYYYYYY 0YYYYYYY
- * 1XXXXXXX 1XXXXYYY 1YYYYYYY 1YYYYYYY 0YYYYYYY
- * 1XXXXXXX 1XXXXYYY 1YYYYYYY 1YYYYYYY 1YYYYYYY 0YYYYYYY
- * 1XXXXXXX 1XXXXYYY 1YYYYYYY 1YYYYYYY 1YYYYYYY 1YYYYYYY 0uuuuuuY
+ * 1XXXXXXX 0XXXXXXY
+ * 1XXXXXXX 1XXXXXXY 0YYYYYYY
+ * 1XXXXXXX 1XXXXXXY 1YYYYYYY 0YYYYYYY
+ * 1XXXXXXX 1XXXXXXY 1YYYYYYY 1YYYYYYY 0YYYYYYY
+ * 1XXXXXXX 1XXXXXXY 1YYYYYYY 1YYYYYYY 1YYYYYYY 0YYYYYYY
+ * 1XXXXXXX 1XXXXXXY 1YYYYYYY 1YYYYYYY 1YYYYYYY 1YYYYYYY 0uuuuYYY
  *
  * X = bits used for offset number
  * Y = bits used for block number
@@ -73,12 +73,12 @@
 
 /*
  * How many bits do you need to encode offset number? OffsetNumber is a 16-bit
- * integer, but you can't fit that many items on a page. 11 ought to be more
+ * integer, but you can't fit that many items on a page. 13 ought to be more
  * than enough. It's tempting to derive this from MaxHeapTuplesPerPage, and
  * use the minimum number of bits, but that would require changing the on-disk
  * format if MaxHeapTuplesPerPage changes. Better to leave some slack.
  */
-#define MaxHeapTuplesPerPageBits		11
+#define MaxHeapTuplesPerPageBits		13
 
 /* Max. number of bytes needed to encode the largest supported integer. */
 #define MaxBytesPerInteger				7
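
To make the encoding above concrete, here is a small standalone sketch
of the varbyte scheme the comment describes (illustrative only, not the
actual GIN encoder, which works on deltas between successive item
pointers rather than raw values):

    #include <stdint.h>

    /* Pack block/offset into the 45-bit layout: offset in the low 13 bits */
    static uint64_t
    itemptr_to_uint64(uint32_t block, uint16_t offset)
    {
        return ((uint64_t) block << 13) | (offset & 0x1FFF);
    }

    /* Emit 7 data bits per byte; the high bit means "more bytes follow" */
    static int
    encode_varbyte(uint64_t val, unsigned char *buf)
    {
        int nbytes = 0;

        while (val > 0x7F)
        {
            buf[nbytes++] = 0x80 | (val & 0x7F);
            val >>= 7;
        }
        buf[nbytes++] = (unsigned char) val;    /* continuation bit clear */
        return nbytes;                          /* at most 7 for 45 bits */
    }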
diff --git a/src/backend/access/gin/ginvacuum.c b/src/backend/access/gin/ginvacuum.c
index 4bd6d32435..3972c758d0 100644
--- a/src/backend/access/gin/ginvacuum.c
+++ b/src/backend/access/gin/ginvacuum.c
@@ -580,6 +580,13 @@ ginbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	BlockNumber rootOfPostingTree[BLCKSZ / (sizeof(IndexTupleData) + sizeof(ItemId))];
 	uint32		nRoot;
 
+	/*
+	 * Skip deleting index entries if the corresponding heap tuples will
+	 * not be deleted.
+	 */
+	if (!info->will_vacuum_heap)
+		return NULL;
+
 	gvs.tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
 									   "Gin vacuum temporary context",
 									   ALLOCSET_DEFAULT_SIZES);
@@ -717,6 +724,10 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 		return stats;
 	}
 
+	/* Skip index cleanup if the user requested to disable it */
+	if (!info->vacuumcleanup_requested)
+		return stats;
+
 	/*
 	 * Set up all-zero stats and cleanup pending inserts if ginbulkdelete
 	 * wasn't called
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 7dc8c3d860..883d2e9d8d 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -68,6 +68,13 @@ IndexBulkDeleteResult *
 gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 			   IndexBulkDeleteCallback callback, void *callback_state)
 {
+	/*
+	 * Skip deleting index entries if the corresponding heap tuples will
+	 * not be deleted.
+	 */
+	if (!info->will_vacuum_heap)
+		return NULL;
+
 	/* allocate stats if first time through, else re-use existing struct */
 	if (stats == NULL)
 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
@@ -83,8 +90,11 @@ gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 IndexBulkDeleteResult *
 gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 {
-	/* No-op in ANALYZE ONLY mode */
-	if (info->analyze_only)
+	/*
+	 * No-op in ANALYZE ONLY mode or when the user requests to disable
+	 * index cleanup.
+	 */
+	if (info->analyze_only || !info->vacuumcleanup_requested)
 		return stats;
 
 	/*
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index fd4626dfab..07348b39e0 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -478,6 +478,13 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	HashMetaPage metap;
 	HashMetaPage cachedmetap;
 
+	/*
+	 * Skip deleting index entries if the corresponding heap tuples will
+	 * not be deleted.
+	 */
+	if (!info->will_vacuum_heap)
+		return NULL;
+
 	tuples_removed = 0;
 	num_index_tuples = 0;
 
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index f3d2265fad..d77616a7a1 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -214,6 +214,18 @@ typedef struct LVShared
 	double		reltuples;
 	bool		estimated_count;
 
+	/*
+	 * Copied from LVRelStats. It tells the index AM that lazy vacuum will
+	 * remove dead tuples from the heap after index vacuuming.
+	 */
+	bool		vacuum_heap;
+
+	/*
+	 * Copied from LVRelStats. It tells the index AM whether amvacuumcleanup
+	 * is requested or not.
+	 */
+	bool		vacuumcleanup_requested;
+
 	/*
 	 * In single process lazy vacuum we could consume more memory during index
 	 * vacuuming or cleanup apart from the memory for heap scanning.  In
@@ -293,8 +305,8 @@ typedef struct LVRelStats
 {
 	char	   *relnamespace;
 	char	   *relname;
-	/* useindex = true means two-pass strategy; false means one-pass */
-	bool		useindex;
+	/* hasindex = true means two-pass strategy; false means one-pass */
+	bool		hasindex;
 	/* Overall statistics about rel */
 	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
 	BlockNumber rel_pages;		/* total number of pages */
@@ -313,6 +325,8 @@ typedef struct LVRelStats
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
 	bool		lock_waiter_detected;
+	bool		vacuum_heap;	/* do we remove dead tuples from the heap? */
+	bool		vacuumcleanup_requested; /* false if INDEX_CLEANUP is disabled */
 
 	/* Used for error callback */
 	char	   *indname;
@@ -343,6 +357,13 @@ static BufferAccessStrategy vac_strategy;
 static void lazy_scan_heap(Relation onerel, VacuumParams *params,
 						   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
 						   bool aggressive);
+static void choose_vacuum_strategy(Relation onerel, LVRelStats *vacrelstats,
+								   VacuumParams *params, Relation *Irel,
+								   int nindexes, int ndeaditems);
+static void lazy_vacuum_table_and_indexes(Relation onerel, VacuumParams *params,
+										  LVRelStats *vacrelstats, Relation *Irel,
+										  int nindexes, IndexBulkDeleteResult **stats,
+										  LVParallelState *lps, int *maxdeadtups);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
 									LVRelStats *vacrelstats);
@@ -442,7 +463,6 @@ heap_vacuum_rel(Relation onerel, VacuumParams *params,
 	ErrorContextCallback errcallback;
 
 	Assert(params != NULL);
-	Assert(params->index_cleanup != VACOPT_TERNARY_DEFAULT);
 	Assert(params->truncate != VACOPT_TERNARY_DEFAULT);
 
 	/* not every AM requires these to be valid, but heap does */
@@ -501,8 +521,7 @@ heap_vacuum_rel(Relation onerel, VacuumParams *params,
 
 	/* Open all indexes of the relation */
 	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
-	vacrelstats->useindex = (nindexes > 0 &&
-							 params->index_cleanup == VACOPT_TERNARY_ENABLED);
+	vacrelstats->hasindex = (nindexes > 0);
 
 	/*
 	 * Setup error traceback support for ereport().  The idea is to set up an
@@ -763,6 +782,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	BlockNumber empty_pages,
 				vacuumed_pages,
 				next_fsm_block_to_vacuum;
+	int			maxdeadtups = 0;	/* maximum # of dead tuples in a single page */
 	double		num_tuples,		/* total number of nonremovable tuples */
 				live_tuples,	/* live tuples (reltuples estimate) */
 				tups_vacuumed,	/* tuples cleaned up by vacuum */
@@ -811,14 +831,26 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	vacrelstats->nonempty_pages = 0;
 	vacrelstats->latestRemovedXid = InvalidTransactionId;
 
+	/*
+	 * Index vacuum cleanup is enabled if index cleanup is not disabled,
+	 * i.e., it is either default or enabled. Whether to do index
+	 * bulk-deletion is decided by choose_vacuum_strategy() when the
+	 * INDEX_CLEANUP option is default.
+	 */
+	vacrelstats->vacuumcleanup_requested =
+		(params->index_cleanup != VACOPT_TERNARY_DISABLED);
+
 	vistest = GlobalVisTestFor(onerel);
 
 	/*
 	 * Initialize state for a parallel vacuum.  As of now, only one worker can
 	 * be used for an index, so we invoke parallelism only if there are at
-	 * least two indexes on a table.
+	 * least two indexes on a table. When index cleanup is disabled, index
+	 * bulk-deletion is likely to be a no-op, so we disable parallel
+	 * vacuum.
 	 */
-	if (params->nworkers >= 0 && vacrelstats->useindex && nindexes > 1)
+	if (params->nworkers >= 0 && nindexes > 1 &&
+		params->index_cleanup != VACOPT_TERNARY_DISABLED)
 	{
 		/*
 		 * Since parallel workers cannot access data in temporary tables, we
@@ -1050,19 +1082,10 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 				vmbuffer = InvalidBuffer;
 			}
 
-			/* Work on all the indexes, then the heap */
-			lazy_vacuum_all_indexes(onerel, Irel, indstats,
-									vacrelstats, lps, nindexes);
-
-			/* Remove tuples from heap */
-			lazy_vacuum_heap(onerel, vacrelstats);
-
-			/*
-			 * Forget the now-vacuumed tuples, and press on, but be careful
-			 * not to reset latestRemovedXid since we want that value to be
-			 * valid.
-			 */
-			dead_tuples->num_tuples = 0;
+			/* Vacuum the table and its indexes */
+			lazy_vacuum_table_and_indexes(onerel, params, vacrelstats,
+										  Irel, nindexes, indstats,
+										  lps, &maxdeadtups);
 
 			/*
 			 * Vacuum the Free Space Map to make newly-freed space visible on
@@ -1512,32 +1535,16 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 
 		/*
 		 * If there are no indexes we can vacuum the page right now instead of
-		 * doing a second scan. Also we don't do that but forget dead tuples
-		 * when index cleanup is disabled.
+		 * doing a second scan.
 		 */
-		if (!vacrelstats->useindex && dead_tuples->num_tuples > 0)
+		if (!vacrelstats->hasindex && dead_tuples->num_tuples > 0)
 		{
-			if (nindexes == 0)
-			{
-				/* Remove tuples from heap if the table has no index */
-				lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
-				vacuumed_pages++;
-				has_dead_tuples = false;
-			}
-			else
-			{
-				/*
-				 * Here, we have indexes but index cleanup is disabled.
-				 * Instead of vacuuming the dead tuples on the heap, we just
-				 * forget them.
-				 *
-				 * Note that vacrelstats->dead_tuples could have tuples which
-				 * became dead after HOT-pruning but are not marked dead yet.
-				 * We do not process them because it's a very rare condition,
-				 * and the next vacuum will process them anyway.
-				 */
-				Assert(params->index_cleanup == VACOPT_TERNARY_DISABLED);
-			}
+			Assert(nindexes == 0);
+
+			/* Remove tuples from heap if the table has no index */
+			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
+			vacuumed_pages++;
+			has_dead_tuples = false;
 
 			/*
 			 * Forget the now-vacuumed tuples, and press on, but be careful
@@ -1663,6 +1670,9 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		 */
 		if (dead_tuples->num_tuples == prev_dead_count)
 			RecordPageWithFreeSpace(onerel, blkno, freespace);
+		else
+			maxdeadtups = Max(maxdeadtups,
+							  dead_tuples->num_tuples - prev_dead_count);
 	}
 
 	/* report that everything is scanned and vacuumed */
@@ -1702,14 +1712,9 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	/* If any tuples need to be deleted, perform final vacuum cycle */
 	/* XXX put a threshold on min number of tuples here? */
 	if (dead_tuples->num_tuples > 0)
-	{
-		/* Work on all the indexes, and then the heap */
-		lazy_vacuum_all_indexes(onerel, Irel, indstats, vacrelstats,
-								lps, nindexes);
-
-		/* Remove tuples from heap */
-		lazy_vacuum_heap(onerel, vacrelstats);
-	}
+		lazy_vacuum_table_and_indexes(onerel, params, vacrelstats,
+									  Irel, nindexes, indstats,
+									  lps, &maxdeadtups);
 
 	/*
 	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
@@ -1722,7 +1727,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
 
 	/* Do post-vacuum cleanup */
-	if (vacrelstats->useindex)
+	if (vacrelstats->hasindex)
 		lazy_cleanup_all_indexes(Irel, indstats, vacrelstats, lps, nindexes);
 
 	/*
@@ -1775,6 +1780,134 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	pfree(buf.data);
 }
 
+/*
+ * Remove the collected garbage tuples from the table and its indexes.
+ */
+static void
+lazy_vacuum_table_and_indexes(Relation onerel, VacuumParams *params,
+							  LVRelStats *vacrelstats, Relation *Irel,
+							  int nindexes, IndexBulkDeleteResult **indstats,
+							  LVParallelState *lps, int *maxdeadtups)
+{
+	/*
+	 * Choose the vacuum strategy for this vacuum cycle.
+	 * choose_vacuum_strategy() stores its decision in
+	 * vacrelstats->vacuum_heap.
+	 */
+	choose_vacuum_strategy(onerel, vacrelstats, params, Irel, nindexes,
+						   *maxdeadtups);
+
+	/* Work on all the indexes, then the heap */
+	lazy_vacuum_all_indexes(onerel, Irel, indstats, vacrelstats, lps,
+							nindexes);
+
+	if (vacrelstats->vacuum_heap)
+	{
+		/* Remove tuples from heap */
+		lazy_vacuum_heap(onerel, vacrelstats);
+	}
+	else
+	{
+		/*
+		 * We skip heap vacuum in this cycle.
+		 *
+		 * Note that vacrelstats->dead_tuples could have tuples which
+		 * became dead after HOT-pruning but are not marked dead yet.
+		 * We do not process them because it's a very rare condition,
+		 * and the next vacuum will process them anyway.
+		 */
+		Assert(params->index_cleanup != VACOPT_TERNARY_ENABLED);
+	}
+
+	/*
+	 * Forget the now-vacuumed tuples, and press on, but be careful
+	 * not to reset latestRemovedXid since we want that value to be
+	 * valid.
+	 */
+	vacrelstats->dead_tuples->num_tuples = 0;
+	*maxdeadtups = 0;
+}
+
+/*
 * Decide whether or not to remove the collected garbage tuples from the
 * heap. The decision is stored in vacrelstats->vacuum_heap. ndeaditems is
 * the maximum number of LP_DEAD items on any one heap page encountered
 * during the heap scan.
+ */
+static void
+choose_vacuum_strategy(Relation onerel, LVRelStats *vacrelstats,
+					   VacuumParams *params, Relation *Irel, int nindexes,
+					   int ndeaditems)
+{
+	bool		vacuum_heap = true;
+
+	/*
+	 * If the index cleanup option is specified, we use it.
+	 *
+	 * XXX: should we call amvacuumstrategy even if INDEX_CLEANUP
+	 * is specified?
+	 */
+	if (params->index_cleanup == VACOPT_TERNARY_ENABLED)
+		vacuum_heap = true;
+	else if (params->index_cleanup == VACOPT_TERNARY_DISABLED)
+		vacuum_heap = false;
+	else
+	{
+		int			i;
+
+		/*
+		 * If the index cleanup option is not specified, we decide the
+		 * vacuum strategy based on the values returned by amvacuumstrategy.
+		 * If even one index returns 'none', we skip heap vacuum in this
+		 * vacuum cycle.
+		 */
+		for (i = 0; i < nindexes; i++)
+		{
+			IndexVacuumStrategy ivacstrat;
+			IndexVacuumInfo ivinfo;
+
+			ivinfo.index = Irel[i];
+			ivinfo.message_level = elevel;
+
+			ivacstrat = index_vacuum_strategy(&ivinfo);
+
+			if (ivacstrat == INDEX_VACUUM_STRATEGY_NONE)
+			{
+				vacuum_heap = false;
+				break;
+			}
+		}
+
+		if (!vacuum_heap)
+		{
+			Size freespace = RelationGetTargetPageFreeSpace(onerel,
+															HEAP_DEFAULT_FILLFACTOR);
+			int ndeaditems_limit = (int) ((freespace / sizeof(ItemIdData)) * 0.7);
+
+			/*
+			 * ndeaditems_limit is tested against the maximum number of
+			 * LP_DEAD items on any one heap page encountered during the
+			 * heap scan by the caller. The general idea here is to
+			 * preserve the original pristine state of the table when it
+			 * is subject to constant non-HOT updates and the heap fill
+			 * factor has been reduced from its default.
+			 *
+			 * ndeaditems_limit is calculated using the free space left by
+			 * fillfactor -- we can fit (freespace / sizeof(ItemIdData))
+			 * LP_DEAD items on a heap page before it starts to "overflow"
+			 * with that setting. We're trying to avoid having VACUUM call
+			 * lazy_vacuum_heap() in most cases, but we don't want to be
+			 * too aggressive: it would be risky to make the value we test
+			 * for much higher, since it might be too late by the time we
+			 * actually call lazy_vacuum_heap(). So we multiply by 0.7 as
+			 * a safety factor.
+			 */
+			if (ndeaditems > ndeaditems_limit)
+				vacuum_heap = true;
+		}
+	}
+
+	vacrelstats->vacuum_heap = vacuum_heap;
+}
+
 /*
  *	lazy_vacuum_all_indexes() -- vacuum all indexes of relation.
  *
@@ -1827,7 +1960,6 @@ lazy_vacuum_all_indexes(Relation onerel, Relation *Irel,
 								 vacrelstats->num_index_scans);
 }
 
-
 /*
  *	lazy_vacuum_heap() -- second pass over the heap
  *
@@ -2120,6 +2252,10 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
 	 */
 	nworkers = Min(nworkers, lps->pcxt->nworkers);
 
+	/* Copy the information to the shared state */
+	lps->lvshared->vacuum_heap = vacrelstats->vacuum_heap;
+	lps->lvshared->vacuumcleanup_requested = vacrelstats->vacuumcleanup_requested;
+
 	/* Setup the shared cost-based vacuum delay and launch workers */
 	if (nworkers > 0)
 	{
@@ -2444,6 +2580,8 @@ lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
 	ivinfo.message_level = elevel;
 	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vac_strategy;
+	ivinfo.will_vacuum_heap = vacrelstats->vacuum_heap;
+	ivinfo.vacuumcleanup_requested = false;
 
 	/*
 	 * Update error traceback information.
@@ -2461,11 +2599,16 @@ lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
 	*stats = index_bulk_delete(&ivinfo, *stats,
 							   lazy_tid_reaped, (void *) dead_tuples);
 
-	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
-					vacrelstats->indname,
-					dead_tuples->num_tuples),
-			 errdetail_internal("%s", pg_rusage_show(&ru0))));
+	/*
+	 * XXX: we don't want to report if ambulkdelete was a no-op because of
+	 * will_vacuum_heap, but we cannot know whether it was or not.
+	 */
+	if (*stats)
+		ereport(elevel,
+				(errmsg("scanned index \"%s\" to remove %d row versions",
+						vacrelstats->indname,
+						dead_tuples->num_tuples),
+				 errdetail_internal("%s", pg_rusage_show(&ru0))));
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrelstats, &saved_err_info);
@@ -2495,9 +2638,10 @@ lazy_cleanup_index(Relation indrel,
 	ivinfo.report_progress = false;
 	ivinfo.estimated_count = estimated_count;
 	ivinfo.message_level = elevel;
-
 	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vac_strategy;
+	ivinfo.will_vacuum_heap = false;
+	ivinfo.vacuumcleanup_requested = vacrelstats->vacuumcleanup_requested;
 
 	/*
 	 * Update error traceback information.
@@ -2844,14 +2988,14 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
  * Return the maximum number of dead tuples we can record.
  */
 static long
-compute_max_dead_tuples(BlockNumber relblocks, bool useindex)
+compute_max_dead_tuples(BlockNumber relblocks, bool hasindex)
 {
 	long		maxtuples;
 	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
 	autovacuum_work_mem != -1 ?
 	autovacuum_work_mem : maintenance_work_mem;
 
-	if (useindex)
+	if (hasindex)
 	{
 		maxtuples = MAXDEADTUPLES(vac_work_mem * 1024L);
 		maxtuples = Min(maxtuples, INT_MAX);
@@ -2881,7 +3025,7 @@ lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
 	LVDeadTuples *dead_tuples = NULL;
 	long		maxtuples;
 
-	maxtuples = compute_max_dead_tuples(relblocks, vacrelstats->useindex);
+	maxtuples = compute_max_dead_tuples(relblocks, vacrelstats->hasindex);
 
 	dead_tuples = (LVDeadTuples *) palloc(SizeOfDeadTuples(maxtuples));
 	dead_tuples->num_tuples = 0;
@@ -3573,6 +3717,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	vacrelstats.indname = NULL;
 	vacrelstats.phase = VACUUM_ERRCB_PHASE_UNKNOWN; /* Not yet processing */
 
+	vacrelstats.vacuum_heap = lvshared->vacuum_heap;
+	vacrelstats.vacuumcleanup_requested = lvshared->vacuumcleanup_requested;
+
 	/* Setup error traceback support for ereport() */
 	errcallback.callback = vacuum_error_callback;
 	errcallback.arg = &vacrelstats;
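
To put numbers on the threshold in choose_vacuum_strategy(): with a
stock 8 kB page, 4-byte line pointers, and a table fillfactor of 90,
RelationGetTargetPageFreeSpace() leaves 819 bytes free per page, which
holds 204 LP_DEAD items; derated by the 0.7 safety factor,
ndeaditems_limit comes out to 142. A standalone sketch of the
arithmetic (the constants assume a default build and are not taken
from the patch):

    #include <stdio.h>

    int
    main(void)
    {
        const int blcksz = 8192;    /* BLCKSZ on a default build */
        const int item_id = 4;      /* sizeof(ItemIdData) */
        int fillfactor = 90;        /* reduced from the heap default of 100 */

        /* free space per page left by fillfactor */
        int freespace = blcksz * (100 - fillfactor) / 100;  /* 819 */

        /* LP_DEAD items fitting there, with the 0.7 safety factor */
        int limit = (int) ((freespace / item_id) * 0.7);    /* 142 */

        printf("ndeaditems_limit = %d\n", limit);
        return 0;
    }

Note that at the heap default fillfactor of 100 the limit is zero, so a
single accumulated LP_DEAD item already forces heap vacuum; skipping
only pays off on tables whose fillfactor has been lowered.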
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 800f7a14b6..c9a177d5e1 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -822,6 +822,18 @@ _bt_vacuum_needs_cleanup(IndexVacuumInfo *info)
 		 */
 		result = true;
 	}
+	else if (!info->vacuumcleanup_requested)
+	{
+		/*
+		 * Skip cleanup if INDEX_CLEANUP is set to false, even if there might
+		 * be a deleted page that can be recycled. If INDEX_CLEANUP continues
+		 * to be disabled, recyclable pages could be left around even past
+		 * XID wraparound. But in practice it's not so harmful, since such a
+		 * workload doesn't need to delete and recycle pages in any case,
+		 * and deletion of btree index pages is relatively rare.
+		 */
+		result = false;
+	}
 	else if (TransactionIdIsValid(metad->btm_oldest_btpo_xact) &&
 			 GlobalVisCheckRemovableXid(NULL, metad->btm_oldest_btpo_xact))
 	{
@@ -887,6 +899,13 @@ btbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	Relation	rel = info->index;
 	BTCycleId	cycleid;
 
+	/*
+	 * Skip deleting index entries if the corresponding heap tuples will
+	 * not be deleted.
+	 */
+	if (!info->will_vacuum_heap)
+		return NULL;
+
 	/* allocate stats if first time through, else re-use existing struct */
 	if (stats == NULL)
 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index 1df6dfd5da..cc645435ea 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -916,6 +916,13 @@ spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 {
 	spgBulkDeleteState bds;
 
+	/*
+	 * Skip deleting index entries if the corresponding heap tuples will
+	 * not be deleted.
+	 */
+	if (!info->will_vacuum_heap)
+		return NULL;
+
 	/* allocate stats if first time through, else re-use existing struct */
 	if (stats == NULL)
 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
@@ -946,8 +953,11 @@ spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 {
 	spgBulkDeleteState bds;
 
-	/* No-op in ANALYZE ONLY mode */
-	if (info->analyze_only)
+	/*
+	 * No-op in ANALYZE ONLY mode or when the user requests to disable
+	 * index cleanup.
+	 */
+	if (info->analyze_only || !info->vacuumcleanup_requested)
 		return stats;
 
 	/*
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index cffbc0ac38..b0fde8eaad 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -3401,6 +3401,8 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot)
 	ivinfo.message_level = DEBUG2;
 	ivinfo.num_heap_tuples = heapRelation->rd_rel->reltuples;
 	ivinfo.strategy = NULL;
+	ivinfo.will_vacuum_heap = true;
+	ivinfo.vacuumcleanup_requested = true;
 
 	/*
 	 * Encode TIDs as int8 values for the sort, rather than directly sorting
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 7295cf0215..5f9960c710 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -668,6 +668,8 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
 			ivinfo.message_level = elevel;
 			ivinfo.num_heap_tuples = onerel->rd_rel->reltuples;
 			ivinfo.strategy = vac_strategy;
+			ivinfo.will_vacuum_heap = false;
+			ivinfo.vacuumcleanup_requested = true;
 
 			stats = index_vacuum_cleanup(&ivinfo, NULL);
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index b97d48ee01..079f0a44c9 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1870,17 +1870,20 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
 	onerelid = onerel->rd_lockInfo.lockRelId;
 	LockRelationIdForSession(&onerelid, lmode);
 
-	/* Set index cleanup option based on reloptions if not yet */
-	if (params->index_cleanup == VACOPT_TERNARY_DEFAULT)
-	{
-		if (onerel->rd_options == NULL ||
-			((StdRdOptions *) onerel->rd_options)->vacuum_index_cleanup)
-			params->index_cleanup = VACOPT_TERNARY_ENABLED;
-		else
-			params->index_cleanup = VACOPT_TERNARY_DISABLED;
-	}
+	/*
+	 * Set the index cleanup option if the vacuum_index_cleanup reloption
+	 * is set. Otherwise we leave it as 'default', meaning that the vacuum
+	 * strategy is chosen based on the table and index status. See
+	 * choose_vacuum_strategy().
+	 */
+	if (params->index_cleanup == VACOPT_TERNARY_DEFAULT &&
+		onerel->rd_options != NULL)
+		params->index_cleanup =
+			((StdRdOptions *) onerel->rd_options)->vacuum_index_cleanup;
 
-	/* Set truncate option based on reloptions if not yet */
+	/*
+	 * Set the truncate option based on reloptions if not yet set. The
+	 * truncate option is true by default.
+	 */
 	if (params->truncate == VACOPT_TERNARY_DEFAULT)
 	{
 		if (onerel->rd_options == NULL ||
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 112b90e4cf..0798f39d39 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -48,6 +48,21 @@ typedef struct IndexVacuumInfo
 	bool		analyze_only;	/* ANALYZE (without any actual vacuum) */
 	bool		report_progress;	/* emit progress.h status reports */
 	bool		estimated_count;	/* num_heap_tuples is an estimate */
+
+	/*
+	 * True if lazy vacuum will delete the collected garbage tuples from
+	 * the heap after index vacuuming.  If false, the index AM can safely
+	 * skip index bulk-deletion. This field is used only during
+	 * ambulkdelete.
+	 */
+	bool		will_vacuum_heap;
+
+	/*
+	 * True if lazy vacuum requests amvacuumcleanup. If false, the index AM
+	 * can skip index cleanup. This can be false when the INDEX_CLEANUP
+	 * vacuum option is set to false. This field is used only during
+	 * amvacuumcleanup.
+	 */
+	bool		vacuumcleanup_requested;
+
 	int			message_level;	/* ereport level for progress messages */
 	double		num_heap_tuples;	/* tuples remaining in heap */
 	BufferAccessStrategy strategy;	/* access strategy for reads */
diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h
index 7c62852e7f..038e7cd580 100644
--- a/src/include/access/htup_details.h
+++ b/src/include/access/htup_details.h
@@ -563,17 +563,18 @@ do { \
 /*
  * MaxHeapTuplesPerPage is an upper bound on the number of tuples that can
  * fit on one heap page.  (Note that indexes could have more, because they
- * use a smaller tuple header.)  We arrive at the divisor because each tuple
- * must be maxaligned, and it must have an associated line pointer.
+ * use a smaller tuple header.)  We arrive at the divisor because each line
+ * pointer must be maxaligned.
  *
- * Note: with HOT, there could theoretically be more line pointers (not actual
- * tuples) than this on a heap page.  However we constrain the number of line
- * pointers to this anyway, to avoid excessive line-pointer bloat and not
- * require increases in the size of work arrays.
+ * We used to constrain the number of line pointers to avoid excessive
+ * line-pointer bloat and not require increases in the size of work arrays.
+ * But now that the index vacuum strategy has entered the picture,
+ * accumulating LP_DEAD line pointers has value: it allows skipping index
+ * deletion.
+ *
+ * XXX: allowing a heap page to be filled with nothing but line pointers
+ * seems like overkill.
  */
 #define MaxHeapTuplesPerPage	\
-	((int) ((BLCKSZ - SizeOfPageHeaderData) / \
-			(MAXALIGN(SizeofHeapTupleHeader) + sizeof(ItemIdData))))
+	((int) ((BLCKSZ - SizeOfPageHeaderData) / (MAXALIGN(sizeof(ItemIdData)))))
 
 /*
  * MaxAttrSize is a somewhat arbitrary upper limit on the declared size of
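
For a sense of scale under the new divisor (a back-of-the-envelope
sketch; the constants assume a default 8 kB build with 8-byte
maxalign):

    #include <stdio.h>

    #define MAXALIGN(LEN) (((LEN) + 7) & ~((unsigned long) 7))

    int
    main(void)
    {
        const int blcksz = 8192;    /* BLCKSZ */
        const int page_hdr = 24;    /* SizeOfPageHeaderData */
        const int tup_hdr = 23;     /* SizeofHeapTupleHeader */
        const int item_id = 4;      /* sizeof(ItemIdData) */

        int old_max = (blcksz - page_hdr) / (MAXALIGN(tup_hdr) + item_id);
        int new_max = (blcksz - page_hdr) / MAXALIGN(item_id);

        printf("old MaxHeapTuplesPerPage = %d\n", old_max); /* 291 */
        printf("new MaxHeapTuplesPerPage = %d\n", new_max); /* 1021 */
        return 0;
    }

Even at the 32 kB block size the new bound, (32768 - 24) / 8 = 4093,
stays below 2^13, which is what the MaxHeapTuplesPerPageBits bump in
ginpostinglist.c relies on.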
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 857509287d..c3ea4e1cb8 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -21,6 +21,7 @@
 #include "parser/parse_node.h"
 #include "storage/buf.h"
 #include "storage/lock.h"
+#include "utils/rel.h"
 #include "utils/relcache.h"
 
 /*
@@ -186,19 +187,6 @@ typedef enum VacuumOption
 	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7	/* don't skip any pages */
 } VacuumOption;
 
-/*
- * A ternary value used by vacuum parameters.
- *
- * DEFAULT value is used to determine the value based on other
- * configurations, e.g. reloptions.
- */
-typedef enum VacOptTernaryValue
-{
-	VACOPT_TERNARY_DEFAULT = 0,
-	VACOPT_TERNARY_DISABLED,
-	VACOPT_TERNARY_ENABLED,
-} VacOptTernaryValue;
-
 /*
  * Parameters customizing behavior of VACUUM and ANALYZE.
  *
@@ -218,8 +206,10 @@ typedef struct VacuumParams
 	int			log_min_duration;	/* minimum execution threshold in ms at
 									 * which  verbose logs are activated, -1
 									 * to use default */
-	VacOptTernaryValue index_cleanup;	/* Do index vacuum and cleanup,
-										 * default value depends on reloptions */
+	VacOptTernaryValue index_cleanup;	/* Do index vacuum and cleanup. In
+										 * default mode, the decision is
+										 * based on multiple factors; see
+										 * choose_vacuum_strategy(). */
 	VacOptTernaryValue truncate;	/* Truncate empty pages at the end,
 									 * default value depends on reloptions */
 
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 10b63982c0..168dc5d466 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -295,6 +295,20 @@ typedef struct AutoVacOpts
 	float8		analyze_scale_factor;
 } AutoVacOpts;
 
+/*
 * A ternary value used by vacuum parameters. This value is also used
 * for VACUUM command options.
 *
 * The DEFAULT value means the effective value is determined from other
 * configurations, e.g. reloptions.
+ */
+typedef enum VacOptTernaryValue
+{
+	VACOPT_TERNARY_DEFAULT = 0,
+	VACOPT_TERNARY_DISABLED,
+	VACOPT_TERNARY_ENABLED,
+} VacOptTernaryValue;
+
 typedef struct StdRdOptions
 {
 	int32		vl_len_;		/* varlena header (do not touch directly!) */
@@ -304,7 +318,8 @@ typedef struct StdRdOptions
 	AutoVacOpts autovacuum;		/* autovacuum-related options */
 	bool		user_catalog_table; /* use as an additional catalog relation */
 	int			parallel_workers;	/* max number of parallel workers */
-	bool		vacuum_index_cleanup;	/* enables index vacuuming and cleanup */
+	VacOptTernaryValue	vacuum_index_cleanup;	/* enables index vacuuming
+												 * and cleanup */
 	bool		vacuum_truncate;	/* enables vacuum to truncate a relation */
 } StdRdOptions;
 
-- 
2.27.0

