On Mon, Sep 27, 2021 at 8:48 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
Hi,

Here is the first WIP patch for decoupling table and index vacuum.
The first mail of the thread already explains the complete background
of why we want to do this, so instead of repeating that I will jump
directly into explaining what these patches do.

Currently, the table vacuum and index vacuum are executed as a single
operation.  Basically, we vacuum the table by performing HOT pruning
and remembering dead items in a cache, then we perform the index
vacuum, and finally we perform the second pass of the heap vacuum, in
which we mark those items unused.

In this patch, we make these vacuum passes independent operations.
The idea is to provide multiple vacuum options so that the user can
run each operation independently, i.e.
"VACUUM (heap_hot_prune) tbl_name" performs just the hot prune (first
vacuum) pass, and "VACUUM (heap_vacuum) tbl_name" performs the second
heap pass, which sets unused those dead items for which the index
vacuum is done.  Additionally, we now allow users to perform just the
index vacuum, i.e. "VACUUM idx_name".

So under the heap_hot_prune pass, we generate the dead tids and,
instead of directly performing the index vacuum, we flush those dead
tids into the conveyor belt using the Deadtidstore interfaces.  Then in
the index pass, we read the data from the conveyor belt and perform the
index vacuum.  Finally, in the heap_vacuum pass, we read the data from
the conveyor belt and mark the dead items unused.  However, in this
second heap pass we can only mark unused those items which are dead and
for which all the indexes of the table have already been vacuumed.  To
identify that, we store a conveyor belt pageno in the pg_class entry:
for an index entry it is the pageno up to which the index vacuum has
been done, and for a table entry it is the pageno up to which the
heap_vacuum pass has been done.  Additionally, while doing the
hot_prune pass we also check whether an item is already dead and the
index vacuum is already done for it; if so, we directly set it unused.
For this check we use the Deadtidstore interfaces.
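
To make the rule above concrete, here is a minimal, self-contained C
sketch (not code from the patch).  It assumes that each index records
the conveyor belt pageno up to which it has been vacuumed as an
exclusive bound, and that a heap item may be set unused only when its
conveyor belt page lies below the minimum of those values over all
indexes.  The CBPageNo typedef, the sentinel value, and both helper
names are placeholders for illustration; in the patch the corresponding
logic sits behind get_min_indexvac_page() and the Deadtidstore
interfaces.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Placeholder for the conveyor belt logical page number type. */
typedef uint64_t CBPageNo;

/* Placeholder sentinel meaning "no page recorded yet". */
#define CB_INVALID_LOGICAL_PAGE UINT64_MAX

/*
 * Return the lowest "vacuumed up to" page across all indexes of a table,
 * or CB_INVALID_LOGICAL_PAGE if there are no indexes or if any index has
 * never been vacuumed.
 */
static CBPageNo
min_index_vacuum_page(const CBPageNo *index_pages, size_t nindexes)
{
	CBPageNo	min_page = CB_INVALID_LOGICAL_PAGE;

	assert(index_pages != NULL || nindexes == 0);

	for (size_t i = 0; i < nindexes; i++)
	{
		/* One never-vacuumed index blocks every item. */
		if (index_pages[i] == CB_INVALID_LOGICAL_PAGE)
			return CB_INVALID_LOGICAL_PAGE;

		if (index_pages[i] < min_page)
			min_page = index_pages[i];
	}

	return min_page;
}

/*
 * A dead item stored on conveyor belt page tid_page can be set unused only
 * if every index has already been vacuumed past that page.
 */
static bool
can_mark_unused(CBPageNo tid_page, CBPageNo min_idxvac_page)
{
	if (min_idxvac_page == CB_INVALID_LOGICAL_PAGE)
		return false;

	return tid_page < min_idxvac_page;
}
```

With per-index values {12, 7, 30} the minimum is 7, so in this sketch
only dead items stored on conveyor belt pages 0..6 are eligible to be
set unused.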

Deadtidstore provides interfaces over the conveyor belt for storing
dead tids into, and retrieving them from, the conveyor belt.  This
module maintains a DeadTidState which keeps track of the current
insertion progress, i.e. the first and the last conveyor belt page for
the current vacuum run.  On completion of the vacuum run, it sets the
bound of the complete run by storing the last conveyor belt pageno of
the run into the special space of the run's first conveyor belt page.
It also provides the infrastructure to avoid adding duplicate tids into
the conveyor belt.  Basically, if we perform the first vacuum pass
multiple times without executing the second vacuum pass, we may
encounter dead tids that are already in the conveyor belt.  So this
module maintains a cache over the conveyor belt, and it loads into the
cache only the data for the block the vacuum is currently processing,
so we don't need to maintain a huge cache.
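
The duplicate check can be pictured with the following self-contained C
sketch.  It is only an illustration under stated assumptions, not the
Deadtidstore implementation: it assumes the cache holds, for a single
heap block, a sorted array of the offsets already present in the
conveyor belt.  BlockDeadTidCache, cache_contains() and
filter_new_dead_offsets() are hypothetical names; in the patch the
equivalent steps are DTS_LoadDeadtids() and DTS_DeadtidExists().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint16_t OffsetNumber;	/* stand-in for the PostgreSQL type */

/* Hypothetical per-block cache of offsets already in the conveyor belt. */
typedef struct BlockDeadTidCache
{
	uint32_t	blkno;			/* heap block the cache was loaded for */
	size_t		noffsets;		/* number of cached offsets */
	const OffsetNumber *offsets;	/* sorted in ascending order */
} BlockDeadTidCache;

/* Binary search for (blkno, off) in the cache of the current block. */
static bool
cache_contains(const BlockDeadTidCache *cache, uint32_t blkno,
			   OffsetNumber off)
{
	size_t		lo = 0;
	size_t		hi;

	assert(cache != NULL);

	/* The cache only ever describes one block at a time. */
	if (cache->blkno != blkno)
		return false;

	hi = cache->noffsets;
	while (lo < hi)
	{
		size_t		mid = lo + (hi - lo) / 2;

		if (cache->offsets[mid] == off)
			return true;
		if (cache->offsets[mid] < off)
			lo = mid + 1;
		else
			hi = mid;
	}

	return false;
}

/*
 * Copy into "out" only those offsets that are not yet in the conveyor belt.
 * "out" must have room for nfound entries.  Returns the number copied.
 */
static size_t
filter_new_dead_offsets(const BlockDeadTidCache *cache, uint32_t blkno,
						const OffsetNumber *found, size_t nfound,
						OffsetNumber *out)
{
	size_t		nout = 0;

	for (size_t i = 0; i < nfound; i++)
	{
		if (!cache_contains(cache, blkno, found[i]))
			out[nout++] = found[i];
	}

	return nout;
}
```

Because the cache is reloaded per heap block, its size is bounded by the
number of line pointers on one page, which is the point of not keeping a
huge cache.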

Test example:

CREATE TABLE t (a int);
CREATE INDEX idx on t(a);
INSERT INTO t VALUES (generate_series(1,1000000));
DELETE FROM t where a > 300;
VACUUM (heap_hot_prune) t;
VACUUM idx;
VACUUM (heap_vacuum) t;

TODO:
- This is just a POC patch to discuss the design idea and needs a lot
of improvement and testing.
- We are using a slightly different format for storing the dead tids
in the conveyor belt, which is explained in the patch, but the
traditional multi-pass vacuum still uses the old format (array of
ItemPointerData), so we need to unify the formats.
- Performance testing.
- Cleaner interfaces so that this can easily be integrated with
autovacuum; currently, this is provided only for manual vacuum.
- Add test cases.
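
As an illustration of the format difference mentioned in the TODO list,
the sketch below expands a buffer laid out as a block header followed by
an array of offsets into a flat array of (block, offset) pairs, which is
the shape the index vacuum expects.  This is a standalone sketch, not
code from the patch: SketchBlockHeader and SketchTid are hypothetical
stand-ins for DTS_BlockHeader and ItemPointerData, whose real layouts
are defined in the patches and in PostgreSQL and may differ.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint16_t OffsetNumber;	/* stand-in for the PostgreSQL type */

/* Hypothetical stand-in for DTS_BlockHeader. */
typedef struct SketchBlockHeader
{
	uint32_t	blkno;			/* heap block number */
	uint16_t	noffsets;		/* number of offsets that follow */
} SketchBlockHeader;

/* Hypothetical stand-in for ItemPointerData. */
typedef struct SketchTid
{
	uint32_t	blkno;
	OffsetNumber offnum;
} SketchTid;

/* Append one block's header and offsets at buf + pos; return the new end. */
static size_t
append_block(char *buf, size_t pos, uint32_t blkno,
			 const OffsetNumber *offsets, uint16_t noffsets)
{
	SketchBlockHeader hdr;

	memset(&hdr, 0, sizeof(hdr));	/* also clears padding bytes */
	hdr.blkno = blkno;
	hdr.noffsets = noffsets;

	memcpy(buf + pos, &hdr, sizeof(hdr));
	pos += sizeof(hdr);
	memcpy(buf + pos, offsets, noffsets * sizeof(OffsetNumber));
	pos += noffsets * sizeof(OffsetNumber);

	return pos;
}

/* Expand [header][offsets]... into at most max_out TIDs; return the count. */
static size_t
expand_to_tids(const char *buf, size_t len, SketchTid *out, size_t max_out)
{
	size_t		pos = 0;
	size_t		nout = 0;

	while (pos + sizeof(SketchBlockHeader) <= len)
	{
		SketchBlockHeader hdr;

		/* memcpy avoids unaligned access into the byte buffer. */
		memcpy(&hdr, buf + pos, sizeof(hdr));
		pos += sizeof(hdr);
		assert(pos + hdr.noffsets * sizeof(OffsetNumber) <= len);

		for (uint16_t i = 0; i < hdr.noffsets && nout < max_out; i++)
		{
			OffsetNumber off;

			memcpy(&off, buf + pos + i * sizeof(off), sizeof(off));
			out[nout].blkno = hdr.blkno;
			out[nout].offnum = off;
			nout++;
		}
		pos += hdr.noffsets * sizeof(OffsetNumber);
	}

	return nout;
}
```

In this sketch the block number is stored once per block rather than
once per tid, which is where the space saving over a plain tid array
would come from when a block has many dead items.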

These patches can be applied on top of the latest conveyor belt
patches [1].

[1] https://www.postgresql.org/message-id/CAFiTN-sQUddO9JPiH3tz%2BvbNqRqi_pgndecy8k2yXAnO3ymqZA%40mail.gmail.com

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From 7fb07fcb8aca82e3629dcf90113c1bbaf511c343 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Sat, 23 Oct 2021 15:33:13 +0530
Subject: [PATCH v1 1/3] Add DEADTID_FORK

---
 src/common/relpath.c         | 3 ++-
 src/include/common/relpath.h | 5 +++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/common/relpath.c b/src/common/relpath.c
index 636c96e..617b720 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -34,7 +34,8 @@ const char *const forkNames[] = {
 	"main",						/* MAIN_FORKNUM */
 	"fsm",						/* FSM_FORKNUM */
 	"vm",						/* VISIBILITYMAP_FORKNUM */
-	"init"						/* INIT_FORKNUM */
+	"init",						/* INIT_FORKNUM */
+	"tid"						/* DEADTID_FORKNUM */
 };
 
 StaticAssertDecl(lengthof(forkNames) == (MAX_FORKNUM + 1),
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a4b5dc8..b110d82 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -43,7 +43,8 @@ typedef enum ForkNumber
 	MAIN_FORKNUM = 0,
 	FSM_FORKNUM,
 	VISIBILITYMAP_FORKNUM,
-	INIT_FORKNUM
+	INIT_FORKNUM,
+	DEADTID_FORKNUM
 
 	/*
 	 * NOTE: if you add a new fork, change MAX_FORKNUM and possibly
@@ -52,7 +53,7 @@ typedef enum ForkNumber
 	 */
 } ForkNumber;
 
-#define MAX_FORKNUM		INIT_FORKNUM
+#define MAX_FORKNUM		DEADTID_FORKNUM
 
 #define FORKNAMECHARS	4		/* max chars for a fork name */
 
-- 
1.8.3.1

From 68a3eadde9776785763bb54cca22d9fd6c25a1b2 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 26 Jan 2022 10:05:22 +0530
Subject: [PATCH v1 3/3] Decoupling table and index vacuum
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Currently, the table vacuum and index vacuum are executed as a single
operation.  Basically, we vacuum the table by performing HOT pruning
and remembering dead items in a cache, then we perform the index
vacuum, and finally we perform the second pass of the heap vacuum, in
which we mark those items unused.  The problem with this is that every
index has its own needs, which could be separate from the needs of the
table.

In this patch, we make these vacuum passes independent operations.
The idea is to provide multiple vacuum options so that the user can
run each operation independently, i.e.
"VACUUM (heap_hot_prune) tbl_name" performs just the hot prune (first
vacuum) pass, and "VACUUM (heap_vacuum) tbl_name" performs the second
heap pass, which sets unused those dead items for which the index
vacuum is done.  Additionally, we now allow users to perform just the
index vacuum, i.e. "VACUUM idx_name".

So under the heap_hot_prune pass, we generate the dead tids and,
instead of directly performing the index vacuum, we flush those dead
tids to the conveyor belt using the Deadtidstore interfaces.  Then in
the index pass, we read the data from the conveyor belt and perform
the index vacuum.  Finally, in the heap_vacuum pass, we read the data
from the conveyor belt and mark the dead items unused.  However, in
this second heap pass we can only mark unused those items which are
dead and for which all the indexes of the table have already been
vacuumed.  To identify that, we store a conveyor belt pageno in the
pg_class entry: for an index entry it is the pageno up to which the
index vacuum has been done, and for a table entry it is the pageno up
to which the heap_vacuum pass has been done.
---
 src/backend/access/heap/vacuumlazy.c | 782 ++++++++++++++++++++++++++++++++---
 src/backend/catalog/heap.c           |   1 +
 src/backend/commands/vacuum.c        |  48 ++-
 src/backend/utils/cache/relcache.c   |   1 +
 src/include/access/genam.h           |   3 +
 src/include/catalog/pg_class.h       |   5 +-
 src/include/commands/vacuum.h        |   3 +
 7 files changed, 783 insertions(+), 60 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 1749cc2..803ea59 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -35,6 +35,7 @@
 #include <math.h>
 
 #include "access/amapi.h"
+#include "access/deadtidstore.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/heapam_xlog.h"
@@ -45,6 +46,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/index.h"
+#include "catalog/indexing.h"
 #include "catalog/storage.h"
 #include "commands/dbcommands.h"
 #include "commands/progress.h"
@@ -62,6 +64,8 @@
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
 #include "utils/timestamp.h"
 
 
@@ -125,6 +129,13 @@
  */
 #define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
 
+/*
+ * Number of pages to store the dead items into the cache and total size of the
+ * dead item cache.  For details refer comments atop LVDeadOffsetCache.
+ */
+#define	DEAD_ITEM_CACHE_NUM_PAGE	2
+#define DEAD_ITEM_CACHE_SIZE	(DTS_PageMaxDataSpace * DEAD_ITEM_CACHE_NUM_PAGE)
+
 /* Phases of vacuum during which we report error context. */
 typedef enum
 {
@@ -136,6 +147,40 @@ typedef enum
 	VACUUM_ERRCB_PHASE_TRUNCATE
 } VacErrPhase;
 
+/*
+ * LVDeadOffsetCache maintains a temporary cache for holding dead items in
+ * memory and tracking their size so that we know exactly when to flush them
+ * to the conveyor belt.
+ *
+ * Although we are keeping the dead item in the cache, the format of this is
+ * not exactly the same as an array of ItemPointerData.  Instead, we use this
+ * slightly different format, which saves us memory and disk space.  Basically,
+ * they are stored as a [DTS_BlockHeader] followed by an array of
+ * [OffsetNumber].  We only convert this into the ItemPointerData array when we
+ * perform the index vacuum since the index vacuum expects data to be in that
+ * format.
+ *
+ * Ideally, we can keep the cache as one big single buffer but the problem is
+ * that while flushing into the conveyor belt if we could not fit all the data
+ * for a block in the remaining space of the page then we will have to do the
+ * page split i.e. we need to split the offset between two pages and
+ * accordingly we will have to update the DTS_BlockHeader as well.  So in order
+ * to avoid processing the data while flushing into the conveyor belt, we
+ * maintain this cache directly in form of conveyor belt pages so that we can
+ * directly flush page by page into the conveyor belt without any processing.
+ *
+ * XXX eventually we want to get rid of VacDeadItems cache and maintain a
+ * single format.
+ */
+typedef struct LVDeadOffsetCache
+{
+	int		activepageno;	/* Current active page for writing. */
+	int		writeoffset;	/* Write offset in active page. */
+	int		num_items;		/* Total item collected so far. */
+	int	   *datasizes;		/* Size for each page. */
+	char   *data;			/* Actual data. */
+} LVDeadOffsetCache;
+
 typedef struct LVRelState
 {
 	/* Target heap relation and its indexes */
@@ -187,6 +232,7 @@ typedef struct LVRelState
 	 * LP_UNUSED during second heap pass.
 	 */
 	VacDeadItems *dead_items;	/* TIDs whose index tuples we'll delete */
+	LVDeadOffsetCache *dead_itemcache;	/* Dead item cache. */
 	BlockNumber rel_pages;		/* total number of pages */
 	BlockNumber scanned_pages;	/* number of pages we examined */
 	BlockNumber pinskipped_pages;	/* # of pages skipped due to a pin */
@@ -210,6 +256,9 @@ typedef struct LVRelState
 									 * table */
 	int64		num_tuples;		/* total number of nonremovable tuples */
 	int64		live_tuples;	/* live tuples (reltuples estimate) */
+
+	/* dead tid state for storing and getting dead items. */
+	DTS_DeadTidState *deadtidstate;
 } LVRelState;
 
 /*
@@ -245,10 +294,11 @@ static void lazy_scan_heap(LVRelState *vacrel, VacuumParams *params,
 static void lazy_scan_prune(LVRelState *vacrel, Buffer buf,
 							BlockNumber blkno, Page page,
 							GlobalVisState *vistest,
-							LVPagePruneState *prunestate);
+							LVPagePruneState *prunestate,
+							bool hot_prune_only);
 static void lazy_vacuum(LVRelState *vacrel);
 static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
-static void lazy_vacuum_heap_rel(LVRelState *vacrel);
+static void lazy_vacuum_heap_rel(LVRelState *vacrel, bool heap_vacuum_only);
 static int	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
 								  Buffer buffer, int index, Buffer *vmbuffer);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
@@ -268,6 +318,7 @@ static bool should_attempt_truncation(LVRelState *vacrel);
 static void lazy_truncate_heap(LVRelState *vacrel);
 static BlockNumber count_nondeletable_pages(LVRelState *vacrel,
 											bool *lock_waiter_detected);
+static int dead_items_max_items(LVRelState *vacrel);
 static void dead_items_alloc(LVRelState *vacrel, int nworkers);
 static void dead_items_cleanup(LVRelState *vacrel);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
@@ -280,6 +331,15 @@ static void update_vacuum_error_info(LVRelState *vacrel,
 									 OffsetNumber offnum);
 static void restore_vacuum_error_info(LVRelState *vacrel,
 									  const LVSavedErrInfo *saved_vacrel);
+static void lazy_init_vacrel(LVRelState *vacrel, BlockNumber nblocks);
+static void lazy_vacuum_heap(LVRelState *vacrel, VacuumParams *params);
+static void UpdateRelationLastVaccumPage(Oid relid, CBPageNo pageno);
+static void store_deaditems(LVRelState *vacrel, BlockNumber blkno,
+							int noffsets, OffsetNumber *offsets);
+static void copy_deaditems_to_cache(LVDeadOffsetCache *dead_items,
+									BlockNumber blkno,
+									int noffsets,  OffsetNumber *offsets);
+static CBPageNo get_min_indexvac_page(LVRelState *vacrel);
 
 
 /*
@@ -456,10 +516,15 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	vacrel->MultiXactCutoff = MultiXactCutoff;
 
 	/*
-	 * Call lazy_scan_heap to perform all required heap pruning, index
-	 * vacuuming, and heap vacuuming (plus related processing)
+	 * If only heap vacuum is requested then call lazy_vacuum_heap to perform
+	 * the heap vacuuming.  Otherwise, call lazy_scan_heap to perform all
+	 * required heap pruning, index vacuuming, and heap vacuuming
+	 * (plus related processing).
 	 */
-	lazy_scan_heap(vacrel, params, aggressive);
+	if (params->options & VACOPT_HEAP_VACUUM)
+		lazy_vacuum_heap(vacrel, params);
+	else
+		lazy_scan_heap(vacrel, params, aggressive);
 
 	/* Done with indexes */
 	vac_close_indexes(vacrel->nindexes, vacrel->indrels, NoLock);
@@ -692,8 +757,15 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	/* Cleanup index statistics and index names */
 	for (int i = 0; i < vacrel->nindexes; i++)
 	{
-		if (vacrel->indstats[i])
+		/*
+		 * If we are doing only the heap vacuum pass then we would not have
+		 * allocated memory for the indstats.
+		 */
+		if (vacrel->indstats && vacrel->indstats[i])
+		{
+			Assert((params->options & VACOPT_HEAP_VACUUM) == 0);
 			pfree(vacrel->indstats[i]);
+		}
 
 		if (instrument)
 			pfree(indnames[i]);
@@ -747,6 +819,9 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 				next_fsm_block_to_vacuum;
 	Buffer		vmbuffer = InvalidBuffer;
 	bool		skipping_blocks;
+	bool		hot_prune_only = ((params->options & VACOPT_HEAP_HOT_PRUNE) != 0);
+	int			max_items;
+	CBPageNo	min_idxvac_page = CB_INVALID_LOGICAL_PAGE;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -759,22 +834,9 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 	next_unskippable_block = 0;
 	next_failsafe_block = 0;
 	next_fsm_block_to_vacuum = 0;
-	vacrel->rel_pages = nblocks;
-	vacrel->scanned_pages = 0;
-	vacrel->pinskipped_pages = 0;
-	vacrel->frozenskipped_pages = 0;
-	vacrel->tupcount_pages = 0;
-	vacrel->pages_removed = 0;
-	vacrel->lpdead_item_pages = 0;
-	vacrel->nonempty_pages = 0;
 
-	/* Initialize instrumentation counters */
-	vacrel->num_index_scans = 0;
-	vacrel->tuples_deleted = 0;
-	vacrel->lpdead_items = 0;
-	vacrel->new_dead_tuples = 0;
-	vacrel->num_tuples = 0;
-	vacrel->live_tuples = 0;
+	/* Initialize the vacrel */
+	lazy_init_vacrel(vacrel, nblocks);
 
 	vistest = GlobalVisTestFor(vacrel->rel);
 
@@ -788,18 +850,59 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 	 */
 	lazy_check_wraparound_failsafe(vacrel);
 
-	/*
-	 * Allocate the space for dead_items.  Note that this handles parallel
-	 * VACUUM initialization as part of allocating shared memory space used
-	 * for dead_items.
-	 */
-	dead_items_alloc(vacrel, params->nworkers);
-	dead_items = vacrel->dead_items;
+	/* Initialize deadtid state if doing only hot prune pass. */
+	if (hot_prune_only)
+	{
+		LVDeadOffsetCache	*dead_itemcache;
+
+		/*
+		 * Process all the indexes and find the lowest conveyor belt page up
+		 * to which all the indexes of the given relation have been vacuumed.
+		 *
+		 * This helps us during the prune pass of the vacuum: if some old
+		 * item is already present in the conveyor belt and the index vacuum
+		 * has also been done for it, then we can directly mark that item
+		 * unused.
+		 */
+		min_idxvac_page = get_min_indexvac_page(vacrel);
+
+		/* Initialize deadtid state. */
+		vacrel->deadtidstate = DTS_InitDeadTidState(vacrel->rel,
+													min_idxvac_page);
+
+		/*
+		 * Allocate memory for deaditem cache.
+		 *
+		 * FIXME: eventually this should replace vacrel->dead_items, this
+		 * allocation should go into dead_items_alloc, and its size should be
+		 * computed based on maintenance_work_mem.  But for now this is
+		 * only used if we are doing only the prune pass of the vacuum so let's
+		 * not merge it.
+		 */
+		dead_itemcache =
+					(LVDeadOffsetCache *) palloc0(sizeof(LVDeadOffsetCache));
+		dead_itemcache->datasizes =
+					(int *) palloc(sizeof(int) * DEAD_ITEM_CACHE_NUM_PAGE);
+		dead_itemcache->data = (char *) palloc(DEAD_ITEM_CACHE_SIZE);
+		vacrel->dead_itemcache = dead_itemcache;
+		max_items = dead_items_max_items(vacrel);
+	}
+	else
+	{
+		/*
+		 * Allocate the space for dead_items.  Note that this handles parallel
+		 * VACUUM initialization as part of allocating shared memory space used
+		 * for dead_items.
+		 */
+		dead_items_alloc(vacrel, params->nworkers);
+		dead_items = vacrel->dead_items;
+		max_items = dead_items->max_items;
+	}
 
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
 	initprog_val[1] = nblocks;
-	initprog_val[2] = dead_items->max_items;
+	initprog_val[2] = max_items;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
 	/*
@@ -988,10 +1091,16 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 		 * Consider if we definitely have enough space to process TIDs on page
 		 * already.  If we are close to overrunning the available space for
 		 * dead_items TIDs, pause and do a cycle of vacuuming before we tackle
-		 * this page.
+		 * this page.  But if we are only performing the hot_prune pass then we
+		 * don't need to perform the vacuuming cycle; instead, the dead items
+		 * will be flushed to the conveyor belt when the cache is full.  That
+		 * is done inside lazy_scan_prune to avoid flushing a half-filled
+		 * buffer early.
 		 */
-		Assert(dead_items->max_items >= MaxHeapTuplesPerPage);
-		if (dead_items->max_items - dead_items->num_items < MaxHeapTuplesPerPage)
+		Assert(hot_prune_only ||
+			   dead_items->max_items >= MaxHeapTuplesPerPage);
+		if (!hot_prune_only &&
+			dead_items->max_items - dead_items->num_items < MaxHeapTuplesPerPage)
 		{
 			/*
 			 * Before beginning index vacuuming, we release any pin we may
@@ -1007,6 +1116,7 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 
 			/* Perform a round of index and heap vacuuming */
 			vacrel->consider_bypass_optimization = false;
+
 			lazy_vacuum(vacrel);
 
 			/*
@@ -1198,7 +1308,8 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 		 * were pruned some time earlier.  Also considers freezing XIDs in the
 		 * tuple headers of remaining items with storage.
 		 */
-		lazy_scan_prune(vacrel, buf, blkno, page, vistest, &prunestate);
+		lazy_scan_prune(vacrel, buf, blkno, page, vistest, &prunestate,
+						hot_prune_only);
 
 		Assert(!prunestate.all_visible || !prunestate.has_lpdead_items);
 
@@ -1206,7 +1317,12 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 		if (prunestate.hastup)
 			vacrel->nonempty_pages = blkno + 1;
 
-		if (vacrel->nindexes == 0)
+		/*
+		 * XXX For now, if we are only performing the hot prune pass then we
+		 * don't perform the heap vacuum phase even if there is no index on the
+		 * table, but we can improve this.
+		 */
+		if (!hot_prune_only && vacrel->nindexes == 0)
 		{
 			/*
 			 * Consider the need to do page-at-a-time heap vacuuming when
@@ -1417,8 +1533,42 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 		vmbuffer = InvalidBuffer;
 	}
 
-	/* Perform a final round of index and heap vacuuming */
-	if (dead_items->num_items > 0)
+	/*
+	 * If we are doing only the hot prune pass of the vacuum then we are done
+	 * now.  So flush the remaining items from the dead item cache to the
+	 * conveyor belt and clean up the deadtid state and the dead_itemcache.
+	 * Also update the last conveyor belt pageno up to which we have marked
+	 * the items unused.  Otherwise perform a round of index and heap vacuuming.
+	 */
+	if (hot_prune_only)
+	{
+		LVDeadOffsetCache *dead_itemcache = vacrel->dead_itemcache;
+
+		if (dead_itemcache->activepageno > 0 ||
+			dead_itemcache->writeoffset > 0)
+		{
+			dead_itemcache->datasizes[dead_itemcache->activepageno] =
+									dead_itemcache->writeoffset;
+			DTS_InsertDeadtids(vacrel->deadtidstate, dead_itemcache->data,
+							   dead_itemcache->datasizes,
+							   dead_itemcache->activepageno + 1);
+		}
+
+		pfree(dead_itemcache->datasizes);
+		pfree(dead_itemcache->data);
+		pfree(dead_itemcache);
+
+		DTS_ReleaseDeadTidState(vacrel->deadtidstate);
+
+		/*
+		 * If min_idxvac_page is not valid then we wouldn't have set any item
+		 * unused so no need to update the last vacuum page for the heap.
+		 */
+		if (min_idxvac_page != CB_INVALID_LOGICAL_PAGE)
+			UpdateRelationLastVaccumPage(RelationGetRelid(vacrel->rel),
+										min_idxvac_page);
+	}
+	else if (dead_items->num_items > 0)
 		lazy_vacuum(vacrel);
 
 	/*
@@ -1466,6 +1616,11 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
  * that any items that make it into the dead_items array are simple LP_DEAD
  * line pointers, and that every remaining item with tuple storage is
  * considered as a candidate for freezing.
+ *
+ * Pass hot_prune_only as true if we are only performing the hot prune pass
+ * of the vacuum and not performing the index pass.  TODO: later once we unify
+ * the dead item format between single phase and multi phase vacuum then we can
+ * control this based on the dead item cache size.
  */
 static void
 lazy_scan_prune(LVRelState *vacrel,
@@ -1473,7 +1628,8 @@ lazy_scan_prune(LVRelState *vacrel,
 				BlockNumber blkno,
 				Page page,
 				GlobalVisState *vistest,
-				LVPagePruneState *prunestate)
+				LVPagePruneState *prunestate,
+				bool hot_prune_only)
 {
 	Relation	rel = vacrel->rel;
 	OffsetNumber offnum,
@@ -1488,7 +1644,9 @@ lazy_scan_prune(LVRelState *vacrel,
 				live_tuples;
 	int			nnewlpdead;
 	int			nfrozen;
+	int			nunused;
 	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
+	OffsetNumber unusedoffsets[MaxHeapTuplesPerPage];
 	xl_heap_freeze_tuple frozen[MaxHeapTuplesPerPage];
 
 	maxoff = PageGetMaxOffsetNumber(page);
@@ -1498,6 +1656,7 @@ retry:
 	/* Initialize (or reset) page-level counters */
 	tuples_deleted = 0;
 	lpdead_items = 0;
+	nunused = 0;
 	new_dead_tuples = 0;
 	num_tuples = 0;
 	live_tuples = 0;
@@ -1526,6 +1685,17 @@ retry:
 	prunestate->visibility_cutoff_xid = InvalidTransactionId;
 	nfrozen = 0;
 
+	/*
+	 * If we are performing only the hot prune pass of the vacuum then we are
+	 * going to flush the new dead items to the conveyor belt.  But we might
+	 * find some of the dead items already exist in the conveyor belt so we
+	 * don't want to add duplicate items.  So for checking that before starting
+	 * to scan the page, load the data from the conveyor belt for the block we
+	 * are going to prune.
+	 */
+	if (hot_prune_only)
+		DTS_LoadDeadtids(vacrel->deadtidstate, blkno);
+
 	for (offnum = FirstOffsetNumber;
 		 offnum <= maxoff;
 		 offnum = OffsetNumberNext(offnum))
@@ -1568,9 +1738,26 @@ retry:
 		 */
 		if (ItemIdIsDead(itemid))
 		{
-			deadoffsets[lpdead_items++] = offnum;
+			bool setunused = false;
+
+			/*
+			 * If we are performing only the first pass of the vacuum and item
+			 * already present in the conveyor belt then we don't need to add
+			 * that offset to the dead offsets array.  Additionally, if index
+			 * vacuum is already done for the offset then we can directly set
+			 * the item as unused so store it into the unused offsets array.
+			 */
+			if (!hot_prune_only || !DTS_DeadtidExists(vacrel->deadtidstate,
+													  blkno, offnum,
+													  &setunused))
+			{
+				deadoffsets[lpdead_items++] = offnum;
+				prunestate->has_lpdead_items = true;
+			}
+			else if (setunused)
+				unusedoffsets[nunused++] = offnum;
+
 			prunestate->all_visible = false;
-			prunestate->has_lpdead_items = true;
 			continue;
 		}
 
@@ -1768,6 +1955,52 @@ retry:
 									 frozen, nfrozen);
 			PageSetLSN(page, recptr);
 		}
+		END_CRIT_SECTION();
+	}
+
+	/*
+	 * If there are some items to be set unused then do so.
+	 *
+	 * XXX try to unify the code with lazy_vacuum_heap_page().
+	 */
+	if (nunused > 0)
+	{
+		START_CRIT_SECTION();
+
+		/* Mark collected items unused. */
+		for (int i = 0; i < nunused; i++)
+		{
+			itemid = PageGetItemId(page, unusedoffsets[i]);
+			ItemIdSetUnused(itemid);
+		}
+
+		/* Attempt to truncate line pointer array now */
+		PageTruncateLinePointerArray(page);
+
+		/*
+		 * Mark buffer dirty before we write WAL.
+		 */
+		MarkBufferDirty(buf);
+
+		/* XLOG stuff */
+		if (RelationNeedsWAL(vacrel->rel))
+		{
+			xl_heap_vacuum xlrec;
+			XLogRecPtr	recptr;
+
+			xlrec.nunused = nunused;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, SizeOfHeapVacuum);
+
+			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+			XLogRegisterBufData(0, (char *) unusedoffsets,
+								nunused * sizeof(OffsetNumber));
+
+			recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VACUUM);
+
+			PageSetLSN(page, recptr);
+		}
 
 		END_CRIT_SECTION();
 	}
@@ -1814,25 +2047,44 @@ retry:
 	 */
 	if (lpdead_items > 0)
 	{
-		VacDeadItems *dead_items = vacrel->dead_items;
-		ItemPointerData tmp;
+		/*
+		 * If we are doing only the hot prune pass, then we don't need to
+		 * convert the dead offsets to ItemPointerData right now.  So store
+		 * the data directly into the dead item cache; if the dead item cache
+		 * is full then store_deaditems will internally free the cache by
+		 * flushing data to the conveyor belt.
+		 */
+		if (hot_prune_only)
+		{
+			store_deaditems(vacrel, blkno, lpdead_items, deadoffsets);
+			vacrel->dead_itemcache->num_items += lpdead_items;
+			pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
+										 vacrel->dead_itemcache->num_items);
 
-		Assert(!prunestate->all_visible);
-		Assert(prunestate->has_lpdead_items);
+		}
+		else
+		{
+			VacDeadItems *dead_items = vacrel->dead_items;
+			ItemPointerData tmp;
 
-		vacrel->lpdead_item_pages++;
+			Assert(!prunestate->all_visible);
+			Assert(prunestate->has_lpdead_items);
 
-		ItemPointerSetBlockNumber(&tmp, blkno);
+			vacrel->lpdead_item_pages++;
 
-		for (int i = 0; i < lpdead_items; i++)
-		{
-			ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
-			dead_items->items[dead_items->num_items++] = tmp;
-		}
+			ItemPointerSetBlockNumber(&tmp, blkno);
+
+			for (int i = 0; i < lpdead_items; i++)
+			{
+				ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
+				dead_items->items[dead_items->num_items++] = tmp;
+			}
+			Assert(dead_items->num_items <= dead_items->max_items);
 
-		Assert(dead_items->num_items <= dead_items->max_items);
-		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-									 dead_items->num_items);
+			pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
+										 vacrel->dead_items->num_items);
+
+		}
 	}
 
 	/* Finally, add page-local counts to whole-VACUUM counts */
@@ -1948,7 +2200,7 @@ lazy_vacuum(LVRelState *vacrel)
 		 * We successfully completed a round of index vacuuming.  Do related
 		 * heap vacuuming now.
 		 */
-		lazy_vacuum_heap_rel(vacrel);
+		lazy_vacuum_heap_rel(vacrel, false);
 	}
 	else
 	{
@@ -2079,7 +2331,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
  * index entry removal in batches as large as possible.
  */
 static void
-lazy_vacuum_heap_rel(LVRelState *vacrel)
+lazy_vacuum_heap_rel(LVRelState *vacrel, bool heap_vacuum_only)
 {
 	int			index;
 	BlockNumber vacuumed_pages;
@@ -2088,7 +2340,12 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 
 	Assert(vacrel->do_index_vacuuming);
 	Assert(vacrel->do_index_cleanup);
-	Assert(vacrel->num_index_scans > 0);
+
+	/*
+	 * If we are doing heap vacuum pass then we might not have done any index
+	 * scan in this pass.  So adjust the assert accordingly.
+	 */
+	Assert(heap_vacuum_only || vacrel->num_index_scans > 0);
 
 	/* Report that we are now vacuuming the heap */
 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
@@ -2141,7 +2398,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 	 * the second heap pass.  No more, no less.
 	 */
 	Assert(index > 0);
-	Assert(vacrel->num_index_scans > 1 ||
+	Assert(heap_vacuum_only || vacrel->num_index_scans > 1 ||
 		   (index == vacrel->lpdead_items &&
 			vacuumed_pages == vacrel->lpdead_item_pages));
 
@@ -2485,6 +2742,250 @@ lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat,
 }
 
 /*
+ * UpdateRelationLastVaccumPage -- Update relvacuumpage in pg_class entry
+ *
+ *		Update relation's pg_class entry with the conveyor belt pageno
+ *		upto which we have done the vacuum.
+ */
+static void
+UpdateRelationLastVaccumPage(Oid relid, CBPageNo pageno)
+{
+	Relation		pgclass;
+	HeapTuple		reltup;
+
+	pgclass = table_open(RelationRelationId, RowExclusiveLock);
+
+	reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(reltup))
+		elog(ERROR, "cache lookup failed for relation %u", relid);
+
+	/* Update pg_class tuple as appropriate. */
+	((Form_pg_class) GETSTRUCT(reltup))->relvacuumpage = pageno;
+
+	CatalogTupleUpdate(pgclass, &reltup->t_self, reltup);
+
+	/* Clean up */
+	heap_freetuple(reltup);
+	table_close(pgclass, RowExclusiveLock);
+}
+
+/*
+ *	lazy_vacuum_index() -- vacuum index relation.
+ *
+ *		This function is used if we are performing just index vacuum pass.
+ *		It will read data from the conveyor belt, convert them to the
+ *		ItemPointerData array and perform the index vacuuming.
+ */
+void
+lazy_vacuum_index(Relation heaprel, Relation indrel,
+				  BufferAccessStrategy strategy)
+{
+	VacDeadItems   *dead_items;
+	CBPageNo		start_pageno,
+					next_runpage = CB_INVALID_LOGICAL_PAGE,
+					last_pageread = CB_INVALID_LOGICAL_PAGE;
+	LVRelState		vacrel = {0};
+	DTS_DeadTidState	   *dts;
+	IndexBulkDeleteResult  *istat = NULL;
+	ErrorContextCallback errcallback;
+
+	/* Initialize vacuum rel state. */
+	lazy_init_vacrel(&vacrel, InvalidBlockNumber);
+	vacrel.bstrategy = strategy;
+
+	/*
+	 * We are vacuuming just one index.  Although there is no harm in setting
+	 * the actual number of indexes, that might be confusing for the parallel
+	 * vacuum system.
+	 *
+	 * FIXME we should provide a mechanism so that even if we are only doing
+	 * index vacuum there is an option of giving multiple indexes or all the
+	 * indexes of the table.
+	 */
+	vacrel.nindexes = 1;
+
+	vacrel.indstats = (IndexBulkDeleteResult **)
+		palloc0(vacrel.nindexes * sizeof(IndexBulkDeleteResult *));
+
+	/* Setup error callback. */
+	errcallback.callback = vacuum_error_callback;
+	errcallback.arg = &vacrel;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	dead_items_alloc(&vacrel, -1);
+	dead_items = vacrel.dead_items;
+
+	/* Report that we are now vacuuming indexes */
+	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+								 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
+
+	/* Get the next cbpage from where we want to start vacuum. */
+	start_pageno = indrel->rd_rel->relvacuumpage;
+	if (start_pageno == CB_INVALID_LOGICAL_PAGE)
+		start_pageno = 0;
+
+	/* Initialize deadtid state. */
+	dts = DTS_InitDeadTidState(heaprel, CB_INVALID_LOGICAL_PAGE);
+
+	/*
+	 * Loop until we complete the index vacuum.  Read as many TIDs as fit into
+	 * the dead_items area and perform the index vacuum.  Repeat this until
+	 * there are no more TIDs in the dead TID store.
+	 */
+	for (;;)
+	{
+		/* Read dead tids from the dead tuple store. */
+		dead_items->num_items = DTS_ReadDeadtids(dts, start_pageno,
+												 CB_INVALID_LOGICAL_PAGE,
+												 dead_items->max_items,
+												 dead_items->items,
+												 &last_pageread,
+												 &next_runpage);
+
+		/* No more dead tuples so we are done. */
+		if (dead_items->num_items == 0)
+			break;
+
+		/* Do index vacuuming. */
+		istat = lazy_vacuum_one_index(indrel, istat,
+									  heaprel->rd_rel->reltuples, &vacrel);
+
+		/* Go to the next logical conveyor belt page. */
+		start_pageno = last_pageread + 1;
+	}
+
+	/*
+	 * Remember the conveyor belt pageno into the pg_class entry so that next
+	 * time we can start vacuum from this point.
+	 */
+	if (last_pageread != CB_INVALID_LOGICAL_PAGE)
+		UpdateRelationLastVaccumPage(RelationGetRelid(indrel),
+									 last_pageread + 1);
+
+	/* Update vacuum progress. */
+	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS,
+								 vacrel.num_index_scans);
+
+	/* Update index statistics, but only if they are accurate. */
+	if (istat && !istat->estimated_count)
+		vac_update_relstats(indrel,
+							istat->num_pages,
+							istat->num_index_tuples,
+							0,
+							false,
+							InvalidTransactionId,
+							InvalidMultiXactId,
+							false);
+	/* Cleanup: pop the error context callback pushed above. */
+	error_context_stack = errcallback.previous;
+	dead_items_cleanup(&vacrel);
+	if (istat)
+		pfree(istat);
+	pfree(dts);
+}
+
+/*
+ *	lazy_vacuum_heap() -- perform the second heap pass for one relation.
+ *
+ *		This function is used only if we are performing just the heap_vacuum
+ *		pass.  It reads dead TIDs from the conveyor belt and marks the
+ *		corresponding items unused in the heap relation.
+ */
+static void
+lazy_vacuum_heap(LVRelState *vacrel, VacuumParams *params)
+{
+	CBPageNo	from_pageno;
+	CBPageNo	to_pageno = CB_INVALID_LOGICAL_PAGE;
+	CBPageNo	next_runpage = CB_INVALID_LOGICAL_PAGE;
+	CBPageNo	last_pageread = CB_INVALID_LOGICAL_PAGE;
+	VacDeadItems	   *dead_items;
+	DTS_DeadTidState   *dts;
+
+	/*
+	 * Initialize vacuum rel state.  We don't want to cap the dead item array
+	 * based on the maximum possible number of tuples in the relation, because
+	 * we fetch the TIDs directly from the dead TID store, so pass rel_pages
+	 * as InvalidBlockNumber.  Once the space for the dead items has been
+	 * allocated, set rel_pages to its proper value.
+	 */
+	lazy_init_vacrel(vacrel, InvalidBlockNumber);
+
+	/*
+	 * XXX Currently we have to allocate the dead item array and convert the
+	 * data read from the conveyor belt into it, because lazy_vacuum_heap_rel
+	 * expects a dead item array.  Once the in-memory format for dead items
+	 * is unified, we can avoid this conversion and perform the heap vacuum
+	 * directly, page by page.
+	 */
+	dead_items_alloc(vacrel, -1);
+	dead_items = vacrel->dead_items;
+	vacrel->rel_pages = RelationGetNumberOfBlocks(vacrel->rel);
+
+	/*
+	 * To identify the conveyor belt pageno up to which we can do the second
+	 * heap pass, go through all the indexes and find the lowest conveyor belt
+	 * pageno up to which every index of the relation has already been
+	 * vacuumed.
+	 */
+	to_pageno = get_min_indexvac_page(vacrel);
+
+	/* Get the next cbpage from where we want to start vacuum. */
+	from_pageno = vacrel->rel->rd_rel->relvacuumpage;
+
+	/* If no vacuum is done for the heap yet then start from the page 0. */
+	if (from_pageno == CB_INVALID_LOGICAL_PAGE)
+		from_pageno = 0;
+
+	/* We have already done enough vacuum so nothing to be done. */
+	if (from_pageno == to_pageno || to_pageno == CB_INVALID_LOGICAL_PAGE)
+	{
+		dead_items_cleanup(vacrel);
+		return;
+	}
+
+	/* Initialize deadtid state. */
+	dts = DTS_InitDeadTidState(vacrel->rel, CB_INVALID_LOGICAL_PAGE);
+
+	/* Loop until we complete the heap vacuum. */
+	while (from_pageno < to_pageno)
+	{
+		/* Read dead tids from the conveyor belt. */
+		dead_items->num_items = DTS_ReadDeadtids(dts, from_pageno,
+												 to_pageno,
+												 dead_items->max_items,
+												 dead_items->items,
+												 &last_pageread,
+												 &next_runpage);
+
+		/* No more dead tids, we are done. */
+		if (dead_items->num_items == 0)
+			break;
+
+		/* Perform heap vacuum. */
+		lazy_vacuum_heap_rel(vacrel, true);
+
+		/* Go to the next logical page. */
+		from_pageno = last_pageread + 1;
+	}
+
+	/*
+	 * Remember the conveyor belt pageno into the pg_class entry so that next
+	 * time we can start vacuum from this point.
+	 */
+	if (last_pageread != CB_INVALID_LOGICAL_PAGE)
+		UpdateRelationLastVaccumPage(RelationGetRelid(vacrel->rel),
+									 last_pageread + 1);
+
+	/* Vacuum the dead tid store. */
+	DTS_Vacuum(dts, to_pageno);
+
+	/* Cleanup */
+	dead_items_cleanup(vacrel);
+	pfree(dts);
+}
+
+/*
  *	lazy_cleanup_one_index() -- do post-vacuum cleanup for index relation.
  *
  *		Calls index AM's amvacuumcleanup routine.  reltuples is the number
@@ -2861,7 +3362,8 @@ dead_items_max_items(LVRelState *vacrel)
 		max_items = Min(max_items, MAXDEADITEMS(MaxAllocSize));
 
 		/* curious coding here to ensure the multiplication can't overflow */
-		if ((BlockNumber) (max_items / MaxHeapTuplesPerPage) > rel_pages)
+		if (BlockNumberIsValid(rel_pages) &&
+			(BlockNumber) (max_items / MaxHeapTuplesPerPage) > rel_pages)
 			max_items = rel_pages * MaxHeapTuplesPerPage;
 
 		/* stay sane if small maintenance_work_mem */
@@ -3076,6 +3578,170 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 }
 
 /*
+ * lazy_init_vacrel - initialize lazy vacuum rel state structure
+ */
+static void
+lazy_init_vacrel(LVRelState *vacrel, BlockNumber nblocks)
+{
+	vacrel->rel_pages = nblocks;
+	vacrel->scanned_pages = 0;
+	vacrel->pinskipped_pages = 0;
+	vacrel->frozenskipped_pages = 0;
+	vacrel->tupcount_pages = 0;
+	vacrel->pages_removed = 0;
+	vacrel->lpdead_item_pages = 0;
+	vacrel->nonempty_pages = 0;
+
+	/* Initialize instrumentation counters */
+	vacrel->num_index_scans = 0;
+	vacrel->tuples_deleted = 0;
+	vacrel->lpdead_items = 0;
+	vacrel->new_dead_tuples = 0;
+	vacrel->num_tuples = 0;
+	vacrel->live_tuples = 0;
+}
+
+/*
+ * store_deaditems - copy dead items to deaditem cache.
+ *
+ * Copy dead items to the cache, if cache doesn't have enough space then first
+ * flush the buffer to the conveyor belt and create the space and then copy
+ * offsets.
+ */
+static void
+store_deaditems(LVRelState *vacrel, BlockNumber blkno, int noffsets,
+				OffsetNumber *offsets)
+{
+	int		spaceleft;
+	int		copyoffsets = 0;
+	bool	switchpage = false;
+	LVDeadOffsetCache *dead_itemcache = vacrel->dead_itemcache;
+	int	   *datasizes = dead_itemcache->datasizes;
+
+	/*
+	 * If we have sufficient space in the cache to hold all the data then copy
+	 * the data directly into the cache and return.
+	 */
+	spaceleft = DTS_PageMaxDataSpace - dead_itemcache->writeoffset;
+	if (spaceleft >= DTS_BlkDataSize(noffsets))
+	{
+		copy_deaditems_to_cache(dead_itemcache, blkno, noffsets, offsets);
+		return;
+	}
+
+	/*
+	 * If there is not enough space left even for the block header and at
+	 * least one offset of this block, there is no point in writing to the
+	 * current cache page, so switch to the next page.
+	 */
+	if (spaceleft < DTS_MinCopySize)
+		switchpage = true;
+
+	/* Loop until we have copied all the offsets. */
+	while (noffsets > 0)
+	{
+		/*
+		 * If switchpage is set then set the total data size for the current
+		 * cache page and switch to the next page.  But if this is the last
+		 * page of the cache then first flush the data to the conveyor belt
+		 * and reset the whole cache.
+		 */
+		if (switchpage)
+		{
+			datasizes[dead_itemcache->activepageno] =
+												dead_itemcache->writeoffset;
+
+			/*
+			 * Flush to the conveyor belt if this is the last page in the
+			 * cache.
+			 */
+			if ((dead_itemcache->activepageno + 1) == DEAD_ITEM_CACHE_NUM_PAGE)
+			{
+				DTS_InsertDeadtids(vacrel->deadtidstate,
+								   dead_itemcache->data,
+								   datasizes,
+								   dead_itemcache->activepageno + 1);
+				dead_itemcache->activepageno = 0;
+			}
+			else
+				dead_itemcache->activepageno++;
+
+			dead_itemcache->writeoffset = 0;
+			spaceleft = DTS_PageMaxDataSpace;
+			switchpage = false;
+		}
+
+		/*
+		 * Compute how many offsets fit into the current cache page, copy
+		 * that many offsets to the cache, and switch to the next page.
+		 */
+		copyoffsets =
+			(spaceleft - sizeof(DTS_BlockHeader)) / sizeof(OffsetNumber);
+		copyoffsets = Min(copyoffsets, noffsets);
+		copy_deaditems_to_cache(dead_itemcache, blkno, copyoffsets, offsets);
+		noffsets -= copyoffsets;
+		offsets += copyoffsets;
+		switchpage = true;
+	}
+}
+
+/*
+ * copy_deaditems_to_cache - Helper function for store_deaditems.
+ */
+static void
+copy_deaditems_to_cache(LVDeadOffsetCache *dead_itemcache, BlockNumber blkno,
+						int noffsets, OffsetNumber *offsets)
+{
+	DTS_BlockHeader		blkhdr;
+	char			   *data;
+
+	/* Prepare the block header. */
+	BlockIdSet(&blkhdr.blkid, blkno);
+	blkhdr.noffsets = noffsets;
+
+	/* Compute the starting point of the cache for copying the data. */
+	data = dead_itemcache->data +
+			dead_itemcache->activepageno * DTS_PageMaxDataSpace +
+			dead_itemcache->writeoffset;
+
+	/* Copy block header. */
+	memcpy(data, &blkhdr, sizeof(DTS_BlockHeader));
+	data += sizeof(DTS_BlockHeader);
+
+	/* Copy block offsets. */
+	memcpy(data, offsets, sizeof(OffsetNumber) * noffsets);
+
+	/* Update write pointer for the active page in the cache. */
+	dead_itemcache->writeoffset += DTS_BlkDataSize(noffsets);
+}
+
+/*
+ * get_min_indexvac_page - Get the min conveyor belt pageno up to which all
+ *						   indexes of the relation have been vacuumed.
+ */
+static CBPageNo
+get_min_indexvac_page(LVRelState *vacrel)
+{
+	CBPageNo	min_indexvac_page = CB_INVALID_LOGICAL_PAGE;
+	int			i;
+
+	for (i = 0; i < vacrel->nindexes; i++)
+	{
+		if (vacrel->indrels[i]->rd_rel->relvacuumpage ==
+			CB_INVALID_LOGICAL_PAGE)
+		{
+			min_indexvac_page = CB_INVALID_LOGICAL_PAGE;
+			break;
+		}
+		else if (min_indexvac_page == CB_INVALID_LOGICAL_PAGE ||
+			vacrel->indrels[i]->rd_rel->relvacuumpage < min_indexvac_page)
+			min_indexvac_page = vacrel->indrels[i]->rd_rel->relvacuumpage;
+	}
+
+	return min_indexvac_page;
+}
+
+/*
  * Update index statistics in pg_class if the statistics are accurate.
  */
 static void
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 7e99de8..01e75d8 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -935,6 +935,7 @@ InsertPgClassTuple(Relation pg_class_desc,
 	values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
 	values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
 	values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
+	values[Anum_pg_class_relvacuumpage - 1] = Int64GetDatum(CB_INVALID_LOGICAL_PAGE);
 	if (relacl != (Datum) 0)
 		values[Anum_pg_class_relacl - 1] = relacl;
 	else
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index d1dadc5..b8f9f16 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -114,6 +114,8 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 	bool		full = false;
 	bool		disable_page_skipping = false;
 	bool		process_toast = true;
+	bool		heap_hot_prune = false;
+	bool		heap_vacuum = false;
 	ListCell   *lc;
 
 	/* index_cleanup and truncate values unspecified for now */
@@ -200,6 +202,10 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 					params.nworkers = nworkers;
 			}
 		}
+		else if (strcmp(opt->defname, "heap_hot_prune") == 0)
+			heap_hot_prune = defGetBoolean(opt);
+		else if (strcmp(opt->defname, "heap_vacuum") == 0)
+			heap_vacuum = defGetBoolean(opt);
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -216,7 +222,9 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 		(freeze ? VACOPT_FREEZE : 0) |
 		(full ? VACOPT_FULL : 0) |
 		(disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0) |
-		(process_toast ? VACOPT_PROCESS_TOAST : 0);
+		(process_toast ? VACOPT_PROCESS_TOAST : 0) |
+		(heap_hot_prune ? VACOPT_HEAP_HOT_PRUNE : 0) |
+		(heap_vacuum ? VACOPT_HEAP_VACUUM : 0);
 
 	/* sanity checks on options */
 	Assert(params.options & (VACOPT_VACUUM | VACOPT_ANALYZE));
@@ -344,6 +352,22 @@ vacuum(List *relations, VacuumParams *params,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));
 
+	/*
+	 * DISABLE_PAGE_SKIPPING only affects how the relation is scanned during
+	 * the hot pruning pass, so the option makes no sense if we are doing
+	 * just the heap vacuum pass.
+	 */
+	if ((params->options & VACOPT_HEAP_VACUUM) != 0 &&
+		(params->options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with HEAP_VACUUM")));
+
+	/*
+	 * TODO: Now that we have some new options, check which other combinations
+	 * need to throw an error.
+	 */
+
 	/* sanity check for PROCESS_TOAST */
 	if ((params->options & VACOPT_FULL) != 0 &&
 		(params->options & VACOPT_PROCESS_TOAST) == 0)
@@ -1901,6 +1925,28 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
 	}
 
 	/*
+	 * If the relation is an index, perform just the index vacuum and return
+	 * false, because ANALYZE is not needed after an index vacuum.
+	 *
+	 * XXX maybe we could add a check and report an error for any input
+	 * params->options that are not valid for the index vacuum.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_INDEX)
+	{
+		Relation	heapRel;
+
+		heapRel = table_open(rel->rd_index->indrelid, lmode);
+
+		lazy_vacuum_index(heapRel, rel, vac_strategy);
+
+		relation_close(rel, lmode);
+		table_close(heapRel, lmode);
+		PopActiveSnapshot();
+		CommitTransactionCommand();
+		return false;
+	}
+
+	/*
 	 * Check that it's of a vacuumable relkind.
 	 */
 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 2e760e8..853a4f3 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3781,6 +3781,7 @@ RelationSetNewRelfilenode(Relation relation, char persistence)
 			classform->relpages = 0;	/* it's empty until further notice */
 			classform->reltuples = -1;
 			classform->relallvisible = 0;
+			classform->relvacuumpage = -1;
 		}
 		classform->relfrozenxid = freezeXid;
 		classform->relminmxid = minmulti;
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 134b20f..d85dad8 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -14,6 +14,7 @@
 #ifndef GENAM_H
 #define GENAM_H
 
+#include "access/conveyor.h"
 #include "access/sdir.h"
 #include "access/skey.h"
 #include "nodes/tidbitmap.h"
@@ -180,6 +181,8 @@ extern IndexBulkDeleteResult *index_bulk_delete(IndexVacuumInfo *info,
 												IndexBulkDeleteResult *istat,
 												IndexBulkDeleteCallback callback,
 												void *callback_state);
+extern void lazy_vacuum_index(Relation heaprel, Relation indrel,
+										   BufferAccessStrategy strategy);
 extern IndexBulkDeleteResult *index_vacuum_cleanup(IndexVacuumInfo *info,
 												   IndexBulkDeleteResult *istat);
 extern bool index_can_return(Relation indexRelation, int attno);
diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h
index 304e8c1..ca51c16 100644
--- a/src/include/catalog/pg_class.h
+++ b/src/include/catalog/pg_class.h
@@ -128,6 +128,9 @@ CATALOG(pg_class,1259,RelationRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83,Relat
 	/* all multixacts in this rel are >= this; it is really a MultiXactId */
 	TransactionId relminmxid BKI_DEFAULT(1);	/* FirstMultiXactId */
 
+	/* conveyor belt pageno up to which we have already vacuumed */
+	int64		relvacuumpage BKI_DEFAULT(-1);
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* NOTE: These fields are not present in a relcache entry's rd_rel field. */
 	/* access permissions */
@@ -143,7 +146,7 @@ CATALOG(pg_class,1259,RelationRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83,Relat
 
 /* Size of fixed part of pg_class tuples, not counting var-length fields */
 #define CLASS_TUPLE_SIZE \
-	 (offsetof(FormData_pg_class,relminmxid) + sizeof(TransactionId))
+	 (offsetof(FormData_pg_class,relvacuumpage) + sizeof(int64))
 
 /* ----------------
  *		Form_pg_class corresponds to a pointer to a tuple with
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 5d0bdfa..d0e1863 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -189,6 +189,9 @@ typedef struct VacAttrStats
 #define VACOPT_PROCESS_TOAST 0x40	/* process the TOAST table, if any */
 #define VACOPT_DISABLE_PAGE_SKIPPING 0x80	/* don't skip any pages */
 
+#define VACOPT_HEAP_HOT_PRUNE 0x100	/* first heap pass */
+#define VACOPT_HEAP_VACUUM 0x200	/* second heap pass */
+
 /*
  * Values used by index_cleanup and truncate params.
  *
-- 
1.8.3.1
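
The packing done by store_deaditems() and copy_deaditems_to_cache() in
the patch above can be hard to picture from the diff alone.  Below is a
minimal standalone sketch of the same idea, not the patch's code: all
Demo* names and the 32-byte page payload are made up for illustration,
and the flush to the conveyor belt that the patch performs when the
cache fills up is replaced by an assertion.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define DEMO_PAGE_SPACE 32		/* hypothetical, tiny so that splits show up */
#define DEMO_NPAGES		4		/* hypothetical number of cache pages */

/* Hypothetical stand-in for DTS_BlockHeader. */
typedef struct DemoBlockHeader
{
	uint32_t	blkno;
	uint16_t	noffsets;
} DemoBlockHeader;

/* Hypothetical stand-in for the dead item cache; zero it before use. */
typedef struct DemoCache
{
	char		data[DEMO_NPAGES][DEMO_PAGE_SPACE];
	int			datasizes[DEMO_NPAGES];
	int			activepage;
	int			writeoffset;
} DemoCache;

/* Append one [header][offset array] chunk to the active cache page. */
static void
demo_append(DemoCache *c, uint32_t blkno, int n, const uint16_t *offs)
{
	DemoBlockHeader hdr = {blkno, (uint16_t) n};
	char	   *p = c->data[c->activepage] + c->writeoffset;

	memcpy(p, &hdr, sizeof(hdr));
	memcpy(p + sizeof(hdr), offs, (size_t) n * sizeof(uint16_t));
	c->writeoffset += (int) (sizeof(hdr) + (size_t) n * sizeof(uint16_t));
	c->datasizes[c->activepage] = c->writeoffset;
}

/* Store all offsets of one heap block, splitting across pages if needed. */
static void
demo_store(DemoCache *c, uint32_t blkno, int n, const uint16_t *offs)
{
	const int	minsize = (int) (sizeof(DemoBlockHeader) + sizeof(uint16_t));

	while (n > 0)
	{
		int			space = DEMO_PAGE_SPACE - c->writeoffset;
		int			fit;

		/* Not even a header plus one offset fits: move to the next page. */
		if (space < minsize)
		{
			/* The real patch would flush a full cache to the conveyor belt. */
			assert(c->activepage + 1 < DEMO_NPAGES);
			c->activepage++;
			c->writeoffset = 0;
			space = DEMO_PAGE_SPACE;
		}

		/* Copy as many offsets as fit behind a fresh block header. */
		fit = (space - (int) sizeof(DemoBlockHeader)) / (int) sizeof(uint16_t);
		if (fit > n)
			fit = n;
		demo_append(c, blkno, fit, offs);
		offs += fit;
		n -= fit;
	}
}
```

A block whose offsets do not fit into the remaining space therefore ends
up with one header per page it touches, which is why a reader of this
layout has to be prepared to see the same block number in consecutive
chunks.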

From 3e611fd3087e75a6b147c7a2d1dd2feff88e15e7 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 15 Dec 2021 14:58:17 +0530
Subject: [PATCH v1 2/3] Deadtidstore over the conveyor belt

This provides a mechanism for storing dead tids in, and retrieving them
from, the conveyor belt.  Basically, this is the interface through which
the vacuum module interacts with the conveyor belt.  It maintains a
DeadTIDState which keeps track of the current insertion progress, i.e. the
first and the last conveyor belt page of the current vacuum run.  On
completion of the vacuum run, it marks the boundary of the run by storing
the last conveyor belt pageno of the run into the special space of the
first conveyor belt page of that run.

This also provides the infrastructure to avoid adding duplicate tids to
the conveyor belt.  If we perform the first vacuum pass multiple times
without executing the second vacuum pass, we may encounter dead tids that
are already in the conveyor belt.  To detect them, this module maintains a
cache over the conveyor belt which only loads the data for the block that
the vacuum is currently processing, so we don't need to maintain a huge
cache.
---
 src/backend/access/heap/Makefile       |    1 +
 src/backend/access/heap/deadtidstore.c | 1165 ++++++++++++++++++++++++++++++++
 src/include/access/deadtidstore.h      |   76 +++
 3 files changed, 1242 insertions(+)
 create mode 100644 src/backend/access/heap/deadtidstore.c
 create mode 100644 src/include/access/deadtidstore.h
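
The duplicate check described in the commit message relies on every
vacuum run being sorted by (block, offset).  As a rough illustration of
that idea only, here is a standalone sketch of merging several sorted
runs to get the distinct dead offsets of one heap block.  It is not the
code from deadtidstore.c: the Demo* types and demo_merge_runs_for_block()
are hypothetical, and the runs are plain in-memory arrays instead of
conveyor belt pages pulled into a per-run cache.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a TID: heap block number plus item offset. */
typedef struct DemoTid
{
	uint32_t	blkno;
	uint16_t	offnum;
} DemoTid;

/* One sorted vacuum run and a forward-only cursor into it. */
typedef struct DemoRun
{
	const DemoTid *tids;
	size_t		ntids;
	size_t		pos;
} DemoRun;

/*
 * Collect the distinct offsets recorded for 'blkno' across all runs into
 * 'offsets' (ascending) and return how many were stored.  Blocks must be
 * requested in ascending order, as vacuum scans the heap, because the
 * cursors never move backwards.
 */
static int
demo_merge_runs_for_block(DemoRun *runs, int nruns, uint32_t blkno,
						  uint16_t *offsets, int maxoffsets)
{
	int			noffsets = 0;

	/* Skip entries that belong to blocks before the requested one. */
	for (int i = 0; i < nruns; i++)
		while (runs[i].pos < runs[i].ntids &&
			   runs[i].tids[runs[i].pos].blkno < blkno)
			runs[i].pos++;

	for (;;)
	{
		int			best = -1;
		uint16_t	off;

		/* Find the run whose next entry for this block is the smallest. */
		for (int i = 0; i < nruns; i++)
		{
			DemoRun    *r = &runs[i];

			if (r->pos >= r->ntids || r->tids[r->pos].blkno != blkno)
				continue;
			if (best < 0 ||
				r->tids[r->pos].offnum <
				runs[best].tids[runs[best].pos].offnum)
				best = i;
		}
		if (best < 0)
			break;				/* no run has data left for this block */

		off = runs[best].tids[runs[best].pos].offnum;
		runs[best].pos++;

		/* The same offset may appear in several runs; keep it once. */
		if (noffsets > 0 && offsets[noffsets - 1] == off)
			continue;
		if (noffsets == maxoffsets)
			break;
		offsets[noffsets++] = off;
	}

	return noffsets;
}
```

Since the result only ever covers one block, the output array stays
small no matter how much data the runs contain, which matches the
"block-level cache" idea from the commit message.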

diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile
index af0bd18..7ef8ce2 100644
--- a/src/backend/access/heap/Makefile
+++ b/src/backend/access/heap/Makefile
@@ -13,6 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	deadtidstore.o \
 	heapam.o \
 	heapam_handler.o \
 	heapam_visibility.o \
diff --git a/src/backend/access/heap/deadtidstore.c b/src/backend/access/heap/deadtidstore.c
new file mode 100644
index 0000000..b6e4440
--- /dev/null
+++ b/src/backend/access/heap/deadtidstore.c
@@ -0,0 +1,1165 @@
+/*-------------------------------------------------------------------------
+ *
+ * deadtidstore.c
+ *	  Wrapper over conveyor belt to store and fetch deadtids.
+ *
+ * Create the dead tid fork and manage storing and retrieving deadtids using
+ * the conveyor belt infrastructure.
+ *
+ * src/backend/access/heap/deadtidstore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/conveyor.h"
+#include "access/deadtidstore.h"
+#include "access/htup_details.h"
+#include "miscadmin.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+/*
+ * Whenever we perform the first pass of the vacuum we flush the dead items to
+ * the conveyor belt multiple times, but the complete pass over the heap is
+ * considered a single vacuum run in the conveyor belt, and all the tids of a
+ * single run are in sorted order because during vacuum we scan and prune the
+ * heap in block order.
+ *
+ * While performing the first vacuum pass it is also possible that we
+ * encounter a dead tid which we already inserted into the conveyor belt in
+ * one of the previous vacuum runs.  There is no way to identify whether it
+ * is already in the conveyor belt except by checking the conveyor belt
+ * itself.  However, the whole conveyor belt data might not fit into
+ * maintenance_work_mem, which makes it difficult to search for an individual
+ * item in the conveyor belt.
+ *
+ * But since we know that the individual vacuum runs in the conveyor belt are
+ * sorted, we can perform a merge over the runs in order to generate sorted
+ * data.  For that we maintain a cache over each run, pull a couple of pages
+ * from each run into the cache, and then merge them into a block-level
+ * cache.  The block-level cache is a very small cache that only holds the
+ * offsets for the current block which we are going to prune in the vacuum.
+ *
+ * This data structure maintains the current state of the cache over the
+ * previous vacuum runs.
+ */
+typedef struct DTS_RunState
+{
+	int		num_runs;
+
+	/*
+	 * Current position of each run w.r.t. the conveyor belt.
+	 *
+	 * For each run, startpage keeps track of the first conveyor belt pageno
+	 * and nextpage keeps track of the next page to be fetched from the
+	 * conveyor belt.
+	 */
+	CBPageNo   *startpage;
+	CBPageNo   *nextpage;
+
+	/*
+	 * Cache information over each run.
+	 *
+	 * runoffset maintains the current offset into the cache for each run.
+	 * We do not track the exact offset here; instead we keep the offset of
+	 * the next block header, and for fetching the offsets within a block we
+	 * use a local index.  The total size of the cache is stored in
+	 * cachesize, and cache is the actual cache holding data from each run.
+	 * This cache is equally divided among all runs.  However, this could be
+	 * optimized: instead of dividing it equally we could maintain a separate
+	 * offset for each run inside this cache, so that we don't consume equal
+	 * space for runs that are really small and don't need that much space.
+	 */
+	int	   *runoffset;
+	Size	cachesize;
+	char   *cache;
+} DTS_RunState;
+
+/*
+ * This maintains the state over the conveyor belt for a single vacuum pass.
+ * It tracks the start and last page of the conveyor belt for the current run,
+ * and at the end of the current run the last page is stored in the special
+ * space of the first page of the run so that we know the boundary of each
+ * vacuum run in the conveyor belt.
+ */
+struct DTS_DeadTidState
+{
+	/* Conveyor belt reference. */
+	ConveyorBelt   *cb;
+
+	/*
+	 * Current vacuum tracking
+	 *
+	 * During the prune vacuum pass we can flush the dead tids to the conveyor
+	 * belt in multiple small chunks.  So start_page remembers the first
+	 * conveyor belt pageno where we started flushing and last_page keeps
+	 * track of the last conveyor belt page into which we flushed.  At the end
+	 * of the vacuum pass we will set last_page into the special space of the
+	 * first page to mark the vacuum run boundary.  The minimum conveyor belt
+	 * pageno up to which vacuum is done for all indexes is stored in
+	 * min_idxvac_page, so that if during the prune phase we find that some
+	 * items are already in the conveyor belt and their index vacuum is also
+	 * done, then we can mark those items unused as well.
+	 */
+	CBPageNo		start_page;
+	CBPageNo		last_page;
+	CBPageNo		min_idxvac_page;
+
+	/*
+	 * Members for maintaining the cache for the duplicate check.
+	 *
+	 * completed is set to true if there is no more data to be fetched from
+	 * the deadtidrun cache, and num_offsets is the number of dead offsets
+	 * loaded into the offsets array.  We don't need to maintain the block
+	 * number because this cache holds offsets for only a single block at a
+	 * time.
+	 *
+	 * cansetunused keeps a flag for each offset in the offsets array, telling
+	 * whether that offset can be set unused or not.  We know the last
+	 * logical conveyor belt pageno of each vacuum run, and while merging the
+	 * runs we also know which run a particular offset is coming from.  So if
+	 * the last page of that run is smaller than the minimum index vacuum
+	 * page then we set the flag to true, and to false otherwise.
+	 */
+	bool			completed;
+	int				num_offsets;
+	OffsetNumber   *offsets;
+	bool		   *cansetunused;
+
+	/* Dead tid run cache state. */
+	DTS_RunState   *deadtidrun;
+};
+
+/*
+ * DTS_GetRunBlkHdr - Get next block header in the given run.
+ *
+ * DTS_RunState's runoffset points to the current block header for each run in
+ * the runcache.  The data in the conveyor belt is always stored in the form
+ * [DTS_BlockHeader][Offset array], and runoffset always moves from one block
+ * header to the next; it never points to any intermediate data.
+ */
+#define DTS_GetRunBlkHdr(runstate, run, sizeperrun) \
+	(DTS_BlockHeader *)(((char *)((runstate)->cache)) + \
+						((run) * (sizeperrun) + (runstate)->runoffset[(run)]))
+
+/* Initial number of vacuum runs in the conveyor belt. */
+#define DTS_NUM_VACUUM_RUN		100
+
+/* non-export function prototypes */
+static void dts_page_init(Page page);
+static int32 dts_offsetcmp(const void *a, const void *b);
+static void dts_load_run(DTS_DeadTidState *deadtidstate, int run);
+static void dts_merge_runs(DTS_DeadTidState *deadtidstate, BlockNumber blkno);
+static void dts_process_runs(DTS_DeadTidState *deadtidstate);
+static int dts_read_pagedata(Page page, ItemPointerData *deadtids,
+							 int maxtids);
+static void AssertCheckPageData(Page page);
+
+/*
+ * DTS_InitDeadTidState - initialize the deadtid state
+ *
+ * This creates a new deadtid fork for the given relation, if it doesn't exist
+ * yet, and initializes the conveyor belt.  The deadtid state maintains the
+ * current state of insertion into/reads from the conveyor belt.
+ */
+DTS_DeadTidState *
+DTS_InitDeadTidState(Relation rel, CBPageNo min_idxvac_page)
+{
+	CBPageNo	oldest_pageno;
+	CBPageNo	next_pageno;
+	SMgrRelation		reln;
+	ConveyorBelt	   *cb;
+	DTS_DeadTidState   *deadtidstate;
+
+	/* Open the relation at smgr level. */
+	reln = RelationGetSmgr(rel);
+
+	/* Allocate memory for deadtidstate. */
+	deadtidstate = palloc0(sizeof(DTS_DeadTidState));
+
+	/*
+	 * If the DEADTID_FORKNUM doesn't exist then create it and initialize the
+	 * conveyor belt, otherwise just open the conveyor belt.
+	 */
+	if (!smgrexists(reln, DEADTID_FORKNUM))
+	{
+		smgrcreate(reln, DEADTID_FORKNUM, false);
+		cb = ConveyorBeltInitialize(rel,
+									DEADTID_FORKNUM,
+									1024,	/* XXX What is the best value ?*/
+									CurrentMemoryContext);
+	}
+	else
+		cb = ConveyorBeltOpen(rel, DEADTID_FORKNUM, CurrentMemoryContext);
+
+	/* Initialize the dead tid state. */
+	deadtidstate->cb = cb;
+	deadtidstate->start_page = CB_INVALID_LOGICAL_PAGE;
+	deadtidstate->last_page = CB_INVALID_LOGICAL_PAGE;
+	deadtidstate->num_offsets = -1;
+	deadtidstate->completed = false;
+
+	ConveyorBeltGetBounds(cb, &oldest_pageno, &next_pageno);
+
+	if (min_idxvac_page != CB_INVALID_LOGICAL_PAGE &&
+		min_idxvac_page > oldest_pageno)
+		deadtidstate->min_idxvac_page = min_idxvac_page;
+	else
+		deadtidstate->min_idxvac_page = CB_INVALID_LOGICAL_PAGE;
+
+	return deadtidstate;
+}
+
+/*
+ * DTS_InsertDeadtids() -- Insert given data into the conveyor belt.
+ *
+ * 'data' is the data to be inserted into the conveyor belt; it is laid out
+ * in units of conveyor belt pages.  'npages' tells us how many pages are to
+ * be inserted and 'datasizes' holds the number of bytes to be inserted into
+ * each page.  We need to know exactly how many valid bytes are copied into
+ * each conveyor belt page so that we can set the proper value for pd_lower.
+ * We need that because in dts_load_run we copy data from multiple pages
+ * into the cache and we don't want any gaps between the data, and for that
+ * we must know the exact size of the valid data.
+ */
+void
+DTS_InsertDeadtids(DTS_DeadTidState *deadtidstate, char *data, int *datasizes,
+				   int npages)
+{
+	ConveyorBelt   *cb = deadtidstate->cb;
+	CBPageNo		pageno;
+	int				nwritten = 0;
+
+	/* Loop until we write all the pages into the conveyor belt. */
+	while (nwritten < npages)
+	{
+		Buffer	buffer;
+		Page	page;
+		PageHeader	phdr;
+		char   *pagedata;
+		int		copysz = 0;
+
+		/* Get a new page from the conveyor belt. */
+		buffer = ConveyorBeltGetNewPage(cb, &pageno);
+		page = BufferGetPage(buffer);
+		dts_page_init(page);
+
+		phdr = (PageHeader) page;
+		pagedata = (char *) page + phdr->pd_lower;
+
+		START_CRIT_SECTION();
+
+		copysz = datasizes[nwritten];
+		if (copysz > DTS_PageMaxDataSpace)
+			copysz = DTS_PageMaxDataSpace;
+
+		/* Write data into the current page and update pd_lower. */
+		memcpy(pagedata, data, copysz);
+		phdr->pd_lower += copysz;
+
+		Assert(phdr->pd_lower <= phdr->pd_upper);
+
+		/*
+		 * Add new page to the conveyor belt this will be WAL logged
+		 * internally.
+		 */
+		ConveyorBeltPerformInsert(cb, buffer);
+
+		END_CRIT_SECTION();
+
+		AssertCheckPageData(page);
+
+		/*
+		 * Advance the data pointer to the next cache page; cache pages are
+		 * DTS_PageMaxDataSpace bytes apart regardless of copysz.
+		 */
+		data += DTS_PageMaxDataSpace;
+		nwritten++;
+		ConveyorBeltCleanupInsert(cb, buffer);
+
+		/* Set the start page for this run if not yet set. */
+		if (deadtidstate->start_page == CB_INVALID_LOGICAL_PAGE)
+			deadtidstate->start_page = pageno;
+	}
+
+	deadtidstate->last_page = pageno;
+}
+
+/*
+ * DTS_ReadDeadtids - Read dead tids from the conveyor belt
+ *
+ * 'from_pageno' is the conveyor belt page number from where we want to start
+ * and 'to_pageno' is the page number up to which we want to read.
+ * 'deadtids' is a preallocated array for holding the dead tids; the allocated
+ * size is sufficient to hold 'maxtids'.
+ *
+ * On a successful read, the last page number we have read from the conveyor
+ * belt is stored into '*last_pageread' and the first page of the next run
+ * is stored into '*next_runpage'.
+ *
+ * Returns number of dead tids actually read from the conveyor belt.  Should
+ * always be <= maxtids.
+ */
+int
+DTS_ReadDeadtids(DTS_DeadTidState *deadtidstate, CBPageNo from_pageno,
+				 CBPageNo to_pageno, int maxtids, ItemPointerData *deadtids,
+				 CBPageNo *last_pageread, CBPageNo *next_runpage)
+{
+	ConveyorBelt   *cb = deadtidstate->cb;
+	Page			page;
+	Buffer			buffer;
+	CBPageNo		oldest_pageno;
+	CBPageNo		next_pageno;
+	int				count = 0;
+	int				ndeadtids = 0;
+	CBPageNo		prevpageno = from_pageno;
+
+	/* Read the conveyor belt bounds and check page validity. */
+	ConveyorBeltGetBounds(cb, &oldest_pageno, &next_pageno);
+	if (from_pageno >= next_pageno)
+		return 0;
+
+	if (*next_runpage == CB_INVALID_LOGICAL_PAGE)
+		*next_runpage = from_pageno;
+
+	Assert(from_pageno >= oldest_pageno &&
+		   (to_pageno == CB_INVALID_LOGICAL_PAGE || to_pageno <= next_pageno));
+
+	/*
+	 * If caller has passed a valid to_pageno then don't try to read beyond
+	 * that.
+	 */
+	if (to_pageno != CB_INVALID_LOGICAL_PAGE && to_pageno < next_pageno)
+		next_pageno = to_pageno;
+
+	/* Loop until we have read maxtids or run out of pages. */
+	while (maxtids > 0 && from_pageno < next_pageno)
+	{
+		DTS_PageData *deadtidpd;
+
+		/* Read page from the conveyor belt. */
+		buffer = ConveyorBeltReadBuffer(cb, from_pageno, BUFFER_LOCK_SHARE,
+										NULL);
+		if (BufferIsInvalid(buffer))
+			break;
+
+		page = BufferGetPage(buffer);
+
+		/*
+		 * While reading the data we want to ensure that we never read the data
+		 * of an ongoing vacuum pass.  So at every run header page, check
+		 * whether the next run page is set; if it is not set then we are
+		 * done.
+		 */
+		if (from_pageno == *next_runpage)
+		{
+			deadtidpd = (DTS_PageData *) PageGetSpecialPointer(page);
+			*next_runpage = deadtidpd->nextrunpage;
+			if (*next_runpage == CB_INVALID_LOGICAL_PAGE)
+			{
+				UnlockReleaseBuffer(buffer);
+				break;
+			}
+		}
+
+		/*
+		 * Don't stop reading in the middle of a page: if the remaining space
+		 * is less than the maximum number of tids a page can hold, stop here.
+		 */
+		if (maxtids < DTS_PageMaxDataSpace / sizeof(OffsetNumber))
+		{
+			UnlockReleaseBuffer(buffer);
+			break;
+		}
+
+		/* Read page and convert to the deadtids. */
+		ndeadtids = dts_read_pagedata(page, deadtids, maxtids);
+
+		deadtids += ndeadtids;
+		maxtids -= ndeadtids;
+		count += ndeadtids;
+
+		prevpageno = from_pageno;
+
+		/* Go to the next page. */
+		from_pageno++;
+		UnlockReleaseBuffer(buffer);
+	}
+
+	*last_pageread = prevpageno;
+
+	return count;
+}
+
+/*
+ * DTS_LoadDeadtids - Load dead offsets from conveyor belt for given block.
+ *
+ * 'blkno' is the heap block we are about to scan, so this function needs to
+ * ensure that all previously stored dead offsets for this block are loaded
+ * from the conveyor belt into the cache.
+ *
+ * This ensures that when we add dead tids to the conveyor belt we don't add
+ * duplicate tids.
+ */
+void
+DTS_LoadDeadtids(DTS_DeadTidState *deadtidstate, BlockNumber blkno)
+{
+	CBPageNo		oldest_pageno;
+	CBPageNo		next_pageno;
+
+	Assert(deadtidstate != NULL);
+
+	/*
+	 * If there is no more pending data to be pulled from any of the vacuum
+	 * runs in the conveyor belt then just return.
+	 */
+	if (deadtidstate->completed)
+		return;
+
+	/* Get conveyor belt page range. */
+	ConveyorBeltGetBounds(deadtidstate->cb, &oldest_pageno, &next_pageno);
+
+	/*
+	 * If the oldest page equals the next page then there is no data in the
+	 * conveyor belt; similarly, if the oldest page equals the start page of
+	 * this run then there are no previous runs in the conveyor belt, so we
+	 * don't need to worry about checking for duplicates.
+	 */
+	if ((oldest_pageno == next_pageno) ||
+		(oldest_pageno == deadtidstate->start_page))
+	{
+		deadtidstate->completed = true;
+		deadtidstate->num_offsets = 0;
+
+		return;
+	}
+
+	/* If the deadtid cache is not yet initialized then do it now. */
+	if (deadtidstate->num_offsets == -1)
+	{
+		Size	cachesize;
+
+		/* Allocate space for dead offset cache. */
+		deadtidstate->offsets =
+		(OffsetNumber *) palloc0(MaxHeapTuplesPerPage * sizeof(OffsetNumber));
+		deadtidstate->cansetunused =
+						(bool *) palloc0(MaxHeapTuplesPerPage * sizeof(bool));
+
+		/*
+		 * Compute the number of vacuum runs available in the conveyor belt and
+		 * also get the start conveyor belt pageno for each run.
+		 */
+		deadtidstate->deadtidrun = palloc0(sizeof(DTS_RunState));
+		cachesize = Min((Size) maintenance_work_mem * 1024, MaxAllocSize);
+		cachesize = Min(cachesize, (next_pageno - oldest_pageno) * BLCKSZ);
+		deadtidstate->deadtidrun->cachesize = cachesize;
+		dts_process_runs(deadtidstate);
+	}
+
+	/*
+	 * Read data from different vacuum run cache and merge into a sorted offset
+	 * array for given 'blkno'.
+	 */
+	dts_merge_runs(deadtidstate, blkno);
+}
+
+/*
+ * DTS_DeadtidExists - Check whether the tid already exists.
+ *
+ * Returns true if the offset for the given block is already present in the
+ * conveyor belt.  Also sets '*setunused' to true if the index vacuum has
+ * already been done for this item, which means the caller can directly mark
+ * it as unused.
+ */
+bool
+DTS_DeadtidExists(DTS_DeadTidState *deadtidstate, BlockNumber blkno,
+				  OffsetNumber offset, bool *setunused)
+{
+	OffsetNumber	*res;
+
+	if (deadtidstate == NULL)
+		return false;
+
+	/* No data to be compared for this block then just return. */
+	if (deadtidstate->num_offsets == 0)
+		return false;
+
+	/* Search the offset in the cache. */
+	res = (OffsetNumber *) bsearch((void *) &offset,
+									(void *) deadtidstate->offsets,
+									deadtidstate->num_offsets,
+									sizeof(OffsetNumber),
+									dts_offsetcmp);
+	if (res == NULL)
+		return false;
+
+	/* Get the flag whether this can be marked unused. */
+	*setunused = deadtidstate->cansetunused[res - deadtidstate->offsets];
+
+	return true;
+}
+
+/*
+ * DTS_ReleaseDeadTidState - Release the dead tid state.
+ *
+ * Release memory for the deadtid state and its members.  This also marks the
+ * current vacuum run as complete by storing the first page of the next run
+ * (last page of this run + 1) in the special space of the run's first page.
+ */
+void
+DTS_ReleaseDeadTidState(DTS_DeadTidState *deadtidstate)
+{
+	DTS_PageData *deadtidpd;
+	Buffer	buffer;
+	Page	page;
+
+	/* There is no data in this run so nothing to be done. */
+	if (deadtidstate->start_page == CB_INVALID_LOGICAL_PAGE)
+		return;
+
+	/* Read page from the conveyor belt. */
+	buffer = ConveyorBeltReadBuffer(deadtidstate->cb, deadtidstate->start_page,
+									BUFFER_LOCK_EXCLUSIVE, NULL);
+	if (BufferIsInvalid(buffer))
+		elog(ERROR, "invalid conveyor belt page " UINT64_FORMAT,
+			 deadtidstate->start_page);
+
+	page = BufferGetPage(buffer);
+	deadtidpd = (DTS_PageData *) PageGetSpecialPointer(page);
+	deadtidpd->nextrunpage = deadtidstate->last_page + 1;
+
+	/*
+	 * If min_idxvac_page is valid then we must have marked the offsets unused
+	 * up to this page along with pruning, so we can vacuum the conveyor belt
+	 * as well.
+	 */
+	if (deadtidstate->min_idxvac_page != CB_INVALID_LOGICAL_PAGE)
+		DTS_Vacuum(deadtidstate, deadtidstate->min_idxvac_page);
+
+	/* TODO - mark the buffer dirty and WAL log this operation. */
+	elog(LOG, "run complete setting next run page = " UINT64_FORMAT,
+		 deadtidpd->nextrunpage);
+
+	UnlockReleaseBuffer(buffer);
+
+	/* Cleanup and release the deadtid state. */
+	if (deadtidstate->deadtidrun != NULL)
+	{
+		DTS_RunState   *deadtidrun = deadtidstate->deadtidrun;
+
+		if (deadtidrun->cache != NULL)
+			pfree(deadtidrun->cache);
+		if (deadtidrun->startpage != NULL)
+			pfree(deadtidrun->startpage);
+		if (deadtidrun->nextpage != NULL)
+			pfree(deadtidrun->nextpage);
+		if (deadtidrun->runoffset != NULL)
+			pfree(deadtidrun->runoffset);
+
+		pfree(deadtidrun);
+	}
+	if (deadtidstate->offsets)
+		pfree(deadtidstate->offsets);
+	if (deadtidstate->cansetunused)
+		pfree(deadtidstate->cansetunused);
+
+	pfree(deadtidstate);
+}
+
+/*
+ * DTS_Vacuum - truncate and vacuum the underlying conveyor belt.
+ */
+void
+DTS_Vacuum(DTS_DeadTidState *deadtidstate, CBPageNo pageno)
+{
+	/* Truncate the conveyor belt pages which we have already processed. */
+	ConveyorBeltLogicalTruncate(deadtidstate->cb, pageno);
+
+	/* Vacuum the conveyor belt. */
+	ConveyorBeltVacuum(deadtidstate->cb);
+}
+
+/*
+ * dts_page_init - Initialize a deadtid page.
+ */
+static void
+dts_page_init(Page page)
+{
+	PageHeader	p = (PageHeader) page;
+	DTS_PageData	   *deadtidpd;
+
+	PageInit(page, BLCKSZ, sizeof(DTS_PageData));
+	deadtidpd = (DTS_PageData *) PageGetSpecialPointer(page);
+	deadtidpd->nextrunpage = CB_INVALID_LOGICAL_PAGE;
+
+	/* Start writing data from the maxaligned offset. */
+	p->pd_lower = MAXALIGN(SizeOfPageHeaderData);
+}
+
+/*
+ * dts_read_pagedata - Read page data and convert to deadtids
+ *
+ * In the conveyor belt, data is stored in the form [DTS_BlockHeader]
+ * [offset array], so here we process the page and convert it to tid format.
+ */
+static int
+dts_read_pagedata(Page page, ItemPointerData *deadtids, int maxtids)
+{
+	char	   *pagedata;
+	int			ntids = 0;
+	int			nbyteread = 0;
+	DTS_BlockHeader *blkhdr = NULL;
+
+	pagedata = PageGetContents(page);
+
+	/* Loop until we have converted the complete page. */
+	while (1)
+	{
+		ItemPointerData tmp;
+		OffsetNumber   *offsets;
+		BlockNumber		blkno;
+		int		i;
+
+		/*
+		 * While copying the data into the page we had ensured that we will
+		 * copy the block data to the page iff we can copy block header and
+		 * at least one offset, so based on the current nbyteread if the
+		 * remaining space of the page is not enough for that data then we can
+		 * stop.
+		 */
+		if (DTS_PageMaxDataSpace - nbyteread < DTS_MinCopySize)
+			break;
+
+		/*
+		 * Read the next block header; if the block header is not initialized,
+		 * i.e. noffsets is 0, then we are done with this page.
+		 */
+		blkhdr = (DTS_BlockHeader *) (pagedata + nbyteread);
+		blkno = BlockIdGetBlockNumber(&blkhdr->blkid);
+		if (blkhdr->noffsets == 0)
+			break;
+
+		/*
+		 * Skip the block header so that we reach to the offset array for this
+		 * block number.
+		 */
+		nbyteread += sizeof(DTS_BlockHeader);
+		offsets = (OffsetNumber *) (pagedata + nbyteread);
+
+		/*
+		 * Use the same block number as in the block header and generate tid
+		 * w.r.t. each offset in the offset array.
+		 */
+		ItemPointerSetBlockNumber(&tmp, blkno);
+		for (i = 0; i < blkhdr->noffsets; i++)
+		{
+			ItemPointerSetOffsetNumber(&tmp, offsets[i]);
+			deadtids[ntids++] = tmp;
+		}
+
+		/*
+		 * Skip all the offset w.r.t. the current block so that we point to the
+		 * next block header in the page.
+		 */
+		nbyteread += (blkhdr->noffsets * sizeof(OffsetNumber));
+
+		/*
+		 * The caller has ensured that all the tids in this page should fit
+		 * into the buffer so it should never exceed that.
+		 */
+		Assert(ntids <= maxtids);
+	}
+
+	return ntids;
+}
+
+/*
+ * dts_offsetcmp() - Compare two offset numbers, return -1, 0, or +1.
+ */
+static int32
+dts_offsetcmp(const void *a, const void *b)
+{
+	OffsetNumber offset1 = *((const OffsetNumber *) a);
+	OffsetNumber offset2 = *((const OffsetNumber *) b);
+
+	if (offset1 < offset2)
+		return -1;
+	else if (offset1 > offset2)
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * dts_load_run - load data from conveyor belt vacuum run into the run cache.
+ *
+ * Pull more data from the conveyor belt into the cache.  This will pull the
+ * data only for the input vacuum 'run'.
+ */
+static void
+dts_load_run(DTS_DeadTidState *deadtidstate, int run)
+{
+	DTS_RunState   *runstate = deadtidstate->deadtidrun;
+	ConveyorBelt   *cb = deadtidstate->cb;
+	CBPageNo		from_pageno;
+	CBPageNo		endpage;
+	CBPageNo		prevpage;
+	CBPageNo		oldest_pageno;
+	CBPageNo		next_pageno;
+	char		   *cache;
+	int				runsize = runstate->cachesize / runstate->num_runs;
+	int				nremaining = runsize;
+
+	/*
+	 * If the next conveyor belt page to be processed for this run is set to
+	 * CB_INVALID_LOGICAL_PAGE that means we have already pulled all the data
+	 * from this run.  So set the runoffset for this run to -1 so that next
+	 * time the merge will skip this run.
+	 */
+	prevpage = from_pageno = runstate->nextpage[run];
+	if (from_pageno == CB_INVALID_LOGICAL_PAGE)
+	{
+		runstate->runoffset[run] = -1;
+		return;
+	}
+
+	/* Refilling this run's cache with new data so reset the runoffset. */
+	runstate->runoffset[run] = 0;
+
+	ConveyorBeltGetBounds(deadtidstate->cb, &oldest_pageno, &next_pageno);
+
+	/*
+	 * Set the endpage so that while reading the data for this run we do not
+	 * overflow to the next run.
+	 */
+	if (run < runstate->num_runs - 1)
+		endpage = runstate->startpage[run + 1];
+	else
+		endpage = next_pageno;
+
+	/* Make cache point to the current run's space. */
+	cache = runstate->cache + runsize * run;
+	memset(cache, 0, runsize);
+
+	/* We must read at least one conveyor belt page. */
+	Assert(nremaining >= DTS_PageMaxDataSpace);
+
+	while (nremaining > 0 && from_pageno < endpage)
+	{
+		Buffer		buffer;
+		PageHeader	phdr;
+		Page		page;
+		int			copysz;
+
+		/* Read page from the conveyor belt. */
+		buffer = ConveyorBeltReadBuffer(cb, from_pageno, BUFFER_LOCK_SHARE,
+										NULL);
+		if (BufferIsInvalid(buffer))
+			break;
+
+		page = BufferGetPage(buffer);
+		phdr = (PageHeader) page;
+
+		/*
+		 * There might be some empty space at the end of the page so while
+		 * copying skip that size, that size will be equal to difference
+		 * between pd_upper and pd_lower.
+		 */
+		copysz = DTS_PageMaxDataSpace - (phdr->pd_upper - phdr->pd_lower);
+		memcpy(cache, PageGetContents(page), copysz);
+		nremaining -= copysz;
+		cache += copysz;
+
+		prevpage = from_pageno;
+
+		/* Go to the next page. */
+		from_pageno++;
+		UnlockReleaseBuffer(buffer);
+
+		/*
+		 * Don't stop reading in middle of the page, so if the remaining read
+		 * size is less than the size of the page then stop, we will read this
+		 * page in the next round.
+		 */
+		if (nremaining < DTS_PageMaxDataSpace)
+			break;
+	}
+
+	if (prevpage >= endpage)
+		runstate->nextpage[run] = CB_INVALID_LOGICAL_PAGE;
+	else if (from_pageno == prevpage)
+		runstate->nextpage[run] = prevpage;
+	else
+		runstate->nextpage[run] = prevpage + 1;
+}
+
+/*
+ * dts_merge_runs - process and merge vacuum runs from run cache.
+ *
+ * Pass through each vacuum run in the conveyor belt and load a few blocks from
+ * the start of each vacuum run based on the cache size.
+ *
+ * On return, all the offsets for the given 'targetblk' are stored in the
+ * 'deadtidstate->offsets' array in sorted order.  This also sets the
+ * 'deadtidstate->cansetunused' flag for each offset, indicating whether the
+ * index vacuum is already done for it or not.
+ */
+static void
+dts_merge_runs(DTS_DeadTidState *deadtidstate, BlockNumber targetblk)
+{
+	DTS_RunState	   *runstate = deadtidstate->deadtidrun;
+	DTS_BlockHeader	   *blkhdr;
+	OffsetNumber		smallestoffset = InvalidOffsetNumber;
+	CBPageNo			min_idxvac_page = deadtidstate->min_idxvac_page;
+	int		run;
+	int		smallestrun;
+	int		runsize;
+	int		noffsets = 0;
+	int	   *runindex;
+	int	   *runoffset = runstate->runoffset;
+	bool	nopendingdata = true;
+
+	/*
+	 * If we cannot pull at least one conveyor belt page from each run then
+	 * we cannot perform the in-memory merge, so we would have to perform
+	 * disk-based merging, which is not yet implemented.
+	 *
+	 * XXX maybe instead of pulling whole conveyor belt page we can consider
+	 * that at least if we can pull all the offset w.r.t. one block header then
+	 * also we can consider doing the in-memory sort merge.
+	 */
+	if (runstate->num_runs * DTS_PageMaxDataSpace > runstate->cachesize)
+		elog(ERROR, "disk based merging is not yet implemented");
+
+	/* Compute the cache size, in bytes, available to each run. */
+	runsize = runstate->cachesize / runstate->num_runs;
+
+	/*
+	 * Maintain a local index for each run, this tracks which offset currently
+	 * we are merging for each run within the current block header offset
+	 * array, as soon as we go to the next block header we will reset it.
+	 */
+	runindex = palloc0(sizeof(int) * runstate->num_runs);
+
+	/*
+	 * Loop until we have fetched all the dead offsets from the conveyor belt
+	 * for the target block.
+	 */
+	while (1)
+	{
+		for (run = 0; run < runstate->num_runs; run++)
+		{
+			DTS_BlockHeader *blkhdr;
+			OffsetNumber   *nextoffset;
+			BlockNumber		blkno;
+
+			if (runoffset[run] == -1)
+				continue;
+
+			nopendingdata = false;
+			blkhdr = DTS_GetRunBlkHdr(runstate, run, runsize);
+			blkno = BlockIdGetBlockNumber(&blkhdr->blkid);
+
+			/*
+			 * Make sure that before we start merging offsets from this run,
+			 * the run is pointing to the right block header.  If the block
+			 * number of the block header is already higher than targetblk,
+			 * this run has no more data for this block, so don't process
+			 * this run further.  If the block header is for a smaller block
+			 * than the one we are looking for, or it is uninitialized, then
+			 * advance until we reach the right block header.
+			 */
+			while (blkno < targetblk || (blkno == 0 && blkhdr->noffsets == 0))
+			{
+				/*
+				 * There is no more data in the current run so nothing to do
+				 * for the current run.
+				 */
+				if (runoffset[run] == -1)
+					break;
+
+				/*
+				 * If the current block header shows 0 offsets, we have
+				 * reached the end of the cached data for this run, so pull
+				 * more data for this run from the conveyor belt.  Otherwise,
+				 * move runoffset to point to the next block header.
+				 */
+				if (blkhdr->noffsets == 0)
+				{
+					dts_load_run(deadtidstate, run);
+					runindex[run] = 0;
+				}
+				else
+					runoffset[run] +=
+										DTS_BlkDataSize(blkhdr->noffsets);
+
+				/* Fetch the current block header from the run. */
+				blkhdr = DTS_GetRunBlkHdr(runstate, run, runsize);
+				blkno = BlockIdGetBlockNumber(&blkhdr->blkid);
+			}
+
+			/* We have found a matching block header. */
+			if (blkno == targetblk)
+			{
+				/* Block header can not be with 0 offsets. */
+				Assert(blkhdr->noffsets != 0);
+
+				/* Skip block header to point to the first offset. */
+				nextoffset = (OffsetNumber *)
+								((char *) blkhdr + sizeof(DTS_BlockHeader));
+				nextoffset += runindex[run];
+
+				/*
+				 * Set the smallest offset and remember the run from which we
+				 * got the smallest offset.
+				 */
+				if (!(OffsetNumberIsValid(smallestoffset)) ||
+					*nextoffset < smallestoffset)
+				{
+					smallestoffset = *nextoffset;
+					smallestrun = run;
+				}
+			}
+		}
+
+		/*
+		 * If we could not fetch next offset that means we don't have any more
+		 * offset for the current block.  So we are done for now.
+		 */
+		if (!OffsetNumberIsValid(smallestoffset))
+			break;
+
+		/* Fetch block header from the current position of the run. */
+		blkhdr = DTS_GetRunBlkHdr(runstate, smallestrun, runsize);
+
+		/*
+		 * Go to the next offset of the run from which we have found the
+		 * smallest offset.
+		 */
+		runindex[smallestrun]++;
+
+		/*
+		 * If we have processed all the offset for this block header then move
+		 * to the next block header, because due to the page split it is
+		 * possible that we might get the block header for the same block
+		 * again.
+		 */
+		if (runindex[smallestrun] >= blkhdr->noffsets)
+		{
+			runoffset[smallestrun] += DTS_BlkDataSize(blkhdr->noffsets);
+			runindex[smallestrun] = 0;
+		}
+
+		/*
+		 * Store the offset and also set a flag that whether we have already
+		 * done index vacuum for this offset or not.
+		 */
+		deadtidstate->offsets[noffsets] = smallestoffset;
+		if (min_idxvac_page != CB_INVALID_LOGICAL_PAGE &&
+			runstate->startpage[smallestrun + 1] <= min_idxvac_page)
+			deadtidstate->cansetunused[noffsets] = true;
+		else
+			deadtidstate->cansetunused[noffsets] = false;
+
+		smallestoffset = InvalidOffsetNumber;
+		noffsets++;
+
+		/*
+		 * In deadtidstate->offsets cache we are collecting the offsets only
+		 * for the one block so it can never be > MaxHeapTuplesPerPage.
+		 */
+		Assert(noffsets <= MaxHeapTuplesPerPage);
+	}
+
+	/*
+	 * Store the number of offsets in the deadtidstate and also mark it
+	 * completed if we have processed all the data from the conveyor belt.
+	 */
+	deadtidstate->num_offsets = noffsets;
+	deadtidstate->completed = nopendingdata;
+
+	/* Cleanup. */
+	pfree(runindex);
+}
+
+/*
+ * dts_process_runs - Compute the number of vacuum runs in the conveyor belt.
+ *
+ * Pass over the vacuum runs in the conveyor belt and remember the start page
+ * for each vacuum run.  Later dts_load_run() will maintain cache over each
+ * run and pull the data from each run as and when needed.
+ */
+static void
+dts_process_runs(DTS_DeadTidState *deadtidstate)
+{
+	ConveyorBelt   *cb = deadtidstate->cb;
+	DTS_RunState   *deadtidrun = deadtidstate->deadtidrun;
+	CBPageNo		oldest_pageno;
+	CBPageNo		next_pageno;
+	CBPageNo		curpageno;
+	CBPageNo	   *next_run_page;
+	int				maxrun;
+	int				nRunCount = 0;
+	int				nelement;
+
+	/* Get conveyor belt page bounds. */
+	ConveyorBeltGetBounds(cb, &oldest_pageno, &next_pageno);
+	curpageno = oldest_pageno;
+
+	/*
+	 * Allocate next_run_page; this will hold the next page to load from each
+	 * run.  Initially start with DTS_NUM_VACUUM_RUN entries and, while
+	 * processing the runs, expand the memory if the number of runs exceeds
+	 * this value.
+	 */
+	maxrun = DTS_NUM_VACUUM_RUN;
+	next_run_page = palloc(maxrun * sizeof(CBPageNo));
+	deadtidrun->cache = palloc0(deadtidrun->cachesize);
+
+	/*
+	 * Read page from the conveyor belt.
+	 *
+	 * TODO: this conveyor belt reading logic is duplicated in multiple
+	 * functions with some minor differences so this could be unified.
+	 */
+	while (curpageno < next_pageno)
+	{
+		DTS_PageData   *deadtidpd;
+		Buffer			buffer;
+		Page			page;
+		CBPageNo		nextrunpage;
+
+		/* Read page from the conveyor belt. */
+		buffer = ConveyorBeltReadBuffer(cb, curpageno, BUFFER_LOCK_SHARE,
+										NULL);
+		if (BufferIsInvalid(buffer))
+			elog(ERROR, "invalid conveyor belt page " UINT64_FORMAT, curpageno);
+
+		page = BufferGetPage(buffer);
+		deadtidpd = (DTS_PageData *) PageGetSpecialPointer(page);
+		nextrunpage = deadtidpd->nextrunpage;
+		UnlockReleaseBuffer(buffer);
+
+		/* If nextrun page is set then go to the next run. */
+		if (nextrunpage != CB_INVALID_LOGICAL_PAGE)
+		{
+			/* If current memory is not enough then resize it. */
+			if (nRunCount == maxrun - 1)
+			{
+				maxrun *= 2;
+				next_run_page = repalloc(next_run_page,
+										 maxrun * sizeof(CBPageNo));
+			}
+
+			/* Remember the start page of the current run. */
+			next_run_page[nRunCount++] = curpageno;
+			curpageno = nextrunpage;
+		}
+		else
+			break;
+	}
+
+	/*
+	 * If we are in this function we must have at least one previous vacuum
+	 * run; otherwise the caller should have exited earlier based on the
+	 * conveyor belt page bounds.
+	 */
+	Assert(nRunCount > 0);
+
+	/*
+	 * We have the start page for each run, and the end page can be computed
+	 * as the start page of the next run, so only for the last run do we need
+	 * to store one extra element, which will be treated as the end page for
+	 * the last run.
+	 */
+	nelement = nRunCount + 1;
+	next_run_page[nRunCount] = curpageno;
+	deadtidrun->num_runs = nRunCount;
+	deadtidrun->startpage = palloc(nelement * sizeof(CBPageNo));
+	deadtidrun->nextpage = palloc(nelement * sizeof(CBPageNo));
+	deadtidrun->runoffset = palloc0(nelement * sizeof(int));
+
+	/*
+	 * Set startpage and next page for each run.  Initially both will be same
+	 * but as we pull data from the conveyor belt the nextpage for each run
+	 * will move to the next page to be fetched from the conveyor belt whereas
+	 * the startpage will remain the same throughout complete vacuum pass.
+	 */
+	memcpy(deadtidrun->startpage, next_run_page, nelement * sizeof(CBPageNo));
+	memcpy(deadtidrun->nextpage, next_run_page, nelement * sizeof(CBPageNo));
+	pfree(next_run_page);
+}
+
+/*
+ * AssertCheckPageData - Verify whether the page data format is valid or not.
+ *
+ * Check whether all page data is written in the proper format, i.e.
+ * [Block Header][Offset Array], and all boundaries are sane.
+ *
+ * No-op if assertions are not in use.
+ */
+static void
+AssertCheckPageData(Page page)
+{
+#ifdef USE_ASSERT_CHECKING
+	PageHeader	phdr;
+	char   *pagedata;
+	int		nbyteread = 0;
+	DTS_BlockHeader *blkhdr;
+
+	phdr = (PageHeader) page;
+	pagedata = (char *) PageGetContents(page);
+
+	/* Loop until we have checked the complete page. */
+	while (1)
+	{
+		OffsetNumber   *offsets;
+		BlockNumber		blkno;
+		int		i;
+
+		/*
+		 * While copying the data into the page we ensured that we copy the
+		 * block data to the page iff we can copy the block header and at
+		 * least one offset, so if the remaining space of the page after
+		 * nbyteread is not enough for that much data then we can stop.
+		 */
+		if (DTS_PageMaxDataSpace - nbyteread < DTS_MinCopySize)
+			break;
+
+		/*
+		 * Read the next block header; if the block header is not initialized,
+		 * i.e. noffsets is 0, then we are done with this page.
+		 */
+		blkhdr = (DTS_BlockHeader *) (pagedata + nbyteread);
+		if (blkhdr->noffsets == 0)
+			break;
+
+		blkno = BlockIdGetBlockNumber(&blkhdr->blkid);
+
+		/*
+		 * Skip the block header so that we reach to the offset array for this
+		 * block number.
+		 */
+		nbyteread += sizeof(DTS_BlockHeader);
+		offsets = (OffsetNumber *) (pagedata + nbyteread);
+
+		/*
+		 * Verify that the block number in the block header and every offset
+		 * in the offset array are valid.
+		 */
+		Assert(BlockNumberIsValid(blkno));
+		for (i = 0; i < blkhdr->noffsets; i++)
+			Assert(OffsetNumberIsValid(offsets[i]));
+
+		/*
+		 * Skip all the offset w.r.t. the current block so that we point to the
+		 * next block header in the page.
+		 */
+		nbyteread += (blkhdr->noffsets * sizeof(OffsetNumber));
+		Assert(nbyteread <= phdr->pd_lower);
+	}
+#endif
+}
diff --git a/src/include/access/deadtidstore.h b/src/include/access/deadtidstore.h
new file mode 100644
index 0000000..688c6ca
--- /dev/null
+++ b/src/include/access/deadtidstore.h
@@ -0,0 +1,76 @@
+/*-------------------------------------------------------------------------
+ *
+ * deadtidstore.h
+ *		Public API for deadtidstore.
+ *
+ *
+ * Copyright (c) 2016-2021, PostgreSQL Global Development Group
+ *
+ * src/include/access/deadtidstore.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DEADTIDSTORE_H
+#define DEADTIDSTORE_H
+
+#include "access/conveyor.h"
+#include "storage/itemptr.h"
+#include "utils/relcache.h"
+
+/*
+ * In the conveyor belt, instead of saving complete dead tids we store a
+ * compressed format, i.e. [blkno, noffsets] [off-1][off-2]..[off-n], and
+ * whenever we perform the index vacuum we convert it back to a dead tid
+ * array.  So instead of converting it right after pruning each page, as we
+ * do now, we delay the conversion until we perform the index vacuum.
+ * Instead of BlockNumber, we use BlockIdData to avoid alignment
+ * padding.
+ */
+typedef struct DTS_BlockHeader
+{
+	BlockIdData		blkid;
+	uint16			noffsets;
+} DTS_BlockHeader;
+
+/*
+ * In the special space of the page we store the first pageno of the next
+ * vacuum run.  For more details refer to the comment atop DTS_RunState.
+ */
+typedef struct DTS_PageData
+{
+	CBPageNo	nextrunpage;
+} DTS_PageData;
+
+typedef struct DTS_DeadTidState DTS_DeadTidState;
+
+/* Usable space of the page. */
+#define DTS_PageMaxDataSpace \
+		(BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - sizeof(DTS_PageData))
+
+/* Compute required space for storing (block header + noffset offsets). */
+#define DTS_BlkDataSize(noffset) \
+		(sizeof(DTS_BlockHeader) + sizeof(OffsetNumber) * (noffset))
+
+/*
+ * We will copy the block data to the page iff the remaining space can fit
+ * the block header and at least one offset.
+ */
+#define DTS_MinCopySize	(sizeof(DTS_BlockHeader) + sizeof(OffsetNumber))
+
+extern DTS_DeadTidState *DTS_InitDeadTidState(Relation rel,
+											  CBPageNo min_idxvac_page);
+extern void DTS_InsertDeadtids(DTS_DeadTidState *deadtidstate, char *data,
+							   int *pdatasizes, int npages);
+extern int DTS_ReadDeadtids(DTS_DeadTidState *deadtidstate,
+							CBPageNo from_pageno, CBPageNo to_pageno,
+							int maxtids, ItemPointerData *deadtids,
+							CBPageNo *last_pageread, CBPageNo *next_runpage);
+extern void DTS_LoadDeadtids(DTS_DeadTidState *deadtidstate,
+							 BlockNumber blkno);
+extern bool DTS_DeadtidExists(DTS_DeadTidState *deadtidstate,
+							  BlockNumber blkno, OffsetNumber offset,
+							  bool *setunused);
+extern void DTS_ReleaseDeadTidState(DTS_DeadTidState *deadtidstate);
+extern void DTS_Vacuum(DTS_DeadTidState *deadtidstate, CBPageNo pageno);
+
+#endif							/* DEADTIDSTORE_H */
-- 
1.8.3.1
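
For readers who want to experiment with the on-page layout outside the
backend, here is a small standalone sketch (not part of the patch) of the
compressed format described in deadtidstore.h, i.e. [blkno, noffsets]
followed by the offset array, and of expanding it back into tids the way
dts_read_pagedata() does.  The Demo* types and demo_* functions are
hypothetical stand-ins for BlockIdData, DTS_BlockHeader and
ItemPointerData; it assumes only that a group with noffsets == 0 marks the
end of the data.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for DTS_BlockHeader: three uint16s, no padding. */
typedef struct DemoBlockHeader
{
	uint16_t	blk_hi;			/* high 16 bits of the heap block number */
	uint16_t	blk_lo;			/* low 16 bits of the heap block number */
	uint16_t	noffsets;		/* number of offsets following the header */
} DemoBlockHeader;

/* Hypothetical stand-in for ItemPointerData. */
typedef struct DemoTid
{
	uint32_t	blkno;
	uint16_t	offset;
} DemoTid;

/* Append one [header][offset array] group to buf; returns bytes written. */
static size_t
demo_encode_block(unsigned char *buf, uint32_t blkno,
				  const uint16_t *offsets, uint16_t noffsets)
{
	DemoBlockHeader hdr;

	hdr.blk_hi = (uint16_t) (blkno >> 16);
	hdr.blk_lo = (uint16_t) (blkno & 0xFFFF);
	hdr.noffsets = noffsets;
	memcpy(buf, &hdr, sizeof(hdr));
	memcpy(buf + sizeof(hdr), offsets, noffsets * sizeof(uint16_t));

	return sizeof(hdr) + noffsets * sizeof(uint16_t);
}

/*
 * Expand the groups stored in buf[0..len) into tids.  Stops at the first
 * header with noffsets == 0, or when the remaining space cannot hold a
 * header plus one offset.  Returns the number of tids produced.
 */
static size_t
demo_decode_page(const unsigned char *buf, size_t len,
				 DemoTid *tids, size_t maxtids)
{
	size_t		nread = 0;
	size_t		ntids = 0;

	while (len - nread >= sizeof(DemoBlockHeader) + sizeof(uint16_t))
	{
		DemoBlockHeader hdr;
		uint16_t	i;

		memcpy(&hdr, buf + nread, sizeof(hdr));
		if (hdr.noffsets == 0)
			break;
		nread += sizeof(hdr);

		/* Refuse to read past the buffer or overflow the output array. */
		if (hdr.noffsets * sizeof(uint16_t) > len - nread ||
			hdr.noffsets > maxtids - ntids)
			break;

		for (i = 0; i < hdr.noffsets; i++)
		{
			uint16_t	off;

			memcpy(&off, buf + nread + i * sizeof(uint16_t), sizeof(off));
			tids[ntids].blkno = ((uint32_t) hdr.blk_hi << 16) | hdr.blk_lo;
			tids[ntids].offset = off;
			ntids++;
		}
		nread += hdr.noffsets * sizeof(uint16_t);
	}

	return ntids;
}
```

To try it, zero a buffer, call demo_encode_block() once per heap block, and
pass the whole buffer to demo_decode_page(); the zeroed tail acts as the
terminating header.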

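The merge step in dts_merge_runs() is essentially a k-way merge of the
per-run sorted offset arrays for one heap block.  Below is a simplified
standalone sketch of that idea only; it is an illustration under stated
assumptions, not the patch's code.  DemoRun, demo_merge_runs and the
index_vacuumed flag are hypothetical names; the flag stands in for the
"run lies entirely before min_idxvac_page" check, and the refilling of run
caches from the conveyor belt is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_MAX_RUNS 8

/* Hypothetical view of one vacuum run's offsets for the target block. */
typedef struct DemoRun
{
	const uint16_t *offsets;	/* sorted offsets of the target heap block */
	size_t		noffsets;
	bool		index_vacuumed; /* assumed: index vacuum covered this run */
} DemoRun;

/*
 * Merge the sorted offsets of all runs into 'out' in ascending order and
 * record, per offset, whether the item could be set unused directly.
 * Returns the number of offsets written (at most maxout).
 */
static size_t
demo_merge_runs(const DemoRun *runs, size_t nruns,
				uint16_t *out, bool *cansetunused, size_t maxout)
{
	size_t		pos[DEMO_MAX_RUNS] = {0};	/* next unread index per run */
	size_t		nout = 0;

	if (nruns > DEMO_MAX_RUNS)
		return 0;

	while (nout < maxout)
	{
		size_t		best = nruns;	/* nruns means "no candidate found" */
		size_t		r;

		/* Pick the run whose next offset is the smallest. */
		for (r = 0; r < nruns; r++)
		{
			if (pos[r] >= runs[r].noffsets)
				continue;
			if (best == nruns ||
				runs[r].offsets[pos[r]] < runs[best].offsets[pos[best]])
				best = r;
		}

		/* Every run is exhausted for this block. */
		if (best == nruns)
			break;

		out[nout] = runs[best].offsets[pos[best]];
		cansetunused[nout] = runs[best].index_vacuumed;
		nout++;
		pos[best]++;
	}

	return nout;
}
```

Because the output is sorted, a lookup such as the bsearch() in
DTS_DeadtidExists() can then be used on it directly.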