From 904ae364b7493bd17496a87b2eefa6dffe642d4b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Wed, 9 Jan 2019 09:22:20 +0900
Subject: [PATCH v4] Add DISABLE_INDEX_CLEANUP option to VACUUM command.
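
When the DISABLE_INDEX_CLEANUP option is specified, VACUUM leaves dead
tuples' ItemIds behind as dead instead of removing them, and skips both
the index vacuuming and the index cleanup phases.  This is intended for
cases where avoiding transaction ID wraparound matters more than
reclaiming space; the remaining dead ItemIds can be removed by a later
vacuum that has index cleanup enabled.  The option is ignored for
tables that have no indexes and cannot be used together with FULL.
For example:

    VACUUM (DISABLE_INDEX_CLEANUP, FREEZE) some_table;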

---
 doc/src/sgml/ref/vacuum.sgml         |  20 ++++-
 src/backend/commands/vacuum.c        |   8 +-
 src/backend/commands/vacuumlazy.c    | 138 +++++++++++++++++++++++++++--------
 src/backend/parser/gram.y            |   2 +
 src/include/nodes/parsenodes.h       |   4 +-
 src/test/regress/expected/vacuum.out |   4 +
 src/test/regress/sql/vacuum.sql      |   3 +
 7 files changed, 146 insertions(+), 33 deletions(-)
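
For reviewers, a short usage sketch of the new option, mirroring the
regression tests added below (vaccluster has an index, vactst does not):

    -- Skip index vacuuming and index cleanup; dead ItemIds are left
    -- behind for a later vacuum (with index cleanup enabled) to reclaim.
    VACUUM (DISABLE_INDEX_CLEANUP, FREEZE) vaccluster;

    -- Ignored, with a NOTICE, on a table that has no indexes.
    VACUUM (DISABLE_INDEX_CLEANUP) vactst;

    -- Rejected with an error: the option cannot be combined with FULL.
    VACUUM (DISABLE_INDEX_CLEANUP, FULL) vaccluster;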

diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index fd911f5..6f6cc44 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -31,6 +31,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
     VERBOSE
     ANALYZE
     DISABLE_PAGE_SKIPPING
+    DISABLE_INDEX_CLEANUP
     SKIP_LOCKED
 
 <phrase>and <replaceable class="parameter">table_and_columns</replaceable> is:</phrase>
@@ -161,7 +162,24 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>DISABLE_INDEX_CLEANUP</literal></term>
+    <listitem>
+     <para>
+      Normally, <command>VACUUM</command> prunes HOT-update chains and
+      removes dead tuples, reclaiming their space in both the table and its
+      indexes for reuse.  With this option, <command>VACUUM</command> only
+      marks dead tuples as dead (that is, it does not reclaim their storage)
+      and does not remove dead tuples from indexes.  This is suitable for
+      avoiding transaction ID wraparound but not sufficient for avoiding
+      index bloat.  This option is ignored if the table has no indexes,
+      and it cannot be used in conjunction with the <literal>FULL</literal>
+      option.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>SKIP_LOCKED</literal></term>
     <listitem>
      <para>
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ff1e178..5f0e7a4 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -204,7 +204,8 @@ vacuum(int options, List *relations, VacuumParams *params,
 						stmttype)));
 
 	/*
-	 * Sanity check DISABLE_PAGE_SKIPPING option.
+	 * Sanity check the DISABLE_PAGE_SKIPPING and DISABLE_INDEX_CLEANUP
+	 * options.
 	 */
 	if ((options & VACOPT_FULL) != 0 &&
 		(options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
@@ -212,6 +213,12 @@ vacuum(int options, List *relations, VacuumParams *params,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));
 
+	if ((options & VACOPT_FULL) != 0 &&
+		(options & VACOPT_DISABLE_INDEX_CLEANUP) != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("VACUUM option DISABLE_INDEX_CLEANUP cannot be used with FULL")));
+
 	/*
 	 * Send info about dead objects to the statistics collector, unless we are
 	 * in autovacuum --- autovacuum.c does this for itself.
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index b67267f..656786f 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -113,7 +113,10 @@
 
 typedef struct LVRelStats
 {
-	/* hasindex = true means two-pass strategy; false means one-pass */
+	/*
+	 * hasindex = true means two-pass strategy; false means one-pass. But we
+	 * always use the one-pass strategy when index cleanup is disabled.
+	 */
 	bool		hasindex;
 	/* Overall statistics about rel */
 	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
@@ -163,12 +166,14 @@ static void lazy_cleanup_index(Relation indrel,
 				   IndexBulkDeleteResult *stats,
 				   LVRelStats *vacrelstats);
 static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
+							int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer,
+							bool setdead);
 static bool should_attempt_truncation(LVRelStats *vacrelstats);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
-static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
+static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks,
+							 bool skip_index_vacuum);
 static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
 					   ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
@@ -261,6 +266,12 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
 	vacrelstats->hasindex = (nindexes > 0);
 
+	/* Notify user that DISABLE_INDEX_CLEANUP option is ignored */
+	if (!vacrelstats->hasindex && (options & VACOPT_DISABLE_INDEX_CLEANUP))
+		ereport(NOTICE,
+				(errmsg("DISABLE_INDEX_CLEANUP is ignored because table \"%s\" has no indexes",
+						RelationGetRelationName(onerel))));
+
 	/* Do the vacuuming */
 	lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive);
 
@@ -284,9 +295,11 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 		scanned_all_unfrozen = true;
 
 	/*
-	 * Optionally truncate the relation.
+	 * Optionally truncate the relation.  With DISABLE_INDEX_CLEANUP, dead
+	 * line pointers are left behind, so there is no point in truncating.
 	 */
-	if (should_attempt_truncation(vacrelstats))
+	if (should_attempt_truncation(vacrelstats) &&
+		(options & VACOPT_DISABLE_INDEX_CLEANUP) == 0)
 		lazy_truncate_heap(onerel, vacrelstats);
 
 	/* Report that we are now doing final cleanup */
@@ -493,6 +506,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	Buffer		vmbuffer = InvalidBuffer;
 	BlockNumber next_unskippable_block;
 	bool		skipping_blocks;
+	bool		skip_index_vacuum;
 	xl_heap_freeze_tuple *frozen;
 	StringInfoData buf;
 	const int	initprog_index[] = {
@@ -530,7 +544,14 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	vacrelstats->nonempty_pages = 0;
 	vacrelstats->latestRemovedXid = InvalidTransactionId;
 
-	lazy_space_alloc(vacrelstats, nblocks);
+	/*
+	 * Skip index vacuuming if requested and the table has indexes; in this
+	 * case we use the one-pass strategy and leave dead ItemIds behind.
+	 */
+	skip_index_vacuum =
+		(options & VACOPT_DISABLE_INDEX_CLEANUP) != 0 && vacrelstats->hasindex;
+
+	lazy_space_alloc(vacrelstats, nblocks, skip_index_vacuum);
 	frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
 
 	/* Report that we're scanning the heap, advertising total # of blocks */
@@ -723,6 +744,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			};
 			int64		hvp_val[2];
 
+			Assert(!skip_index_vacuum);
+
 			/*
 			 * Before beginning index vacuuming, we release any pin we may
 			 * hold on the visibility map page.  This isn't necessary for
@@ -1201,14 +1224,21 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		}
 
 		/*
-		 * If there are no indexes then we can vacuum the page right now
-		 * instead of doing a second scan.
+		 * If there are no indexes, or if index vacuuming is disabled, then
+		 * we can vacuum the page right now instead of doing a second scan.
 		 */
-		if (nindexes == 0 &&
+		if ((nindexes == 0 || skip_index_vacuum) &&
 			vacrelstats->num_dead_tuples > 0)
 		{
-			/* Remove tuples from heap */
-			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
+			/*
+			 * If we are skipping index vacuuming, we mark the recorded
+			 * tuples as dead, which reclaims their tuple storage but
+			 * leaves their ItemIds in place so that index scans can see
+			 * the tuples are gone.  The remaining dead ItemIds will be
+			 * removed by a subsequent vacuum that performs index cleanup.
+			 */
+			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer,
+							 skip_index_vacuum);
 			has_dead_tuples = false;
 
 			/*
@@ -1374,6 +1404,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		};
 		int64		hvp_val[2];
 
+		Assert(!skip_index_vacuum);
+
 		/* Log cleanup info before we touch indexes */
 		vacuum_log_cleanup_info(onerel, vacrelstats);
 
@@ -1412,15 +1444,24 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
 	/* Do post-vacuum cleanup and statistics update for each index */
-	for (i = 0; i < nindexes; i++)
-		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+	if (!skip_index_vacuum)
+		for (i = 0; i < nindexes; i++)
+			lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
 
-	/* If no indexes, make log report that lazy_vacuum_heap would've made */
+	/* Make log report that lazy_vacuum_heap would've made */
 	if (vacuumed_pages)
-		ereport(elevel,
-				(errmsg("\"%s\": removed %.0f row versions in %u pages",
-						RelationGetRelationName(onerel),
-						tups_vacuumed, vacuumed_pages)));
+	{
+		if (skip_index_vacuum)
+			ereport(elevel,
+					(errmsg("\"%s\": marked %.0f row versions as dead in %u pages",
+							RelationGetRelationName(onerel),
+							tups_vacuumed, vacuumed_pages)));
+		else
+			ereport(elevel,
+					(errmsg("\"%s\": removed %.0f row versions in %u pages",
+							RelationGetRelationName(onerel),
+							tups_vacuumed, vacuumed_pages)));
+	}
 
 	/*
 	 * This is pretty messy, but we split it up so that we can skip emitting
@@ -1498,7 +1539,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 			continue;
 		}
 		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats,
-									&vmbuffer);
+									&vmbuffer, false);
 
 		/* Now that we've compacted the page, record its available space */
 		page = BufferGetPage(buf);
@@ -1526,6 +1567,10 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
  *	lazy_vacuum_page() -- free dead tuples on a page
  *					 and repair its fragmentation.
  *
+ * If 'setdead' is true (e.g., when index cleanup is disabled), we mark the
+ * dead items as LP_DEAD instead of LP_UNUSED and skip the visibility map
+ * update.
+ *
  * Caller must hold pin and buffer cleanup lock on the buffer.
  *
  * tupindex is the index in vacrelstats->dead_tuples of the first dead
@@ -1534,11 +1579,12 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
  */
 static int
 lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
+				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer,
+				 bool setdead)
 {
 	Page		page = BufferGetPage(buffer);
-	OffsetNumber unused[MaxOffsetNumber];
-	int			uncnt = 0;
+	OffsetNumber items[MaxOffsetNumber];
+	int			cnt = 0;
 	TransactionId visibility_cutoff_xid;
 	bool		all_frozen;
 
@@ -1557,8 +1603,23 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 			break;				/* past end of tuples for this block */
 		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
 		itemid = PageGetItemId(page, toff);
-		ItemIdSetUnused(itemid);
-		unused[uncnt++] = toff;
+
+		if (setdead)
+		{
+			/*
+			 * When index cleanup is disabled, vacrelstats->dead_tuples can
+			 * contain items that were already marked LP_DEAD by HOT pruning.
+			 * Skip them so that we don't WAL-log them again.
+			 */
+			if (ItemIdIsDead(itemid))
+				continue;
+
+			ItemIdSetDead(itemid);
+		}
+		else
+			ItemIdSetUnused(itemid);
+
+		items[cnt++] = toff;
 	}
 
 	PageRepairFragmentation(page);
@@ -1573,10 +1634,19 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 	{
 		XLogRecPtr	recptr;
 
-		recptr = log_heap_clean(onerel, buffer,
-								NULL, 0, NULL, 0,
-								unused, uncnt,
-								vacrelstats->latestRemovedXid);
+		if (setdead)
+			recptr = log_heap_clean(onerel, buffer,
+									NULL, 0,	/* redirected */
+									items, cnt,	/* dead */
+									NULL, 0,	/* unused */
+									vacrelstats->latestRemovedXid);
+		else
+			recptr = log_heap_clean(onerel, buffer,
+									NULL, 0,	/* redirected */
+									NULL, 0,	/* dead */
+									items, cnt,	/* unused */
+									vacrelstats->latestRemovedXid);
+
 		PageSetLSN(page, recptr);
 	}
 
@@ -1589,6 +1659,13 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 	END_CRIT_SECTION();
 
 	/*
+	 * If we only marked items as dead, the page cannot have become
+	 * all-visible, so there is no visibility map update to do.
+	 */
+	if (setdead)
+		return tupindex;
+
+	/*
 	 * Now that we have removed the dead tuples from the page, once again
 	 * check if the page has become all-visible.  The page is already marked
 	 * dirty, exclusively locked, and, if needed, a full page image has been
@@ -2079,14 +2156,15 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
  * See the comments at the head of this file for rationale.
  */
 static void
-lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks,
+				 bool skip_index_vacuum)
 {
 	long		maxtuples;
 	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
 	autovacuum_work_mem != -1 ?
 	autovacuum_work_mem : maintenance_work_mem;
 
-	if (vacrelstats->hasindex)
+	if (vacrelstats->hasindex && !skip_index_vacuum)
 	{
 		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
 		maxtuples = Min(maxtuples, INT_MAX);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c086235..e688c04 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10511,6 +10511,8 @@ vacuum_option_elem:
 				{
 					if (strcmp($1, "disable_page_skipping") == 0)
 						$$ = VACOPT_DISABLE_PAGE_SKIPPING;
+					else if (strcmp($1, "disable_index_cleanup") == 0)
+						$$ = VACOPT_DISABLE_INDEX_CLEANUP;
 					else if (strcmp($1, "skip_locked") == 0)
 						$$ = VACOPT_SKIP_LOCKED;
 					else
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 27782fe..8266eaf 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3152,7 +3152,9 @@ typedef enum VacuumOption
 	VACOPT_FULL = 1 << 4,		/* FULL (non-concurrent) vacuum */
 	VACOPT_SKIP_LOCKED = 1 << 5,	/* skip if cannot get lock */
 	VACOPT_SKIPTOAST = 1 << 6,	/* don't process the TOAST table, if any */
-	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7	/* don't skip any pages */
+	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7,	/* don't skip any pages */
+	VACOPT_DISABLE_INDEX_CLEANUP = 1 << 8	/* don't remove dead tuples or
+											 * clean up indexes */
 } VacuumOption;
 
 /*
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index fa9d663..402a8be 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -80,6 +80,10 @@ CONTEXT:  SQL function "do_analyze" statement 1
 SQL function "wrap_do_analyze" statement 1
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (DISABLE_INDEX_CLEANUP) vaccluster;
+VACUUM (DISABLE_INDEX_CLEANUP) vactst; -- DISABLE_INDEX_CLEANUP is ignored
+NOTICE:  DISABLE_INDEX_CLEANUP is ignored because table "vactst" has no indexes
+VACUUM (DISABLE_INDEX_CLEANUP, FREEZE) vaccluster;
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
 CREATE TABLE vacparted1 PARTITION OF vacparted FOR VALUES IN (1);
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 9defa0d..9c4bdb7 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -61,6 +61,9 @@ VACUUM FULL vaccluster;
 VACUUM FULL vactst;
 
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (DISABLE_INDEX_CLEANUP) vaccluster;
+VACUUM (DISABLE_INDEX_CLEANUP) vactst; -- DISABLE_INDEX_CLEANUP is ignored
+VACUUM (DISABLE_INDEX_CLEANUP, FREEZE) vaccluster;
 
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
-- 
2.10.5
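
A possible way to observe the effect, assuming the pageinspect extension
is available (vaccluster is one of the regression test tables used
above); this is only a reviewer's sketch, not part of the patch:

    -- After VACUUM (DISABLE_INDEX_CLEANUP), dead items remain as LP_DEAD
    -- line pointers (lp_flags = 3, lp_len = 0) instead of becoming unused.
    CREATE EXTENSION IF NOT EXISTS pageinspect;
    SELECT lp, lp_flags, lp_len
    FROM heap_page_items(get_raw_page('vaccluster', 0))
    ORDER BY lp;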

