On Wed, Mar 13, 2024 at 8:39 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:

> As I mentioned above, if we implement the test cases in C, we can use
> the debug-build array in the test code. And we won't use it in AND/OR
> operations tests in the future.

That's a really interesting idea, so I went ahead and tried that for
v71. This seems like a good basis for testing larger, randomized
inputs, once we decide how best to hide that from the expected output.
The tests use SQL functions do_set_block_offsets() and
check_set_block_offsets(). The latter does two checks against a tid
array, and replaces test_dump_tids(). Funnily enough, the debug array
itself gave false failures when using a similar array in the test
harness, because it didn't know all the places where the array should
have been sorted -- it only worked by chance before because of what
order things were done.

I squashed everything from v70 and also took the liberty of switching
on shared memory for tid store tests. The only reason we didn't do
this with the radix tree tests is that the static attach/detach
functions would raise warnings since they are not used.
From 9b6b517b9f9977999034d16b4dc33e91094ae7ba Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 12 Dec 2023 22:36:24 +0900
Subject: [PATCH v71 2/6] DEV: Debug TidStore.

---
 src/backend/access/common/tidstore.c | 203 ++++++++++++++++++++++++++-
 1 file changed, 201 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/common/tidstore.c b/src/backend/access/common/tidstore.c
index b725b62d4c..33753d8ed2 100644
--- a/src/backend/access/common/tidstore.c
+++ b/src/backend/access/common/tidstore.c
@@ -29,6 +29,11 @@
 #include "utils/dsa.h"
 #include "utils/memutils.h"
 
+/* Enable TidStore debugging if USE_ASSERT_CHECKING */
+#ifdef USE_ASSERT_CHECKING
+#define TIDSTORE_DEBUG
+#include "catalog/index.h"
+#endif
 
 #define WORDNUM(x)	((x) / BITS_PER_BITMAPWORD)
 #define BITNUM(x)	((x) % BITS_PER_BITMAPWORD)
@@ -88,6 +93,13 @@ struct TidStore
 
 	/* DSA area for TidStore if using shared memory */
 	dsa_area   *area;
+
+#ifdef TIDSTORE_DEBUG
+	ItemPointerData	*tids;
+	int64		max_tids;
+	int64		num_tids;
+	bool		tids_unordered;
+#endif
 };
 #define TidStoreIsShared(ts) ((ts)->area != NULL)
 
@@ -105,11 +117,25 @@ struct TidStoreIter
 
 	/* output for the caller */
 	TidStoreIterResult output;
+
+#ifdef TIDSTORE_DEBUG
+	/* iterator index for the ts->tids array */
+	int64		tids_idx;
+#endif
 };
 
 static void tidstore_iter_extract_tids(TidStoreIter *iter, uint64 key,
 									   BlocktableEntry *page);
 
+/* debug functions available only when TIDSTORE_DEBUG */
+#ifdef TIDSTORE_DEBUG
+static void ts_debug_set_block_offsets(TidStore *ts, BlockNumber blkno,
+									   OffsetNumber *offsets, int num_offsets);
+static void ts_debug_iter_check_tids(TidStoreIter *iter);
+static bool ts_debug_is_member(TidStore *ts, ItemPointer tid);
+static int itemptr_cmp(const void *left, const void *right);
+#endif
+
 /*
  * Create a TidStore. The TidStore will live in the memory context that is
  * CurrentMemoryContext at the time of this call. The TID storage, backed
@@ -154,6 +180,17 @@ TidStoreCreate(size_t max_bytes, dsa_area *area)
 	else
 		ts->tree.local = local_rt_create(ts->rt_context);
 
+#ifdef TIDSTORE_DEBUG
+	{
+		int64		max_tids = max_bytes / sizeof(ItemPointerData);
+
+		ts->tids = palloc(sizeof(ItemPointerData) * max_tids);
+		ts->max_tids = max_tids;
+		ts->num_tids = 0;
+		ts->tids_unordered = false;
+	}
+#endif
+
 	return ts;
 }
 
@@ -191,6 +228,7 @@ TidStoreDetach(TidStore *ts)
 	Assert(TidStoreIsShared(ts));
 
 	shared_rt_detach(ts->tree.shared);
+
 	pfree(ts);
 }
 
@@ -241,6 +279,11 @@ TidStoreDestroy(TidStore *ts)
 
 	MemoryContextDelete(ts->rt_context);
 
+#ifdef TIDSTORE_DEBUG
+	if (!TidStoreIsShared(ts))
+		pfree(ts->tids);
+#endif
+
 	pfree(ts);
 }
 
@@ -297,6 +340,14 @@ TidStoreSetBlockOffsets(TidStore *ts, BlockNumber blkno, OffsetNumber *offsets,
 		found = local_rt_set(ts->tree.local, blkno, page);
 
 	Assert(!found);
+
+#ifdef TIDSTORE_DEBUG
+	if (!TidStoreIsShared(ts))
+	{
+		/* Insert TIDs into the TID array too */
+		ts_debug_set_block_offsets(ts, blkno, offsets, num_offsets);
+	}
+#endif
 }
 
 /* Return true if the given TID is present in the TidStore */
@@ -310,6 +361,13 @@ TidStoreIsMember(TidStore *ts, ItemPointer tid)
 	OffsetNumber off = ItemPointerGetOffsetNumber(tid);
 	bool		ret;
 
+#ifdef TIDSTORE_DEBUG
+	bool ret_debug = false;
+
+	if (!TidStoreIsShared(ts))
+		ret_debug = ts_debug_is_member(ts, tid);
+#endif
+
 	if (TidStoreIsShared(ts))
 		page = shared_rt_find(ts->tree.shared, blk);
 	else
@@ -317,17 +375,29 @@ TidStoreIsMember(TidStore *ts, ItemPointer tid)
 
 	/* no entry for the blk */
 	if (page == NULL)
-		return false;
+	{
+		ret = false;
+		goto done;
+	}
 
 	wordnum = WORDNUM(off);
 	bitnum = BITNUM(off);
 
 	/* no bitmap for the off */
 	if (wordnum >= page->nwords)
-		return false;
+	{
+		ret = false;
+		goto done;
+	}
 
 	ret = (page->words[wordnum] & ((bitmapword) 1 << bitnum)) != 0;
 
+done:
+#ifdef TIDSTORE_DEBUG
+	if (!TidStoreIsShared(ts))
+		Assert(ret == ret_debug);
+#endif
+
 	return ret;
 }
 
@@ -360,6 +430,11 @@ TidStoreBeginIterate(TidStore *ts)
 	else
 		iter->tree_iter.local = local_rt_begin_iterate(ts->tree.local);
 
+#ifdef TIDSTORE_DEBUG
+	if (!TidStoreIsShared(ts))
+		iter->tids_idx = 0;
+#endif
+
 	return iter;
 }
 
@@ -387,6 +462,11 @@ TidStoreIterateNext(TidStoreIter *iter)
 	/* Collect TIDs extracted from the key-value pair */
 	tidstore_iter_extract_tids(iter, key, page);
 
+#ifdef TIDSTORE_DEBUG
+	if (!TidStoreIsShared(iter->ts))
+		ts_debug_iter_check_tids(iter);
+#endif
+
 	return result;
 }
 
@@ -459,3 +539,122 @@ tidstore_iter_extract_tids(TidStoreIter *iter, uint64 key, BlocktableEntry *page
 		}
 	}
 }
+
+#ifdef TIDSTORE_DEBUG
+/* Comparator routines for ItemPointer */
+static int
+itemptr_cmp(const void *left, const void *right)
+{
+	BlockNumber lblk,
+		rblk;
+	OffsetNumber loff,
+		roff;
+
+	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
+	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
+
+	if (lblk < rblk)
+		return -1;
+	if (lblk > rblk)
+		return 1;
+
+	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
+	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
+
+	if (loff < roff)
+		return -1;
+	if (loff > roff)
+		return 1;
+
+	return 0;
+}
+
+/* Insert TIDs to the TID array for debugging */
+static void
+ts_debug_set_block_offsets(TidStore *ts, BlockNumber blkno, OffsetNumber *offsets,
+						   int num_offsets)
+{
+	if (ts->num_tids > 0 &&
+		blkno < ItemPointerGetBlockNumber(&(ts->tids[ts->num_tids - 1])))
+	{
+		/* The array will be sorted at ts_debug_is_member() */
+		ts->tids_unordered = true;
+	}
+
+	for (int i = 0; i < num_offsets; i++)
+	{
+		ItemPointer tid;
+		int idx = ts->num_tids + i;
+
+		/* Enlarge the TID array if necessary */
+		if (idx >= ts->max_tids)
+		{
+			ts->max_tids *= 2;
+			ts->tids = repalloc(ts->tids, sizeof(ItemPointerData) * ts->max_tids);
+		}
+
+		tid = &(ts->tids[idx]);
+
+		ItemPointerSetBlockNumber(tid, blkno);
+		ItemPointerSetOffsetNumber(tid, offsets[i]);
+	}
+
+	ts->num_tids += num_offsets;
+}
+
+/* Return true if the given TID is present in the TID array */
+static bool
+ts_debug_is_member(TidStore *ts, ItemPointer tid)
+{
+	int64	litem,
+		ritem,
+		item;
+	ItemPointer res;
+
+	if (ts->num_tids == 0)
+		return false;
+
+	/* Make sure the TID array is sorted */
+	if (ts->tids_unordered)
+	{
+		qsort(ts->tids, ts->num_tids, sizeof(ItemPointerData), itemptr_cmp);
+		ts->tids_unordered = false;
+	}
+
+	litem = itemptr_encode(&ts->tids[0]);
+	ritem = itemptr_encode(&ts->tids[ts->num_tids - 1]);
+	item = itemptr_encode(tid);
+
+	/*
+	 * Doing a simple bound check before bsearch() is useful to avoid the
+	 * extra cost of bsearch(), especially if dead items on the heap are
+	 * concentrated in a certain range.	Since this function is called for
+	 * every index tuple, it pays to be really fast.
+	 */
+	if (item < litem || item > ritem)
+		return false;
+
+	res = bsearch(tid, ts->tids, ts->num_tids, sizeof(ItemPointerData),
+				  itemptr_cmp);
+
+	return (res != NULL);
+}
+
+/* Verify if the iterator output matches the TIDs in the array for debugging */
+static void
+ts_debug_iter_check_tids(TidStoreIter *iter)
+{
+	BlockNumber blkno = iter->output.blkno;
+
+	for (int i = 0; i < iter->output.num_offsets; i++)
+	{
+		ItemPointer tid = &(iter->ts->tids[iter->tids_idx + i]);
+
+		Assert((iter->tids_idx + i) < iter->ts->max_tids);
+		Assert(ItemPointerGetBlockNumber(tid) == blkno);
+		Assert(ItemPointerGetOffsetNumber(tid) == iter->output.offsets[i]);
+	}
+
+	iter->tids_idx += iter->output.num_offsets;
+}
+#endif
-- 
2.44.0

From 4ed9f578c2f8d20f4477c80703a085b3dd3bef01 Mon Sep 17 00:00:00 2001
From: John Naylor <john.naylor@postgresql.org>
Date: Wed, 13 Mar 2024 14:56:56 +0700
Subject: [PATCH v71 3/6] DEV: Fix failure to sort debug array for iteration

---
 src/backend/access/common/tidstore.c | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/backend/access/common/tidstore.c b/src/backend/access/common/tidstore.c
index 33753d8ed2..4f5882818b 100644
--- a/src/backend/access/common/tidstore.c
+++ b/src/backend/access/common/tidstore.c
@@ -433,6 +433,13 @@ TidStoreBeginIterate(TidStore *ts)
 #ifdef TIDSTORE_DEBUG
 	if (!TidStoreIsShared(ts))
 		iter->tids_idx = 0;
+
+	/* Make sure the TID array is sorted */
+	if (ts->tids_unordered)
+	{
+		qsort(ts->tids, ts->num_tids, sizeof(ItemPointerData), itemptr_cmp);
+		ts->tids_unordered = false;
+	}
 #endif
 
 	return iter;
-- 
2.44.0

From 753c38866b5185ce399f214e05e1869abb8d2dc2 Mon Sep 17 00:00:00 2001
From: John Naylor <john.naylor@postgresql.org>
Date: Wed, 13 Mar 2024 16:13:54 +0700
Subject: [PATCH v71 5/6] Use shared memory in TID store tests

---
 src/test/modules/test_tidstore/expected/test_tidstore.out | 5 +++--
 src/test/modules/test_tidstore/sql/test_tidstore.sql      | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/test/modules/test_tidstore/expected/test_tidstore.out b/src/test/modules/test_tidstore/expected/test_tidstore.out
index 4eab5d30ba..84ec4c3a64 100644
--- a/src/test/modules/test_tidstore/expected/test_tidstore.out
+++ b/src/test/modules/test_tidstore/expected/test_tidstore.out
@@ -23,8 +23,9 @@ SELECT t_ctid
     LATERAL test_lookup_tids(foo.tids)
   WHERE found ORDER BY t_ctid;
 END;
--- Test a local tdistore. A shared tidstore is created by passing true.
-SELECT test_create(false);
+-- Create a TID store in shared memory. We can't do that for the radix tree
+-- tests, because unused static functions would raise warnings there.
+SELECT test_create(true);
  test_create 
 -------------
  
diff --git a/src/test/modules/test_tidstore/sql/test_tidstore.sql b/src/test/modules/test_tidstore/sql/test_tidstore.sql
index 7317f52058..55535d4905 100644
--- a/src/test/modules/test_tidstore/sql/test_tidstore.sql
+++ b/src/test/modules/test_tidstore/sql/test_tidstore.sql
@@ -27,8 +27,9 @@ SELECT t_ctid
   WHERE found ORDER BY t_ctid;
 END;
 
--- Test a local tdistore. A shared tidstore is created by passing true.
-SELECT test_create(false);
+-- Create a TID store in shared memory. We can't do that for the radix tree
+-- tests, because unused static functions would raise warnings there.
+SELECT test_create(true);
 
 -- Test on empty tidstore.
 SELECT *
-- 
2.44.0

From d09dc6f4ed069aa038700cd19300cf4d3bf80fc0 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 1 Mar 2024 16:04:49 +0900
Subject: [PATCH v71 6/6] Use TidStore to store dead tuple TIDs during lazy
 vacuum.

Previously, we used VacDeadItems, a simple ItemPointerData array, to
store dead tuple TIDs during lazy vacuum. It was neither space
efficient nor performant for lookups.

With this change, lazy vacuum makes use of TidStore for dead tuple TID
storage. We use a new struct, VacDeadItemsInfo, to store additional
information such as the number of TIDs collected so far. TidStore and
VacDeadItemsInfo are shared among the parallel vacuum workers in the
parallel vacuum case.

As of now, there are no concurrent writes during parallel vacuum, so
we don't need to take locks.

As for progress reporting, the number of tuples no longer provides any
meaningful insight for users, so we switch to byte-based progress
reporting. The columns of pg_stat_progress_vacuum are renamed
accordingly: max_dead_tuple_bytes and dead_tuple_bytes.

XXX: bump catalog version.
XXX: update typedef.list.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 doc/src/sgml/monitoring.sgml                  |   8 +-
 src/backend/access/heap/vacuumlazy.c          | 272 ++++++++----------
 src/backend/catalog/system_views.sql          |   2 +-
 src/backend/commands/vacuum.c                 |  79 +----
 src/backend/commands/vacuumparallel.c         | 100 +++++--
 src/backend/storage/lmgr/lwlock.c             |   1 +
 .../utils/activity/wait_event_names.txt       |   2 +-
 src/include/commands/progress.h               |   4 +-
 src/include/commands/vacuum.h                 |  28 +-
 src/include/storage/lwlock.h                  |   1 +
 src/test/regress/expected/rules.out           |   4 +-
 src/tools/pgindent/typedefs.list              |   2 +-
 12 files changed, 228 insertions(+), 275 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index c8d76906aa..875c92d9f3 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6240,10 +6240,10 @@ FROM pg_stat_get_backend_idset() AS backendid;
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>max_dead_tuples</structfield> <type>bigint</type>
+       <structfield>max_dead_tuple_bytes</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of dead tuples that we can store before needing to perform
+       Amount of dead tuple data that we can store before needing to perform
        an index vacuum cycle, based on
        <xref linkend="guc-maintenance-work-mem"/>.
       </para></entry>
@@ -6251,10 +6251,10 @@ FROM pg_stat_get_backend_idset() AS backendid;
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>num_dead_tuples</structfield> <type>bigint</type>
+       <structfield>dead_tuple_bytes</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of dead tuples collected since the last index vacuum cycle.
+       Amount of dead tuple data collected since the last index vacuum cycle.
       </para></entry>
      </row>
 
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 1800490775..b157664f2e 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -3,18 +3,17 @@
  * vacuumlazy.c
  *	  Concurrent ("lazy") vacuuming.
  *
- * The major space usage for vacuuming is storage for the array of dead TIDs
+ * The major space usage for vacuuming is TidStore, a storage for dead TIDs
  * that are to be removed from indexes.  We want to ensure we can vacuum even
  * the very largest relations with finite memory space usage.  To do that, we
- * set upper bounds on the number of TIDs we can keep track of at once.
+ * set upper bounds on the maximum memory that can be used for keeping track
+ * of dead TIDs at once.
  *
  * We are willing to use at most maintenance_work_mem (or perhaps
- * autovacuum_work_mem) memory space to keep track of dead TIDs.  We initially
- * allocate an array of TIDs of that size, with an upper limit that depends on
- * table size (this limit ensures we don't allocate a huge area uselessly for
- * vacuuming small tables).  If the array threatens to overflow, we must call
- * lazy_vacuum to vacuum indexes (and to vacuum the pages that we've pruned).
- * This frees up the memory space dedicated to storing dead TIDs.
+ * autovacuum_work_mem) memory space to keep track of dead TIDs.  If the
+ * TidStore is full, we must call lazy_vacuum to vacuum indexes (and to vacuum
+ * the pages that we've pruned). This frees up the memory space dedicated to
+ * store dead TIDs.
  *
  * In practice VACUUM will often complete its initial pass over the target
  * heap relation without ever running out of space to store TIDs.  This means
@@ -39,6 +38,7 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/tidstore.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
 #include "access/xloginsert.h"
@@ -179,8 +179,13 @@ typedef struct LVRelState
 	 * that has been processed by lazy_scan_prune.  Also needed by
 	 * lazy_vacuum_heap_rel, which marks the same LP_DEAD line pointers as
 	 * LP_UNUSED during second heap pass.
+	 *
+	 * Both dead_items and di_info are allocated in shared memory in parallel
+	 * vacuum cases.
 	 */
-	VacDeadItems *dead_items;	/* TIDs whose index tuples we'll delete */
+	TidStore *dead_items;	/* TIDs whose index tuples we'll delete */
+	VacDeadItemsInfo	*di_info;
+
 	BlockNumber rel_pages;		/* total number of pages */
 	BlockNumber scanned_pages;	/* # pages examined (not skipped via VM) */
 	BlockNumber removed_pages;	/* # pages removed by relation truncation */
@@ -239,8 +244,9 @@ static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf,
 static void lazy_vacuum(LVRelState *vacrel);
 static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
 static void lazy_vacuum_heap_rel(LVRelState *vacrel);
-static int	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
-								  Buffer buffer, int index, Buffer vmbuffer);
+static void	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
+								  Buffer buffer, OffsetNumber *offsets,
+								  int num_offsets, Buffer vmbuffer);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
@@ -257,6 +263,9 @@ static void lazy_truncate_heap(LVRelState *vacrel);
 static BlockNumber count_nondeletable_pages(LVRelState *vacrel,
 											bool *lock_waiter_detected);
 static void dead_items_alloc(LVRelState *vacrel, int nworkers);
+static void dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
+						   int num_offsets);
+static void dead_items_reset(LVRelState *vacrel);
 static void dead_items_cleanup(LVRelState *vacrel);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
@@ -472,11 +481,11 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	}
 
 	/*
-	 * Allocate dead_items array memory using dead_items_alloc.  This handles
-	 * parallel VACUUM initialization as part of allocating shared memory
-	 * space used for dead_items.  (But do a failsafe precheck first, to
-	 * ensure that parallel VACUUM won't be attempted at all when relfrozenxid
-	 * is already dangerously old.)
+	 * Allocate dead_items memory using dead_items_alloc.  This handles parallel
+	 * VACUUM initialization as part of allocating shared memory space used for
+	 * dead_items.  (But do a failsafe precheck first, to ensure that parallel
+	 * VACUUM won't be attempted at all when relfrozenxid is already dangerously
+	 * old.)
 	 */
 	lazy_check_wraparound_failsafe(vacrel);
 	dead_items_alloc(vacrel, params->nworkers);
@@ -782,7 +791,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
  *		have collected the TIDs whose index tuples need to be removed.
  *
  *		Finally, invokes lazy_vacuum_heap_rel to vacuum heap pages, which
- *		largely consists of marking LP_DEAD items (from collected TID array)
+ *		largely consists of marking LP_DEAD items (from vacrel->dead_items)
  *		as LP_UNUSED.  This has to happen in a second, final pass over the
  *		heap, to preserve a basic invariant that all index AMs rely on: no
  *		extant index tuple can ever be allowed to contain a TID that points to
@@ -811,19 +820,20 @@ lazy_scan_heap(LVRelState *vacrel)
 				next_fsm_block_to_vacuum = 0;
 	bool		all_visible_according_to_vm;
 
-	VacDeadItems *dead_items = vacrel->dead_items;
+	TidStore *dead_items = vacrel->dead_items;
+	VacDeadItemsInfo *di_info = vacrel->di_info;
 	Buffer		vmbuffer = InvalidBuffer;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
-		PROGRESS_VACUUM_MAX_DEAD_TUPLES
+		PROGRESS_VACUUM_MAX_DEAD_TUPLE_BYTES
 	};
 	int64		initprog_val[3];
 
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
 	initprog_val[1] = rel_pages;
-	initprog_val[2] = dead_items->max_items;
+	initprog_val[2] = di_info->max_bytes;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
 	/* Initialize for the first heap_vac_scan_next_block() call */
@@ -866,8 +876,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * dead_items TIDs, pause and do a cycle of vacuuming before we tackle
 		 * this page.
 		 */
-		Assert(dead_items->max_items >= MaxHeapTuplesPerPage);
-		if (dead_items->max_items - dead_items->num_items < MaxHeapTuplesPerPage)
+		if (TidStoreMemoryUsage(dead_items) > di_info->max_bytes)
 		{
 			/*
 			 * Before beginning index vacuuming, we release any pin we may
@@ -930,11 +939,11 @@ lazy_scan_heap(LVRelState *vacrel)
 
 		/*
 		 * If we didn't get the cleanup lock, we can still collect LP_DEAD
-		 * items in the dead_items array for later vacuuming, count live and
-		 * recently dead tuples for vacuum logging, and determine if this
-		 * block could later be truncated. If we encounter any xid/mxids that
-		 * require advancing the relfrozenxid/relminxid, we'll have to wait
-		 * for a cleanup lock and call lazy_scan_prune().
+		 * items in the dead_items for later vacuuming, count live and recently
+		 * dead tuples for vacuum logging, and determine if this block could
+		 * later be truncated. If we encounter any xid/mxids that require
+		 * advancing the relfrozenxid/relminxid, we'll have to wait for a
+		 * cleanup lock and call lazy_scan_prune().
 		 */
 		if (!got_cleanup_lock &&
 			!lazy_scan_noprune(vacrel, buf, blkno, page, &has_lpdead_items))
@@ -958,9 +967,9 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * Like lazy_scan_noprune(), lazy_scan_prune() will count
 		 * recently_dead_tuples and live tuples for vacuum logging, determine
 		 * if the block can later be truncated, and accumulate the details of
-		 * remaining LP_DEAD line pointers on the page in the dead_items
-		 * array. These dead items include those pruned by lazy_scan_prune()
-		 * as well we line pointers previously marked LP_DEAD.
+		 * remaining LP_DEAD line pointers on the page in the dead_items.
+		 * These dead items include those pruned by lazy_scan_prune() as well
+		 * as line pointers previously marked LP_DEAD.
 		 */
 		if (got_cleanup_lock)
 			lazy_scan_prune(vacrel, buf, blkno, page,
@@ -1037,7 +1046,7 @@ lazy_scan_heap(LVRelState *vacrel)
 	 * Do index vacuuming (call each index's ambulkdelete routine), then do
 	 * related heap vacuuming
 	 */
-	if (dead_items->num_items > 0)
+	if (di_info->num_items > 0)
 		lazy_vacuum(vacrel);
 
 	/*
@@ -1763,22 +1772,9 @@ lazy_scan_prune(LVRelState *vacrel,
 	 */
 	if (lpdead_items > 0)
 	{
-		VacDeadItems *dead_items = vacrel->dead_items;
-		ItemPointerData tmp;
-
 		vacrel->lpdead_item_pages++;
 
-		ItemPointerSetBlockNumber(&tmp, blkno);
-
-		for (int i = 0; i < lpdead_items; i++)
-		{
-			ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
-			dead_items->items[dead_items->num_items++] = tmp;
-		}
-
-		Assert(dead_items->num_items <= dead_items->max_items);
-		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-									 dead_items->num_items);
+		dead_items_add(vacrel, blkno, deadoffsets, lpdead_items);
 
 		/*
 		 * It was convenient to ignore LP_DEAD items in all_visible earlier on
@@ -1925,7 +1921,7 @@ lazy_scan_prune(LVRelState *vacrel,
  * lazy_scan_prune, which requires a full cleanup lock.  While pruning isn't
  * performed here, it's quite possible that an earlier opportunistic pruning
  * operation left LP_DEAD items behind.  We'll at least collect any such items
- * in the dead_items array for removal from indexes.
+ * in the dead_items for removal from indexes.
  *
  * For aggressive VACUUM callers, we may return false to indicate that a full
  * cleanup lock is required for processing by lazy_scan_prune.  This is only
@@ -2084,7 +2080,7 @@ lazy_scan_noprune(LVRelState *vacrel,
 	vacrel->NewRelfrozenXid = NoFreezePageRelfrozenXid;
 	vacrel->NewRelminMxid = NoFreezePageRelminMxid;
 
-	/* Save any LP_DEAD items found on the page in dead_items array */
+	/* Save any LP_DEAD items found on the page in dead_items */
 	if (vacrel->nindexes == 0)
 	{
 		/* Using one-pass strategy (since table has no indexes) */
@@ -2104,9 +2100,6 @@ lazy_scan_noprune(LVRelState *vacrel,
 	}
 	else if (lpdead_items > 0)
 	{
-		VacDeadItems *dead_items = vacrel->dead_items;
-		ItemPointerData tmp;
-
 		/*
 		 * Page has LP_DEAD items, and so any references/TIDs that remain in
 		 * indexes will be deleted during index vacuuming (and then marked
@@ -2114,17 +2107,7 @@ lazy_scan_noprune(LVRelState *vacrel,
 		 */
 		vacrel->lpdead_item_pages++;
 
-		ItemPointerSetBlockNumber(&tmp, blkno);
-
-		for (int i = 0; i < lpdead_items; i++)
-		{
-			ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
-			dead_items->items[dead_items->num_items++] = tmp;
-		}
-
-		Assert(dead_items->num_items <= dead_items->max_items);
-		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-									 dead_items->num_items);
+		dead_items_add(vacrel, blkno, deadoffsets, lpdead_items);
 
 		vacrel->lpdead_items += lpdead_items;
 	}
@@ -2174,7 +2157,7 @@ lazy_vacuum(LVRelState *vacrel)
 	if (!vacrel->do_index_vacuuming)
 	{
 		Assert(!vacrel->do_index_cleanup);
-		vacrel->dead_items->num_items = 0;
+		dead_items_reset(vacrel);
 		return;
 	}
 
@@ -2203,7 +2186,7 @@ lazy_vacuum(LVRelState *vacrel)
 		BlockNumber threshold;
 
 		Assert(vacrel->num_index_scans == 0);
-		Assert(vacrel->lpdead_items == vacrel->dead_items->num_items);
+		Assert(vacrel->lpdead_items == vacrel->di_info->num_items);
 		Assert(vacrel->do_index_vacuuming);
 		Assert(vacrel->do_index_cleanup);
 
@@ -2230,8 +2213,8 @@ lazy_vacuum(LVRelState *vacrel)
 		 * cases then this may need to be reconsidered.
 		 */
 		threshold = (double) vacrel->rel_pages * BYPASS_THRESHOLD_PAGES;
-		bypass = (vacrel->lpdead_item_pages < threshold &&
-				  vacrel->lpdead_items < MAXDEADITEMS(32L * 1024L * 1024L));
+		bypass = (vacrel->lpdead_item_pages < threshold) &&
+			TidStoreMemoryUsage(vacrel->dead_items) < (32L * 1024L * 1024L);
 	}
 
 	if (bypass)
@@ -2276,7 +2259,7 @@ lazy_vacuum(LVRelState *vacrel)
 	 * Forget the LP_DEAD items that we just vacuumed (or just decided to not
 	 * vacuum)
 	 */
-	vacrel->dead_items->num_items = 0;
+	dead_items_reset(vacrel);
 }
 
 /*
@@ -2368,7 +2351,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	 * place).
 	 */
 	Assert(vacrel->num_index_scans > 0 ||
-		   vacrel->dead_items->num_items == vacrel->lpdead_items);
+		   vacrel->di_info->num_items == vacrel->lpdead_items);
 	Assert(allindexes || VacuumFailsafeActive);
 
 	/*
@@ -2390,9 +2373,8 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 /*
  *	lazy_vacuum_heap_rel() -- second pass over the heap for two pass strategy
  *
- * This routine marks LP_DEAD items in vacrel->dead_items array as LP_UNUSED.
- * Pages that never had lazy_scan_prune record LP_DEAD items are not visited
- * at all.
+ * This routine marks LP_DEAD items in vacrel->dead_items as LP_UNUSED. Pages
+ * that never had lazy_scan_prune record LP_DEAD items are not visited at all.
  *
  * We may also be able to truncate the line pointer array of the heap pages we
  * visit.  If there is a contiguous group of LP_UNUSED items at the end of the
@@ -2408,10 +2390,11 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 static void
 lazy_vacuum_heap_rel(LVRelState *vacrel)
 {
-	int			index = 0;
 	BlockNumber vacuumed_pages = 0;
 	Buffer		vmbuffer = InvalidBuffer;
 	LVSavedErrInfo saved_err_info;
+	TidStoreIter *iter;
+	TidStoreIterResult *iter_result;
 
 	Assert(vacrel->do_index_vacuuming);
 	Assert(vacrel->do_index_cleanup);
@@ -2426,7 +2409,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 							 VACUUM_ERRCB_PHASE_VACUUM_HEAP,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
-	while (index < vacrel->dead_items->num_items)
+	iter = TidStoreBeginIterate(vacrel->dead_items);
+	while ((iter_result = TidStoreIterateNext(iter)) != NULL)
 	{
 		BlockNumber blkno;
 		Buffer		buf;
@@ -2435,7 +2419,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 
 		vacuum_delay_point();
 
-		blkno = ItemPointerGetBlockNumber(&vacrel->dead_items->items[index]);
+		blkno = iter_result->blkno;
 		vacrel->blkno = blkno;
 
 		/*
@@ -2449,7 +2433,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
 								 vacrel->bstrategy);
 		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-		index = lazy_vacuum_heap_page(vacrel, blkno, buf, index, vmbuffer);
+		lazy_vacuum_heap_page(vacrel, blkno, buf, iter_result->offsets,
+							  iter_result->num_offsets, vmbuffer);
 
 		/* Now that we've vacuumed the page, record its available space */
 		page = BufferGetPage(buf);
@@ -2459,6 +2444,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		RecordPageWithFreeSpace(vacrel->rel, blkno, freespace);
 		vacuumed_pages++;
 	}
+	TidStoreEndIterate(iter);
 
 	vacrel->blkno = InvalidBlockNumber;
 	if (BufferIsValid(vmbuffer))
@@ -2468,14 +2454,13 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 	 * We set all LP_DEAD items from the first heap pass to LP_UNUSED during
 	 * the second heap pass.  No more, no less.
 	 */
-	Assert(index > 0);
 	Assert(vacrel->num_index_scans > 1 ||
-		   (index == vacrel->lpdead_items &&
+		   (vacrel->di_info->num_items == vacrel->lpdead_items &&
 			vacuumed_pages == vacrel->lpdead_item_pages));
 
 	ereport(DEBUG2,
-			(errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
-					vacrel->relname, (long long) index, vacuumed_pages)));
+			(errmsg("table \"%s\": removed " INT64_FORMAT " dead item identifiers in %u pages",
+					vacrel->relname, vacrel->di_info->num_items, vacuumed_pages)));
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -2483,21 +2468,17 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 
 /*
  *	lazy_vacuum_heap_page() -- free page's LP_DEAD items listed in the
- *						  vacrel->dead_items array.
+ *						  vacrel->dead_items store.
  *
  * Caller must have an exclusive buffer lock on the buffer (though a full
  * cleanup lock is also acceptable).  vmbuffer must be valid and already have
  * a pin on blkno's visibility map page.
- *
- * index is an offset into the vacrel->dead_items array for the first listed
- * LP_DEAD item on the page.  The return value is the first index immediately
- * after all LP_DEAD items for the same page in the array.
  */
-static int
+static void
 lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
-					  int index, Buffer vmbuffer)
+					  OffsetNumber *deadoffsets, int num_offsets,
+					  Buffer vmbuffer)
 {
-	VacDeadItems *dead_items = vacrel->dead_items;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxHeapTuplesPerPage];
 	int			nunused = 0;
@@ -2516,16 +2497,11 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 
 	START_CRIT_SECTION();
 
-	for (; index < dead_items->num_items; index++)
+	for (int i = 0; i < num_offsets; i++)
 	{
-		BlockNumber tblk;
-		OffsetNumber toff;
 		ItemId		itemid;
+		OffsetNumber	toff = deadoffsets[i];
 
-		tblk = ItemPointerGetBlockNumber(&dead_items->items[index]);
-		if (tblk != blkno)
-			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&dead_items->items[index]);
 		itemid = PageGetItemId(page, toff);
 
 		Assert(ItemIdIsDead(itemid) && !ItemIdHasStorage(itemid));
@@ -2595,7 +2571,6 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
-	return index;
 }
 
 /*
@@ -2722,8 +2697,8 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
  *	lazy_vacuum_one_index() -- vacuum index relation.
  *
  *		Delete all the index tuples containing a TID collected in
- *		vacrel->dead_items array.  Also update running statistics.
- *		Exact details depend on index AM's ambulkdelete routine.
+ *		vacrel->dead_items.  Also update running statistics. Exact
+ *		details depend on index AM's ambulkdelete routine.
  *
  *		reltuples is the number of heap tuples to be passed to the
  *		bulkdelete callback.  It's always assumed to be estimated.
@@ -2760,7 +2735,8 @@ lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
 	/* Do bulk deletion */
-	istat = vac_bulkdel_one_index(&ivinfo, istat, (void *) vacrel->dead_items);
+	istat = vac_bulkdel_one_index(&ivinfo, istat, (void *) vacrel->dead_items,
+								  vacrel->di_info);
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -3125,46 +3101,6 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected)
 	return vacrel->nonempty_pages;
 }
 
-/*
- * Returns the number of dead TIDs that VACUUM should allocate space to
- * store, given a heap rel of size vacrel->rel_pages, and given current
- * maintenance_work_mem setting (or current autovacuum_work_mem setting,
- * when applicable).
- *
- * See the comments at the head of this file for rationale.
- */
-static int
-dead_items_max_items(LVRelState *vacrel)
-{
-	int64		max_items;
-	int			vac_work_mem = AmAutoVacuumWorkerProcess() &&
-		autovacuum_work_mem != -1 ?
-		autovacuum_work_mem : maintenance_work_mem;
-
-	if (vacrel->nindexes > 0)
-	{
-		BlockNumber rel_pages = vacrel->rel_pages;
-
-		max_items = MAXDEADITEMS(vac_work_mem * 1024L);
-		max_items = Min(max_items, INT_MAX);
-		max_items = Min(max_items, MAXDEADITEMS(MaxAllocSize));
-
-		/* curious coding here to ensure the multiplication can't overflow */
-		if ((BlockNumber) (max_items / MaxHeapTuplesPerPage) > rel_pages)
-			max_items = rel_pages * MaxHeapTuplesPerPage;
-
-		/* stay sane if small maintenance_work_mem */
-		max_items = Max(max_items, MaxHeapTuplesPerPage);
-	}
-	else
-	{
-		/* One-pass case only stores a single heap page's TIDs at a time */
-		max_items = MaxHeapTuplesPerPage;
-	}
-
-	return (int) max_items;
-}
-
 /*
  * Allocate dead_items (either using palloc, or in dynamic shared memory).
  * Sets dead_items in vacrel for caller.
@@ -3175,11 +3111,10 @@ dead_items_max_items(LVRelState *vacrel)
 static void
 dead_items_alloc(LVRelState *vacrel, int nworkers)
 {
-	VacDeadItems *dead_items;
-	int			max_items;
-
-	max_items = dead_items_max_items(vacrel);
-	Assert(max_items >= MaxHeapTuplesPerPage);
+	VacDeadItemsInfo *di_info;
+	int			vac_work_mem = AmAutoVacuumWorkerProcess() &&
+		autovacuum_work_mem != -1 ?
+		autovacuum_work_mem * 1024L : maintenance_work_mem * 1024L;
 
 	/*
 	 * Initialize state for a parallel vacuum.  As of now, only one worker can
@@ -3206,24 +3141,65 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 		else
 			vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
 											   vacrel->nindexes, nworkers,
-											   max_items,
+											   vac_work_mem,
 											   vacrel->verbose ? INFO : DEBUG2,
 											   vacrel->bstrategy);
 
 		/* If parallel mode started, dead_items space is allocated in DSM */
 		if (ParallelVacuumIsActive(vacrel))
 		{
-			vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs);
+			vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs,
+																&vacrel->di_info);
 			return;
 		}
 	}
 
 	/* Serial VACUUM case */
-	dead_items = (VacDeadItems *) palloc(vac_max_items_to_alloc_size(max_items));
-	dead_items->max_items = max_items;
-	dead_items->num_items = 0;
+	vacrel->dead_items = TidStoreCreate(vac_work_mem, NULL);
+
+	di_info = (VacDeadItemsInfo *) palloc(sizeof(VacDeadItemsInfo));
+	di_info->max_bytes = vac_work_mem;
+	di_info->num_items = 0;
+	vacrel->di_info = di_info;
+}
+
+/*
+ * Add the given block number and offset numbers to dead_items.
+ */
+static void
+dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
+			   int num_offsets)
+{
+	TidStore	*dead_items = vacrel->dead_items;
+
+	TidStoreSetBlockOffsets(dead_items, blkno, offsets, num_offsets);
+	vacrel->di_info->num_items += num_offsets;
+
+	/* update the memory usage report */
+	pgstat_progress_update_param(PROGRESS_VACUUM_DEAD_TUPLE_BYTES,
+								 TidStoreMemoryUsage(dead_items));
+}
+
+/*
+ * Forget all collected dead items.
+ */
+static void
+dead_items_reset(LVRelState *vacrel)
+{
+	TidStore	*dead_items = vacrel->dead_items;
+
+	if (ParallelVacuumIsActive(vacrel))
+	{
+		parallel_vacuum_reset_dead_items(vacrel->pvs);
+		return;
+	}
+
+	/* Recreate the tidstore with the same max_bytes limitation */
+	TidStoreDestroy(dead_items);
+	vacrel->dead_items = TidStoreCreate(vacrel->di_info->max_bytes, NULL);
 
-	vacrel->dead_items = dead_items;
+	/* Reset the counter */
+	vacrel->di_info->num_items = 0;
 }
 
 /*
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 04227a72d1..b6990ac0da 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1221,7 +1221,7 @@ CREATE VIEW pg_stat_progress_vacuum AS
                       END AS phase,
         S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned,
         S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count,
-        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples,
+        S.param6 AS max_dead_tuple_bytes, S.param7 AS dead_tuple_bytes,
         S.param8 AS indexes_total, S.param9 AS indexes_processed
     FROM pg_stat_get_progress_info('VACUUM') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 25281bbed9..53c3da660b 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -116,7 +116,6 @@ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 static double compute_parallel_delay(void);
 static VacOptValue get_vacoptval_from_boolean(DefElem *def);
 static bool vac_tid_reaped(ItemPointer itemptr, void *state);
-static int	vac_cmp_itemptr(const void *left, const void *right);
 
 /*
  * GUC check function to ensure GUC value specified is within the allowable
@@ -2473,16 +2472,15 @@ get_vacoptval_from_boolean(DefElem *def)
  */
 IndexBulkDeleteResult *
 vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat,
-					  VacDeadItems *dead_items)
+					  TidStore *dead_items, VacDeadItemsInfo *di_info)
 {
 	/* Do bulk deletion */
 	istat = index_bulk_delete(ivinfo, istat, vac_tid_reaped,
 							  (void *) dead_items);
 
 	ereport(ivinfo->message_level,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
-					RelationGetRelationName(ivinfo->index),
-					dead_items->num_items)));
+			(errmsg("scanned index \"%s\" to remove %lld row versions",
+					RelationGetRelationName(ivinfo->index), (long long) di_info->num_items)));
 
 	return istat;
 }
@@ -2513,82 +2511,15 @@ vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
 	return istat;
 }
 
-/*
- * Returns the total required space for VACUUM's dead_items array given a
- * max_items value.
- */
-Size
-vac_max_items_to_alloc_size(int max_items)
-{
-	Assert(max_items <= MAXDEADITEMS(MaxAllocSize));
-
-	return offsetof(VacDeadItems, items) + sizeof(ItemPointerData) * max_items;
-}
-
 /*
  *	vac_tid_reaped() -- is a particular tid deletable?
  *
  *		This has the right signature to be an IndexBulkDeleteCallback.
- *
- *		Assumes dead_items array is sorted (in ascending TID order).
  */
 static bool
 vac_tid_reaped(ItemPointer itemptr, void *state)
 {
-	VacDeadItems *dead_items = (VacDeadItems *) state;
-	int64		litem,
-				ritem,
-				item;
-	ItemPointer res;
-
-	litem = itemptr_encode(&dead_items->items[0]);
-	ritem = itemptr_encode(&dead_items->items[dead_items->num_items - 1]);
-	item = itemptr_encode(itemptr);
-
-	/*
-	 * Doing a simple bound check before bsearch() is useful to avoid the
-	 * extra cost of bsearch(), especially if dead items on the heap are
-	 * concentrated in a certain range.  Since this function is called for
-	 * every index tuple, it pays to be really fast.
-	 */
-	if (item < litem || item > ritem)
-		return false;
-
-	res = (ItemPointer) bsearch(itemptr,
-								dead_items->items,
-								dead_items->num_items,
-								sizeof(ItemPointerData),
-								vac_cmp_itemptr);
-
-	return (res != NULL);
-}
-
-/*
- * Comparator routines for use with qsort() and bsearch().
- */
-static int
-vac_cmp_itemptr(const void *left, const void *right)
-{
-	BlockNumber lblk,
-				rblk;
-	OffsetNumber loff,
-				roff;
-
-	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
-	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
-
-	if (lblk < rblk)
-		return -1;
-	if (lblk > rblk)
-		return 1;
-
-	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
-	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
-
-	if (loff < roff)
-		return -1;
-	if (loff > roff)
-		return 1;
+	TidStore *dead_items = (TidStore *) state;
 
-	return 0;
+	return TidStoreIsMember(dead_items, itemptr);
 }
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index befda1c105..fc27e2837a 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -8,8 +8,8 @@
  *
  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
  * with parallel worker processes.  Individual indexes are processed by one
- * vacuum process.  ParallelVacuumState contains shared information as well as
- * the memory space for storing dead items allocated in the DSM segment.  We
+ * vacuum process.  ParallelVacuumState contains shared information as well as
+ * the memory space for storing dead items allocated in the DSA area.  We
  * launch parallel worker processes at the start of parallel index
  * bulk-deletion and index cleanup and once all indexes are processed, the
  * parallel worker processes exit.  Each time we process indexes in parallel,
@@ -110,6 +110,12 @@ typedef struct PVShared
 
 	/* Counter for vacuuming and cleanup */
 	pg_atomic_uint32 idx;
+
+	/* DSA pointer to the shared TidStore */
+	dsa_pointer	dead_items_handle;
+
+	/* Statistics of shared dead items */
+	VacDeadItemsInfo	di_info;
 } PVShared;
 
 /* Status used during parallel index vacuum or cleanup */
@@ -176,7 +182,8 @@ struct ParallelVacuumState
 	PVIndStats *indstats;
 
 	/* Shared dead items space among parallel vacuum workers */
-	VacDeadItems *dead_items;
+	TidStore *dead_items;
+	dsa_area *dead_items_area;
 
 	/* Points to buffer usage area in DSM */
 	BufferUsage *buffer_usage;
@@ -232,20 +239,22 @@ static void parallel_vacuum_error_callback(void *arg);
  */
 ParallelVacuumState *
 parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
-					 int nrequested_workers, int max_items,
+					 int nrequested_workers, int vac_work_mem,
 					 int elevel, BufferAccessStrategy bstrategy)
 {
 	ParallelVacuumState *pvs;
 	ParallelContext *pcxt;
 	PVShared   *shared;
-	VacDeadItems *dead_items;
+	TidStore	*dead_items;
 	PVIndStats *indstats;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
+	void		*area_space;
+	dsa_area	*dead_items_dsa;
 	bool	   *will_parallel_vacuum;
 	Size		est_indstats_len;
 	Size		est_shared_len;
-	Size		est_dead_items_len;
+	Size		dsa_minsize = dsa_minimum_size();
 	int			nindexes_mwm = 0;
 	int			parallel_workers = 0;
 	int			querylen;
@@ -294,9 +303,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
-	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
-	est_dead_items_len = vac_max_items_to_alloc_size(max_items);
-	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
+	/* Initial size of DSA for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
+	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
 	/*
@@ -362,6 +370,16 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
 	pvs->indstats = indstats;
 
+	/* Prepare DSA space for dead items */
+	area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, area_space);
+	dead_items_dsa = dsa_create_in_place(area_space, dsa_minsize,
+										 LWTRANCHE_PARALLEL_VACUUM_DSA,
+										 pcxt->seg);
+	dead_items = TidStoreCreate(vac_work_mem, dead_items_dsa);
+	pvs->dead_items = dead_items;
+	pvs->dead_items_area = dead_items_dsa;
+
 	/* Prepare shared information */
 	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
 	MemSet(shared, 0, est_shared_len);
@@ -371,6 +389,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 		(nindexes_mwm > 0) ?
 		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
 		maintenance_work_mem;
+	shared->dead_items_handle = TidStoreGetHandle(dead_items);
+	shared->di_info.max_bytes = vac_work_mem;
 
 	/* Use the same buffer size for all workers */
 	shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
@@ -382,15 +402,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	pvs->shared = shared;
 
-	/* Prepare the dead_items space */
-	dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
-												   est_dead_items_len);
-	dead_items->max_items = max_items;
-	dead_items->num_items = 0;
-	MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
-	pvs->dead_items = dead_items;
-
 	/*
 	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
 	 * initialize
@@ -448,6 +459,9 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
 			istats[i] = NULL;
 	}
 
+	TidStoreDestroy(pvs->dead_items);
+	dsa_detach(pvs->dead_items_area);
+
 	DestroyParallelContext(pvs->pcxt);
 	ExitParallelMode();
 
@@ -455,13 +469,39 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
 	pfree(pvs);
 }
 
-/* Returns the dead items space */
-VacDeadItems *
-parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
+/*
+ * Returns the dead items space and dead items information.
+ */
+TidStore *
+parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **di_info_p)
 {
+	*di_info_p = &(pvs->shared->di_info);
 	return pvs->dead_items;
 }
 
+/* Forget all items in dead_items */
+void
+parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
+{
+	TidStore	*dead_items = pvs->dead_items;
+	VacDeadItemsInfo *di_info = &(pvs->shared->di_info);
+
+	/*
+	 * Free the current tidstore and return allocated DSA segments to the
+	 * operating system. Then we recreate the tidstore with the same max_bytes
+	 * limitation we just used.
+	 */
+	TidStoreDestroy(dead_items);
+	dsa_trim(pvs->dead_items_area);
+	pvs->dead_items = TidStoreCreate(di_info->max_bytes, pvs->dead_items_area);
+
+	/* Update the DSA pointer for dead_items to the new one */
+	pvs->shared->dead_items_handle = TidStoreGetHandle(dead_items);
+
+	/* Reset the counter */
+	di_info->num_items = 0;
+}
+
 /*
  * Do parallel index bulk-deletion with parallel workers.
  */
@@ -861,7 +901,8 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
 	switch (indstats->status)
 	{
 		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
-			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
+			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
+											  &pvs->shared->di_info);
 			break;
 		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
 			istat_res = vac_cleanup_one_index(&ivinfo, istat);
@@ -961,7 +1002,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	Relation   *indrels;
 	PVIndStats *indstats;
 	PVShared   *shared;
-	VacDeadItems *dead_items;
+	TidStore	*dead_items;
+	void		*area_space;
+	dsa_area	*dead_items_area;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
 	int			nindexes;
@@ -1005,10 +1048,10 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 											 PARALLEL_VACUUM_KEY_INDEX_STATS,
 											 false);
 
-	/* Set dead_items space */
-	dead_items = (VacDeadItems *) shm_toc_lookup(toc,
-												 PARALLEL_VACUUM_KEY_DEAD_ITEMS,
-												 false);
+	/* Set dead items */
+	area_space = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, false);
+	dead_items_area = dsa_attach_in_place(area_space, seg);
+	dead_items = TidStoreAttach(dead_items_area, shared->dead_items_handle);
 
 	/* Set cost-based vacuum delay */
 	VacuumUpdateCosts();
@@ -1056,6 +1099,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
 						  &wal_usage[ParallelWorkerNumber]);
 
+	TidStoreDetach(dead_items);
+	dsa_detach(dead_items_area);
+
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index e9dd5e6f99..a640da0157 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -168,6 +168,7 @@ static const char *const BuiltinTrancheNames[] = {
 	[LWTRANCHE_SERIAL_SLRU] = "SerialSLRU",
 	[LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU",
 	[LWTRANCHE_XACT_SLRU] = "XactSLRU",
+	[LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index c08e00d1d6..7ad0c765ab 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -376,7 +376,7 @@ NotifySLRU	"Waiting to access the <command>NOTIFY</command> message SLRU cache."
 SerialSLRU	"Waiting to access the serializable transaction conflict SLRU cache."
 SubtransSLRU	"Waiting to access the sub-transaction SLRU cache."
 XactSLRU	"Waiting to access the transaction status SLRU cache."
-
+ParallelVacuumDSA	"Waiting for parallel vacuum dynamic shared memory allocation."
 
 #
 # Wait Events - Lock
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 73afa77a9c..82a8fe6bd1 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -23,8 +23,8 @@
 #define PROGRESS_VACUUM_HEAP_BLKS_SCANNED		2
 #define PROGRESS_VACUUM_HEAP_BLKS_VACUUMED		3
 #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS		4
-#define PROGRESS_VACUUM_MAX_DEAD_TUPLES			5
-#define PROGRESS_VACUUM_NUM_DEAD_TUPLES			6
+#define PROGRESS_VACUUM_MAX_DEAD_TUPLE_BYTES	5
+#define PROGRESS_VACUUM_DEAD_TUPLE_BYTES		6
 #define PROGRESS_VACUUM_INDEXES_TOTAL			7
 #define PROGRESS_VACUUM_INDEXES_PROCESSED		8
 
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 7f623b37fd..0cb2e97726 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -17,6 +17,7 @@
 #include "access/htup.h"
 #include "access/genam.h"
 #include "access/parallel.h"
+#include "access/tidstore.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
@@ -277,19 +278,14 @@ struct VacuumCutoffs
 };
 
 /*
- * VacDeadItems stores TIDs whose index tuples are deleted by index vacuuming.
+ * VacDeadItemsInfo stores additional information of dead tuple TIDs and
+ * dead tuple storage (e.g. TidStore).
  */
-typedef struct VacDeadItems
+typedef struct VacDeadItemsInfo
 {
-	int			max_items;		/* # slots allocated in array */
-	int			num_items;		/* current # of entries */
-
-	/* Sorted array of TIDs to delete from indexes */
-	ItemPointerData items[FLEXIBLE_ARRAY_MEMBER];
-} VacDeadItems;
-
-#define MAXDEADITEMS(avail_mem) \
-	(((avail_mem) - offsetof(VacDeadItems, items)) / sizeof(ItemPointerData))
+	size_t	max_bytes;	/* the maximum bytes TidStore can use */
+	int64	num_items;	/* current # of entries */
+} VacDeadItemsInfo;
 
 /* GUC parameters */
 extern PGDLLIMPORT int default_statistics_target;	/* PGDLLIMPORT for PostGIS */
@@ -350,10 +346,10 @@ extern Relation vacuum_open_relation(Oid relid, RangeVar *relation,
 									 LOCKMODE lmode);
 extern IndexBulkDeleteResult *vac_bulkdel_one_index(IndexVacuumInfo *ivinfo,
 													IndexBulkDeleteResult *istat,
-													VacDeadItems *dead_items);
+													TidStore *dead_items,
+													VacDeadItemsInfo *di_info);
 extern IndexBulkDeleteResult *vac_cleanup_one_index(IndexVacuumInfo *ivinfo,
 													IndexBulkDeleteResult *istat);
-extern Size vac_max_items_to_alloc_size(int max_items);
 
 /* In postmaster/autovacuum.c */
 extern void AutoVacuumUpdateCostLimit(void);
@@ -362,10 +358,12 @@ extern void VacuumUpdateCosts(void);
 /* in commands/vacuumparallel.c */
 extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels,
 												 int nindexes, int nrequested_workers,
-												 int max_items, int elevel,
+												 int vac_work_mem, int elevel,
 												 BufferAccessStrategy bstrategy);
 extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats);
-extern VacDeadItems *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs);
+extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs,
+												VacDeadItemsInfo **di_info_p);
+extern void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs);
 extern void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs,
 												long num_table_tuples,
 												int num_index_scans);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 152e3b047e..66b60191b5 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -217,6 +217,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_SERIAL_SLRU,
 	LWTRANCHE_SUBTRANS_SLRU,
 	LWTRANCHE_XACT_SLRU,
+	LWTRANCHE_PARALLEL_VACUUM_DSA,
 	LWTRANCHE_FIRST_USER_DEFINED,
 }			BuiltinTrancheIds;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 0cd2c64fca..6267647766 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2048,8 +2048,8 @@ pg_stat_progress_vacuum| SELECT s.pid,
     s.param3 AS heap_blks_scanned,
     s.param4 AS heap_blks_vacuumed,
     s.param5 AS index_vacuum_count,
-    s.param6 AS max_dead_tuples,
-    s.param7 AS num_dead_tuples,
+    s.param6 AS max_dead_tuple_bytes,
+    s.param7 AS dead_tuple_bytes,
     s.param8 AS indexes_total,
     s.param9 AS indexes_processed
    FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a0fa0d2d1f..96d6bfa6f0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2963,7 +2963,7 @@ UserMapping
 UserOpts
 VacAttrStats
 VacAttrStatsP
-VacDeadItems
+VacDeadItemsInfo
 VacErrPhase
 VacObjFilter
 VacOptValue
-- 
2.44.0

From 15e740bf3f7a58fccce3f19a94c840ff85016049 Mon Sep 17 00:00:00 2001
From: John Naylor <john.naylor@postgresql.org>
Date: Wed, 13 Mar 2024 14:02:01 +0700
Subject: [PATCH v71 4/6] WIP: Use array of TIDs in TID store regression tests

---
 .../test_tidstore/expected/test_tidstore.out  |  53 ++-----
 .../test_tidstore/sql/test_tidstore.sql       |   7 +-
 .../test_tidstore/test_tidstore--1.0.sql      |   7 +-
 .../modules/test_tidstore/test_tidstore.c     | 135 ++++++++++++------
 4 files changed, 114 insertions(+), 88 deletions(-)

diff --git a/src/test/modules/test_tidstore/expected/test_tidstore.out b/src/test/modules/test_tidstore/expected/test_tidstore.out
index 80698924a1..4eab5d30ba 100644
--- a/src/test/modules/test_tidstore/expected/test_tidstore.out
+++ b/src/test/modules/test_tidstore/expected/test_tidstore.out
@@ -53,16 +53,16 @@ VALUES (0), (1), (:maxblkno - 1), (:maxblkno / 2), (:maxblkno)
 offsets (off) AS (
 VALUES (1), (2), (:maxoffset / 2), (:maxoffset - 1), (:maxoffset)
 )
-SELECT test_set_block_offsets(blk, array_agg(offsets.off)::int2[])
+SELECT do_set_block_offsets(blk, array_agg(offsets.off)::int2[])
   FROM blocks, offsets
   GROUP BY blk;
- test_set_block_offsets 
-------------------------
-             2147483647
-                      0
-             4294967294
-                      1
-             4294967295
+ do_set_block_offsets 
+----------------------
+           2147483647
+                    0
+           4294967294
+                    1
+           4294967295
 (5 rows)
 
 -- Lookup test and dump (sorted) tids.
@@ -81,42 +81,19 @@ SELECT lookup_test();
  (4294967295,512)
 (10 rows)
 
+-- Check TIDs we've added to the store.
+SELECT check_set_block_offsets();
+ check_set_block_offsets 
+-------------------------
+ 
+(1 row)
+
 SELECT test_is_full();
  test_is_full 
 --------------
  f
 (1 row)
 
-SELECT test_dump_tids();
-  test_dump_tids  
-------------------
- (0,1)
- (0,2)
- (0,256)
- (0,511)
- (0,512)
- (1,1)
- (1,2)
- (1,256)
- (1,511)
- (1,512)
- (2147483647,1)
- (2147483647,2)
- (2147483647,256)
- (2147483647,511)
- (2147483647,512)
- (4294967294,1)
- (4294967294,2)
- (4294967294,256)
- (4294967294,511)
- (4294967294,512)
- (4294967295,1)
- (4294967295,2)
- (4294967295,256)
- (4294967295,511)
- (4294967295,512)
-(25 rows)
-
 -- cleanup
 SELECT test_destroy();
  test_destroy 
diff --git a/src/test/modules/test_tidstore/sql/test_tidstore.sql b/src/test/modules/test_tidstore/sql/test_tidstore.sql
index cc1207821c..7317f52058 100644
--- a/src/test/modules/test_tidstore/sql/test_tidstore.sql
+++ b/src/test/modules/test_tidstore/sql/test_tidstore.sql
@@ -43,14 +43,17 @@ VALUES (0), (1), (:maxblkno - 1), (:maxblkno / 2), (:maxblkno)
 offsets (off) AS (
 VALUES (1), (2), (:maxoffset / 2), (:maxoffset - 1), (:maxoffset)
 )
-SELECT test_set_block_offsets(blk, array_agg(offsets.off)::int2[])
+SELECT do_set_block_offsets(blk, array_agg(offsets.off)::int2[])
   FROM blocks, offsets
   GROUP BY blk;
 
 -- Lookup test and dump (sorted) tids.
 SELECT lookup_test();
+
+-- Check TIDs we've added to the store.
+SELECT check_set_block_offsets();
+
 SELECT test_is_full();
-SELECT test_dump_tids();
 
 -- cleanup
 SELECT test_destroy();
diff --git a/src/test/modules/test_tidstore/test_tidstore--1.0.sql b/src/test/modules/test_tidstore/test_tidstore--1.0.sql
index 305459334d..c9f8dfd6f9 100644
--- a/src/test/modules/test_tidstore/test_tidstore--1.0.sql
+++ b/src/test/modules/test_tidstore/test_tidstore--1.0.sql
@@ -8,15 +8,14 @@ shared bool)
 RETURNS void STRICT PARALLEL UNSAFE
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION test_set_block_offsets(
+CREATE FUNCTION do_set_block_offsets(
 blkno bigint,
 offsets int2[])
 RETURNS bigint STRICT PARALLEL UNSAFE
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION test_dump_tids(
-t_ctid OUT tid)
-RETURNS SETOF tid STRICT PARALLEL UNSAFE
+CREATE FUNCTION check_set_block_offsets()
+RETURNS void STRICT PARALLEL UNSAFE
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION test_lookup_tids(
diff --git a/src/test/modules/test_tidstore/test_tidstore.c b/src/test/modules/test_tidstore/test_tidstore.c
index 428d6a3fcf..f5b65eace0 100644
--- a/src/test/modules/test_tidstore/test_tidstore.c
+++ b/src/test/modules/test_tidstore/test_tidstore.c
@@ -24,17 +24,54 @@
 PG_MODULE_MAGIC;
 
 PG_FUNCTION_INFO_V1(test_create);
-PG_FUNCTION_INFO_V1(test_set_block_offsets);
-PG_FUNCTION_INFO_V1(test_dump_tids);
+PG_FUNCTION_INFO_V1(do_set_block_offsets);
+PG_FUNCTION_INFO_V1(check_set_block_offsets);
 PG_FUNCTION_INFO_V1(test_lookup_tids);
 PG_FUNCTION_INFO_V1(test_is_full);
 PG_FUNCTION_INFO_V1(test_destroy);
 
 static TidStore *tidstore = NULL;
 static dsa_area *dsa = NULL;
-static int64 num_tids = 0;
 static size_t max_bytes = (2 * 1024 * 1024L);	/* 2MB */
 
+/* array for verification of some tests */
+typedef struct ItemArray
+{
+	ItemPointerData	*tids;
+	int		max_tids;
+	int		num_tids;
+} ItemArray;
+
+static ItemArray items;
+
+/* comparator routine for ItemPointer */
+static int
+itemptr_cmp(const void *left, const void *right)
+{
+	BlockNumber lblk,
+		rblk;
+	OffsetNumber loff,
+		roff;
+
+	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
+	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
+
+	if (lblk < rblk)
+		return -1;
+	if (lblk > rblk)
+		return 1;
+
+	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
+	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
+
+	if (loff < roff)
+		return -1;
+	if (loff > roff)
+		return 1;
+
+	return 0;
+}
+
 /*
  * Create a TidStore. If shared is false, the tidstore is created
  * on TopMemoryContext, otherwise on DSA. Although the tidstore
@@ -72,7 +109,9 @@ test_create(PG_FUNCTION_ARGS)
 	else
 		tidstore = TidStoreCreate(max_bytes, NULL);
 
-	num_tids = 0;
+	items.num_tids = 0;
+	items.max_tids = max_bytes / sizeof(ItemPointerData);
+	items.tids =  (ItemPointerData *) palloc0(max_bytes);
 
 	MemoryContextSwitchTo(old_ctx);
 
@@ -95,7 +134,7 @@ sanity_check_array(ArrayType *ta)
 
 /* Set the given block and offsets pairs */
 Datum
-test_set_block_offsets(PG_FUNCTION_ARGS)
+do_set_block_offsets(PG_FUNCTION_ARGS)
 {
 	BlockNumber blkno = PG_GETARG_INT64(0);
 	ArrayType  *ta = PG_GETARG_ARRAYTYPE_P_COPY(1);
@@ -107,39 +146,58 @@ test_set_block_offsets(PG_FUNCTION_ARGS)
 	noffs = ArrayGetNItems(ARR_NDIM(ta), ARR_DIMS(ta));
 	offs = ((OffsetNumber *) ARR_DATA_PTR(ta));
 
-	/* Set TIDs */
+	/* Set TIDs in the store */
 	TidStoreLockExclusive(tidstore);
 	TidStoreSetBlockOffsets(tidstore, blkno, offs, noffs);
 	TidStoreUnlock(tidstore);
 
+	/* Set TIDs in verification array */
+	for (int i = 0; i < noffs; i++)
+	{
+		ItemPointer tid;
+		int idx = items.num_tids + i;
+
+		/* Enlarge the TID array if necessary */
+		if (idx >= items.max_tids)
+		{
+			items.max_tids *= 2;
+			items.tids = repalloc(items.tids, sizeof(ItemPointerData) * items.max_tids);
+		}
+
+		tid = &(items.tids[idx]);
+
+		ItemPointerSetBlockNumber(tid, blkno);
+		ItemPointerSetOffsetNumber(tid, offs[i]);
+	}
+
 	/* Update statistics */
-	num_tids += noffs;
+	items.num_tids += noffs;
 
 	PG_RETURN_INT64(blkno);
 }
 
 /*
- * Dump and return TIDs in the tidstore. The output TIDs are ordered.
+ * Verify TIDs in store against the array.
  */
 Datum
-test_dump_tids(PG_FUNCTION_ARGS)
+check_set_block_offsets(PG_FUNCTION_ARGS)
 {
-	FuncCallContext *funcctx;
-	ItemPointerData *tids;
-
-	if (SRF_IS_FIRSTCALL())
-	{
-		MemoryContext oldcontext;
 		TidStoreIter *iter;
 		TidStoreIterResult *iter_result;
-		int64		ntids = 0;
+		ItemPointerData * tids;
+		int ntids = 0;
+
+		/* lookup each member in the verification array */
+		for (int i = 0; i < items.num_tids; i++)
+			if (!TidStoreIsMember(tidstore, &items.tids[i]))
+				elog(ERROR, "missing TID with block %u, offset %u",
+					ItemPointerGetBlockNumber(&items.tids[i]),
+					ItemPointerGetOffsetNumber(&items.tids[i]));
 
-		funcctx = SRF_FIRSTCALL_INIT();
-		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 		tids = (ItemPointerData *)
-			palloc0(sizeof(ItemPointerData) * num_tids);
+			palloc0(sizeof(ItemPointerData) * items.num_tids);
 
-		/* Collect TIDs stored in the tidstore */
+		/* Collect TIDs stored in the tidstore, in order */
 		TidStoreLockShare(tidstore);
 		iter = TidStoreBeginIterate(tidstore);
 		while ((iter_result = TidStoreIterateNext(iter)) != NULL)
@@ -148,33 +206,21 @@ test_dump_tids(PG_FUNCTION_ARGS)
 				ItemPointerSet(&(tids[ntids++]), iter_result->blkno,
 							   iter_result->offsets[i]);
 		}
+		TidStoreEndIterate(iter);
 		TidStoreUnlock(tidstore);
 
-		Assert(ntids == num_tids);
-
-		funcctx->user_fctx = tids;
-		funcctx->max_calls = num_tids;
-
-		MemoryContextSwitchTo(oldcontext);
-	}
-
-	funcctx = SRF_PERCALL_SETUP();
-	tids = (ItemPointerData *) funcctx->user_fctx;
+		if (ntids != items.num_tids)
+			elog(ERROR, "should have %d TIDs, have %d", items.num_tids, ntids);
 
-	if (funcctx->call_cntr < funcctx->max_calls)
-	{
-		int			idx;
-
-		/*
-		 * Note that since funcctx->call_cntr is incremented in
-		 * SRF_RETURN_NEXT before return, we need to remember the current
-		 * counter to access the tid array.
-		 */
-		idx = funcctx->call_cntr;
-		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&(tids[idx])));
-	}
+		/* Sort verification array and compare each member to what we dumped from the TID store. */
+		qsort(items.tids, items.num_tids, sizeof(ItemPointerData), itemptr_cmp);
+		for (int i = 0; i < items.num_tids; i++)
+		{
+			if (itemptr_cmp((const void *) &items.tids[i], (const void *) &tids[i]) != 0)
+				elog(ERROR, "Dumped TID array doesn't match verification array");
+		}
 
-	SRF_RETURN_DONE(funcctx);
+	PG_RETURN_VOID();
 }
 
 /*
@@ -236,7 +282,8 @@ test_destroy(PG_FUNCTION_ARGS)
 {
 	TidStoreDestroy(tidstore);
 	tidstore = NULL;
-	num_tids = 0;
+	items.num_tids = 0;
+	pfree(items.tids);
 
 	if (dsa)
 		dsa_detach(dsa);
-- 
2.44.0

From 0179fd17ec1ac414510e01b4afd5488eb2c6faa2 Mon Sep 17 00:00:00 2001
From: John Naylor <john.naylor@postgresql.org>
Date: Tue, 5 Mar 2024 15:05:47 +0700
Subject: [PATCH v71 1/6] Add TIDStore, to store sets of TIDs (ItemPointerData)
 efficiently.

TIDStore is a data structure designed to efficiently store large sets
of TIDs. For TID storage, it employs a radix tree, where the key is
the BlockNumber, and the value is a bitmap representing offset
numbers. The TIDStore can be created on a DSA area and used by
multiple backend processes simultaneously.

There are potential future users such as tidbitmap.c, though it's very
likely the interface will need to evolve as we come to understand the
needs of different kinds of users. For example, we can support
updating the offset bitmap of existing values.

Currently, the TIDStore is not used for anything yet, aside from the
test code. But an upcoming patch will use it.

This includes a unit test module, in src/test/modules/test_tidstore.

Co-authored-by: John Naylor
Discussion: https://postgr.es/m/CAD21AoAfOZvmfR0j8VmZorZjL7RhTiQdVttNuC4W-Shdc2a-AA%40mail.gmail.com
---
 doc/src/sgml/monitoring.sgml                  |   4 +
 src/backend/access/common/Makefile            |   1 +
 src/backend/access/common/meson.build         |   1 +
 src/backend/access/common/tidstore.c          | 461 ++++++++++++++++++
 src/backend/storage/lmgr/lwlock.c             |   1 +
 src/include/access/tidstore.h                 |  48 ++
 src/include/storage/lwlock.h                  |   1 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 src/test/modules/test_tidstore/Makefile       |  23 +
 .../test_tidstore/expected/test_tidstore.out  | 128 +++++
 src/test/modules/test_tidstore/meson.build    |  33 ++
 .../test_tidstore/sql/test_tidstore.sql       |  58 +++
 .../test_tidstore/test_tidstore--1.0.sql      |  35 ++
 .../modules/test_tidstore/test_tidstore.c     | 245 ++++++++++
 .../test_tidstore/test_tidstore.control       |   4 +
 src/tools/pgindent/typedefs.list              |   4 +
 17 files changed, 1049 insertions(+)
 create mode 100644 src/backend/access/common/tidstore.c
 create mode 100644 src/include/access/tidstore.h
 create mode 100644 src/test/modules/test_tidstore/Makefile
 create mode 100644 src/test/modules/test_tidstore/expected/test_tidstore.out
 create mode 100644 src/test/modules/test_tidstore/meson.build
 create mode 100644 src/test/modules/test_tidstore/sql/test_tidstore.sql
 create mode 100644 src/test/modules/test_tidstore/test_tidstore--1.0.sql
 create mode 100644 src/test/modules/test_tidstore/test_tidstore.c
 create mode 100644 src/test/modules/test_tidstore/test_tidstore.control

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 8aca08140e..c8d76906aa 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1099,6 +1099,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
        See <xref linkend="wait-event-lwlock-table"/>.
       </entry>
      </row>
+     <row>
+      <entry><literal>SharedTidStore</literal></entry>
+      <entry>Waiting to access a shared TID store.</entry>
+     </row>
      <row>
       <entry><literal>Timeout</literal></entry>
       <entry>The server process is waiting for a timeout
diff --git a/src/backend/access/common/Makefile b/src/backend/access/common/Makefile
index b9aff0ccfd..67b8cc6108 100644
--- a/src/backend/access/common/Makefile
+++ b/src/backend/access/common/Makefile
@@ -27,6 +27,7 @@ OBJS = \
 	syncscan.o \
 	toast_compression.o \
 	toast_internals.o \
+	tidstore.o \
 	tupconvert.o \
 	tupdesc.o
 
diff --git a/src/backend/access/common/meson.build b/src/backend/access/common/meson.build
index 725041a4ce..a02397855e 100644
--- a/src/backend/access/common/meson.build
+++ b/src/backend/access/common/meson.build
@@ -15,6 +15,7 @@ backend_sources += files(
   'syncscan.c',
   'toast_compression.c',
   'toast_internals.c',
+  'tidstore.c',
   'tupconvert.c',
   'tupdesc.c',
 )
diff --git a/src/backend/access/common/tidstore.c b/src/backend/access/common/tidstore.c
new file mode 100644
index 0000000000..b725b62d4c
--- /dev/null
+++ b/src/backend/access/common/tidstore.c
@@ -0,0 +1,461 @@
+/*-------------------------------------------------------------------------
+ *
+ * tidstore.c
+ *		TID (ItemPointerData) storage implementation.
+ *
+ * TidStore is an in-memory data structure to store TIDs (ItemPointerData).
+ * Internally it uses a radix tree as the storage for TIDs. The key is the
+ * BlockNumber and the value is a bitmap of offsets, BlocktableEntry.
+ *
+ * TidStore can be shared among parallel worker processes by passing DSA area
+ * to TidStoreCreate(). Other backends can attach to the shared TidStore by
+ * TidStoreAttach().
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/common/tidstore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/tidstore.h"
+#include "miscadmin.h"
+#include "nodes/bitmapset.h"
+#include "port/pg_bitutils.h"
+#include "storage/lwlock.h"
+#include "utils/dsa.h"
+#include "utils/memutils.h"
+
+
+#define WORDNUM(x)	((x) / BITS_PER_BITMAPWORD)
+#define BITNUM(x)	((x) % BITS_PER_BITMAPWORD)
+
+/* number of active words for a page: */
+#define WORDS_PER_PAGE(n) ((n) / BITS_PER_BITMAPWORD + 1)
+
+/*
+ * This is named similarly to PagetableEntry in tidbitmap.c
+ * because the two have a similar function.
+ */
+typedef struct BlocktableEntry
+{
+	uint16		nwords;
+	bitmapword	words[FLEXIBLE_ARRAY_MEMBER];
+} BlocktableEntry;
+#define MaxBlocktableEntrySize \
+	offsetof(BlocktableEntry, words) + \
+		(sizeof(bitmapword) * WORDS_PER_PAGE(MaxOffsetNumber))
+
+#define RT_PREFIX local_rt
+#define RT_SCOPE static
+#define RT_DECLARE
+#define RT_DEFINE
+#define RT_VALUE_TYPE BlocktableEntry
+#define RT_VARLEN_VALUE_SIZE(page) \
+	(offsetof(BlocktableEntry, words) + \
+	sizeof(bitmapword) * (page)->nwords)
+#include "lib/radixtree.h"
+
+#define RT_PREFIX shared_rt
+#define RT_SHMEM
+#define RT_SCOPE static
+#define RT_DECLARE
+#define RT_DEFINE
+#define RT_VALUE_TYPE BlocktableEntry
+#define RT_VARLEN_VALUE_SIZE(page) \
+	(offsetof(BlocktableEntry, words) + \
+	sizeof(bitmapword) * (page)->nwords)
+#include "lib/radixtree.h"
+
+/* Per-backend state for a TidStore */
+struct TidStore
+{
+	/* MemoryContext where the TidStore is allocated */
+	MemoryContext context;
+
+	/* MemoryContext that the radix tree uses */
+	MemoryContext rt_context;
+
+	/* Storage for TIDs. Use either one depending on TidStoreIsShared() */
+	union
+	{
+		local_rt_radix_tree *local;
+		shared_rt_radix_tree *shared;
+	}			tree;
+
+	/* DSA area for TidStore if using shared memory */
+	dsa_area   *area;
+};
+#define TidStoreIsShared(ts) ((ts)->area != NULL)
+
+/* Iterator for TidStore */
+struct TidStoreIter
+{
+	TidStore   *ts;
+
+	/* iterator of radix tree. Use either one depending on TidStoreIsShared() */
+	union
+	{
+		shared_rt_iter *shared;
+		local_rt_iter *local;
+	}			tree_iter;
+
+	/* output for the caller */
+	TidStoreIterResult output;
+};
+
+static void tidstore_iter_extract_tids(TidStoreIter *iter, uint64 key,
+									   BlocktableEntry *page);
+
+/*
+ * Create a TidStore. The TidStore will live in the memory context that is
+ * CurrentMemoryContext at the time of this call. The TID storage, backed
+ * by a radix tree, will live in its child memory context, rt_context. The
+ * TidStore will be limited to (approximately) max_bytes total memory
+ * consumption. If the 'area' is non-NULL, the radix tree is created in the
+ * DSA area.
+ *
+ * The returned object is allocated in backend-local memory.
+ */
+TidStore *
+TidStoreCreate(size_t max_bytes, dsa_area *area)
+{
+	TidStore   *ts;
+	size_t		initBlockSize = ALLOCSET_DEFAULT_INITSIZE;
+	size_t		minContextSize = ALLOCSET_DEFAULT_MINSIZE;
+	size_t		maxBlockSize = ALLOCSET_DEFAULT_MAXSIZE;
+
+	ts = palloc0(sizeof(TidStore));
+	ts->context = CurrentMemoryContext;
+
+	/* choose the maxBlockSize to be no larger than 1/16 of max_bytes */
+	while (16 * maxBlockSize > max_bytes * 1024L)
+		maxBlockSize >>= 1;
+
+	if (maxBlockSize < ALLOCSET_DEFAULT_INITSIZE)
+		maxBlockSize = ALLOCSET_DEFAULT_INITSIZE;
+
+	/* Create a memory context for the TID storage */
+	ts->rt_context = AllocSetContextCreate(CurrentMemoryContext,
+										   "TID storage",
+										   minContextSize,
+										   initBlockSize,
+										   maxBlockSize);
+
+	if (area != NULL)
+	{
+		ts->tree.shared = shared_rt_create(ts->rt_context, area,
+										   LWTRANCHE_SHARED_TIDSTORE);
+		ts->area = area;
+	}
+	else
+		ts->tree.local = local_rt_create(ts->rt_context);
+
+	return ts;
+}
+
+/*
+ * Attach to the shared TidStore using a handle. The returned object is
+ * allocated in backend-local memory using the CurrentMemoryContext.
+ */
+TidStore *
+TidStoreAttach(dsa_area *area, dsa_pointer handle)
+{
+	TidStore   *ts;
+
+	Assert(area != NULL);
+	Assert(DsaPointerIsValid(handle));
+
+	/* create per-backend state */
+	ts = palloc0(sizeof(TidStore));
+
+	/* Find the shared radix tree */
+	ts->tree.shared = shared_rt_attach(area, handle);
+	ts->area = area;
+
+	return ts;
+}
+
+/*
+ * Detach from a TidStore. This detaches from radix tree and frees the
+ * backend-local resources. The radix tree will continue to exist until
+ * it is either explicitly destroyed, or the area that backs it is returned
+ * to the operating system.
+ */
+void
+TidStoreDetach(TidStore *ts)
+{
+	Assert(TidStoreIsShared(ts));
+
+	shared_rt_detach(ts->tree.shared);
+	pfree(ts);
+}
+
+/*
+ * Lock support functions.
+ *
+ * We can use the radix tree's lock for shared TidStore as the data we
+ * need to protect is only the shared radix tree.
+ */
+void
+TidStoreLockExclusive(TidStore *ts)
+{
+	if (TidStoreIsShared(ts))
+		shared_rt_lock_exclusive(ts->tree.shared);
+}
+
+void
+TidStoreLockShare(TidStore *ts)
+{
+	if (TidStoreIsShared(ts))
+		shared_rt_lock_share(ts->tree.shared);
+}
+
+void
+TidStoreUnlock(TidStore *ts)
+{
+	if (TidStoreIsShared(ts))
+		shared_rt_unlock(ts->tree.shared);
+}
+
+/*
+ * Destroy a TidStore, returning all memory.
+ *
+ * Note that the caller must be certain that no other backend will attempt to
+ * access the TidStore before calling this function. Other backends must
+ * explicitly call TidStoreDetach() to free up backend-local memory associated
+ * with the TidStore. The backend that calls TidStoreDestroy() must not call
+ * TidStoreDetach().
+ */
+void
+TidStoreDestroy(TidStore *ts)
+{
+	/* Destroy underlying radix tree */
+	if (TidStoreIsShared(ts))
+		shared_rt_free(ts->tree.shared);
+	else
+		local_rt_free(ts->tree.local);
+
+	MemoryContextDelete(ts->rt_context);
+
+	pfree(ts);
+}
+
+/*
+ * Set the given TIDs for the given block in the TidStore.
+ *
+ * NB: the offset numbers in offsets must be sorted in ascending order.
+ */
+void
+TidStoreSetBlockOffsets(TidStore *ts, BlockNumber blkno, OffsetNumber *offsets,
+						int num_offsets)
+{
+	char		data[MaxBlocktableEntrySize];
+	BlocktableEntry *page = (BlocktableEntry *) data;
+	bitmapword	word;
+	int			wordnum;
+	int			next_word_threshold;
+	int			idx = 0;
+	bool		found PG_USED_FOR_ASSERTS_ONLY;
+
+	Assert(num_offsets > 0);
+
+	for (wordnum = 0, next_word_threshold = BITS_PER_BITMAPWORD;
+		 wordnum <= WORDNUM(offsets[num_offsets - 1]);
+		 wordnum++, next_word_threshold += BITS_PER_BITMAPWORD)
+	{
+		word = 0;
+
+		while (idx < num_offsets)
+		{
+			OffsetNumber off = offsets[idx];
+
+			/* safety check to ensure we don't overrun bit array bounds */
+			if (!OffsetNumberIsValid(off))
+				elog(ERROR, "tuple offset out of range: %u", off);
+
+			if (off >= next_word_threshold)
+				break;
+
+			word |= ((bitmapword) 1 << BITNUM(off));
+			idx++;
+		}
+
+		/* write out offset bitmap for this wordnum */
+		page->words[wordnum] = word;
+	}
+
+	page->nwords = wordnum;
+	Assert(page->nwords == WORDS_PER_PAGE(offsets[num_offsets - 1]));
+
+	if (TidStoreIsShared(ts))
+		found = shared_rt_set(ts->tree.shared, blkno, page);
+	else
+		found = local_rt_set(ts->tree.local, blkno, page);
+
+	Assert(!found);
+}
+
+/* Return true if the given TID is present in the TidStore */
+bool
+TidStoreIsMember(TidStore *ts, ItemPointer tid)
+{
+	int			wordnum;
+	int			bitnum;
+	BlocktableEntry *page;
+	BlockNumber blk = ItemPointerGetBlockNumber(tid);
+	OffsetNumber off = ItemPointerGetOffsetNumber(tid);
+	bool		ret;
+
+	if (TidStoreIsShared(ts))
+		page = shared_rt_find(ts->tree.shared, blk);
+	else
+		page = local_rt_find(ts->tree.local, blk);
+
+	/* no entry for the blk */
+	if (page == NULL)
+		return false;
+
+	wordnum = WORDNUM(off);
+	bitnum = BITNUM(off);
+
+	/* no bitmap for the off */
+	if (wordnum >= page->nwords)
+		return false;
+
+	ret = (page->words[wordnum] & ((bitmapword) 1 << bitnum)) != 0;
+
+	return ret;
+}
+
+/*
+ * Prepare to iterate through a TidStore. Since the radix tree is locked during
+ * the iteration, TidStoreEndIterate() needs to be called when finished.
+ *
+ * The TidStoreIter struct is created in the caller's memory context.
+ *
+ * Concurrent updates during the iteration will be blocked when inserting a
+ * key-value to the radix tree.
+ */
+TidStoreIter *
+TidStoreBeginIterate(TidStore *ts)
+{
+	TidStoreIter *iter;
+
+	iter = palloc0(sizeof(TidStoreIter));
+	iter->ts = ts;
+
+	/*
+	 * We start with an array large enough to contain at least the offsets
+	 * from one completely full bitmap element.
+	 */
+	iter->output.max_offset = 2 * BITS_PER_BITMAPWORD;
+	iter->output.offsets = palloc(sizeof(OffsetNumber) * iter->output.max_offset);
+
+	if (TidStoreIsShared(ts))
+		iter->tree_iter.shared = shared_rt_begin_iterate(ts->tree.shared);
+	else
+		iter->tree_iter.local = local_rt_begin_iterate(ts->tree.local);
+
+	return iter;
+}
+
+
+/*
+ * Scan the TidStore and return a pointer to TidStoreIterResult that has TIDs
+ * in one block. We return the block numbers in ascending order and the offset
+ * numbers in each result is also sorted in ascending order.
+ */
+TidStoreIterResult *
+TidStoreIterateNext(TidStoreIter *iter)
+{
+	uint64		key;
+	BlocktableEntry *page;
+	TidStoreIterResult *result = &(iter->output);
+
+	if (TidStoreIsShared(iter->ts))
+		page = shared_rt_iterate_next(iter->tree_iter.shared, &key);
+	else
+		page = local_rt_iterate_next(iter->tree_iter.local, &key);
+
+	if (page == NULL)
+		return NULL;
+
+	/* Collect TIDs extracted from the key-value pair */
+	tidstore_iter_extract_tids(iter, key, page);
+
+	return result;
+}
+
+/*
+ * Finish an iteration over TidStore. This needs to be called after finishing
+ * or when exiting an iteration early.
+ */
+void
+TidStoreEndIterate(TidStoreIter *iter)
+{
+	if (TidStoreIsShared(iter->ts))
+		shared_rt_end_iterate(iter->tree_iter.shared);
+	else
+		local_rt_end_iterate(iter->tree_iter.local);
+
+	pfree(iter->output.offsets);
+	pfree(iter);
+}
+
+/* Return the memory usage of TidStore */
+size_t
+TidStoreMemoryUsage(TidStore *ts)
+{
+	if (TidStoreIsShared(ts))
+		return shared_rt_memory_usage(ts->tree.shared);
+	else
+		return local_rt_memory_usage(ts->tree.local);
+}
+
+dsa_pointer
+TidStoreGetHandle(TidStore *ts)
+{
+	Assert(TidStoreIsShared(ts));
+
+	return (dsa_pointer) shared_rt_get_handle(ts->tree.shared);
+}
+
+/* Extract TIDs from the given key-value pair */
+static void
+tidstore_iter_extract_tids(TidStoreIter *iter, uint64 key, BlocktableEntry *page)
+{
+	TidStoreIterResult *result = (&iter->output);
+	int			wordnum;
+
+	result->num_offsets = 0;
+	result->blkno = (BlockNumber) key;
+
+	for (wordnum = 0; wordnum < page->nwords; wordnum++)
+	{
+		bitmapword	w = page->words[wordnum];
+
+		/* Make sure there is enough space to add offsets */
+		if ((result->num_offsets + BITS_PER_BITMAPWORD) > result->max_offset)
+		{
+			result->max_offset *= 2;
+			result->offsets = repalloc(result->offsets,
+									   sizeof(OffsetNumber) * result->max_offset);
+		}
+
+		while (w != 0)
+		{
+			/* get pos of rightmost bit */
+			int			bitnum = bmw_rightmost_one_pos(w);
+			int			off = wordnum * BITS_PER_BITMAPWORD + bitnum;
+
+			result->offsets[result->num_offsets++] = off;
+
+			/* unset the rightmost bit */
+			w &= w - 1;
+		}
+	}
+}
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 30f3a09a4c..e9dd5e6f99 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -151,6 +151,7 @@ static const char *const BuiltinTrancheNames[] = {
 	[LWTRANCHE_PER_SESSION_RECORD_TYPMOD] = "PerSessionRecordTypmod",
 	[LWTRANCHE_SHARED_TUPLESTORE] = "SharedTupleStore",
 	[LWTRANCHE_SHARED_TIDBITMAP] = "SharedTidBitmap",
+	[LWTRANCHE_SHARED_TIDSTORE] = "SharedTidStore",
 	[LWTRANCHE_PARALLEL_APPEND] = "ParallelAppend",
 	[LWTRANCHE_PER_XACT_PREDICATE_LIST] = "PerXactPredicateList",
 	[LWTRANCHE_PGSTATS_DSA] = "PgStatsDSA",
diff --git a/src/include/access/tidstore.h b/src/include/access/tidstore.h
new file mode 100644
index 0000000000..b3c331ea1d
--- /dev/null
+++ b/src/include/access/tidstore.h
@@ -0,0 +1,48 @@
+/*-------------------------------------------------------------------------
+ *
+ * tidstore.h
+ *	  Tid storage.
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/tidstore.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef TIDSTORE_H
+#define TIDSTORE_H
+
+#include "storage/itemptr.h"
+#include "utils/dsa.h"
+
+typedef struct TidStore TidStore;
+typedef struct TidStoreIter TidStoreIter;
+
+/* Result struct for TidStoreIterateNext */
+typedef struct TidStoreIterResult
+{
+	BlockNumber blkno;
+	int			max_offset;
+	int			num_offsets;
+	OffsetNumber *offsets;
+} TidStoreIterResult;
+
+extern TidStore *TidStoreCreate(size_t max_bytes, dsa_area *dsa);
+extern TidStore *TidStoreAttach(dsa_area *dsa, dsa_pointer rt_dp);
+extern void TidStoreDetach(TidStore *ts);
+extern void TidStoreLockExclusive(TidStore *ts);
+extern void TidStoreLockShare(TidStore *ts);
+extern void TidStoreUnlock(TidStore *ts);
+extern void TidStoreDestroy(TidStore *ts);
+extern void TidStoreSetBlockOffsets(TidStore *ts, BlockNumber blkno, OffsetNumber *offsets,
+									int num_offsets);
+extern bool TidStoreIsMember(TidStore *ts, ItemPointer tid);
+extern TidStoreIter *TidStoreBeginIterate(TidStore *ts);
+extern TidStoreIterResult *TidStoreIterateNext(TidStoreIter *iter);
+extern void TidStoreEndIterate(TidStoreIter *iter);
+extern size_t TidStoreMemoryUsage(TidStore *ts);
+extern dsa_pointer TidStoreGetHandle(TidStore *ts);
+
+#endif							/* TIDSTORE_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 10bea8c595..152e3b047e 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -200,6 +200,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PER_SESSION_RECORD_TYPMOD,
 	LWTRANCHE_SHARED_TUPLESTORE,
 	LWTRANCHE_SHARED_TIDBITMAP,
+	LWTRANCHE_SHARED_TIDSTORE,
 	LWTRANCHE_PARALLEL_APPEND,
 	LWTRANCHE_PER_XACT_PREDICATE_LIST,
 	LWTRANCHE_PGSTATS_DSA,
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 875a76d6f1..1cbd532156 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -35,6 +35,7 @@ SUBDIRS = \
 		  test_rls_hooks \
 		  test_shm_mq \
 		  test_slru \
+		  test_tidstore \
 		  unsafe_tests \
 		  worker_spi \
 		  xid_wraparound
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index f1d18a1b29..7c11fb97f2 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -34,6 +34,7 @@ subdir('test_resowner')
 subdir('test_rls_hooks')
 subdir('test_shm_mq')
 subdir('test_slru')
+subdir('test_tidstore')
 subdir('unsafe_tests')
 subdir('worker_spi')
 subdir('xid_wraparound')
diff --git a/src/test/modules/test_tidstore/Makefile b/src/test/modules/test_tidstore/Makefile
new file mode 100644
index 0000000000..dab107d70c
--- /dev/null
+++ b/src/test/modules/test_tidstore/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_tidstore/Makefile
+
+MODULE_big = test_tidstore
+OBJS = \
+	$(WIN32RES) \
+	test_tidstore.o
+PGFILEDESC = "test_tidstore - test code for src/backend/access/common/tidstore.c"
+
+EXTENSION = test_tidstore
+DATA = test_tidstore--1.0.sql
+
+REGRESS = test_tidstore
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_tidstore
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_tidstore/expected/test_tidstore.out b/src/test/modules/test_tidstore/expected/test_tidstore.out
new file mode 100644
index 0000000000..80698924a1
--- /dev/null
+++ b/src/test/modules/test_tidstore/expected/test_tidstore.out
@@ -0,0 +1,128 @@
+CREATE EXTENSION test_tidstore;
+-- Constant values used in the tests.
+\set maxblkno 4294967295
+-- The maximum number of heap tuples (MaxHeapTuplesPerPage) in 8kB block is 291.
+-- We use a higher number to test tidstore.
+\set maxoffset 512
+-- Support functions.
+CREATE FUNCTION make_tid(a bigint, b int2) RETURNS tid
+BEGIN ATOMIC
+RETURN ('(' || a || ', ' || b || ')')::tid;
+END;
+-- Lookup test function. Search 1 to (:maxoffset + 5) offset numbers in
+-- 4 blocks, and return TIDS if found in the tidstore.
+CREATE FUNCTION lookup_test() RETURNS SETOF tid
+BEGIN ATOMIC;
+WITH blocks (blk) AS (
+VALUES (0), (2), (:maxblkno - 2), (:maxblkno)
+)
+SELECT t_ctid
+  FROM
+    (SELECT array_agg(make_tid(blk, off::int2)) AS tids
+      FROM blocks, generate_series(1, :maxoffset + 5) off) AS foo,
+    LATERAL test_lookup_tids(foo.tids)
+  WHERE found ORDER BY t_ctid;
+END;
+-- Test a local tdistore. A shared tidstore is created by passing true.
+SELECT test_create(false);
+ test_create 
+-------------
+ 
+(1 row)
+
+-- Test on empty tidstore.
+SELECT *
+    FROM test_lookup_tids(ARRAY[make_tid(0, 1::int2),
+        make_tid(:maxblkno, :maxoffset::int2)]::tid[]);
+      t_ctid      | found 
+------------------+-------
+ (0,1)            | f
+ (4294967295,512) | f
+(2 rows)
+
+SELECT test_is_full();
+ test_is_full 
+--------------
+ f
+(1 row)
+
+-- Add tids in out of order.
+WITH blocks (blk) AS(
+VALUES (0), (1), (:maxblkno - 1), (:maxblkno / 2), (:maxblkno)
+),
+offsets (off) AS (
+VALUES (1), (2), (:maxoffset / 2), (:maxoffset - 1), (:maxoffset)
+)
+SELECT test_set_block_offsets(blk, array_agg(offsets.off)::int2[])
+  FROM blocks, offsets
+  GROUP BY blk;
+ test_set_block_offsets 
+------------------------
+             2147483647
+                      0
+             4294967294
+                      1
+             4294967295
+(5 rows)
+
+-- Lookup test and dump (sorted) tids.
+SELECT lookup_test();
+   lookup_test    
+------------------
+ (0,1)
+ (0,2)
+ (0,256)
+ (0,511)
+ (0,512)
+ (4294967295,1)
+ (4294967295,2)
+ (4294967295,256)
+ (4294967295,511)
+ (4294967295,512)
+(10 rows)
+
+SELECT test_is_full();
+ test_is_full 
+--------------
+ f
+(1 row)
+
+SELECT test_dump_tids();
+  test_dump_tids  
+------------------
+ (0,1)
+ (0,2)
+ (0,256)
+ (0,511)
+ (0,512)
+ (1,1)
+ (1,2)
+ (1,256)
+ (1,511)
+ (1,512)
+ (2147483647,1)
+ (2147483647,2)
+ (2147483647,256)
+ (2147483647,511)
+ (2147483647,512)
+ (4294967294,1)
+ (4294967294,2)
+ (4294967294,256)
+ (4294967294,511)
+ (4294967294,512)
+ (4294967295,1)
+ (4294967295,2)
+ (4294967295,256)
+ (4294967295,511)
+ (4294967295,512)
+(25 rows)
+
+-- cleanup
+SELECT test_destroy();
+ test_destroy 
+--------------
+ 
+(1 row)
+
+DROP FUNCTION lookup_test();
+DROP FUNCTION make_tid(a bigint, b int2);
diff --git a/src/test/modules/test_tidstore/meson.build b/src/test/modules/test_tidstore/meson.build
new file mode 100644
index 0000000000..0ed3ea2ef3
--- /dev/null
+++ b/src/test/modules/test_tidstore/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+test_tidstore_sources = files(
+  'test_tidstore.c',
+)
+
+if host_system == 'windows'
+  test_tidstore_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_tidstore',
+    '--FILEDESC', 'test_tidstore - test code for src/backend/access/common/tidstore.c',])
+endif
+
+test_tidstore = shared_module('test_tidstore',
+  test_tidstore_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_tidstore
+
+test_install_data += files(
+  'test_tidstore.control',
+  'test_tidstore--1.0.sql',
+)
+
+tests += {
+  'name': 'test_tidstore',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_tidstore',
+    ],
+  },
+}
diff --git a/src/test/modules/test_tidstore/sql/test_tidstore.sql b/src/test/modules/test_tidstore/sql/test_tidstore.sql
new file mode 100644
index 0000000000..cc1207821c
--- /dev/null
+++ b/src/test/modules/test_tidstore/sql/test_tidstore.sql
@@ -0,0 +1,58 @@
+CREATE EXTENSION test_tidstore;
+
+-- Constant values used in the tests.
+\set maxblkno 4294967295
+-- The maximum number of heap tuples (MaxHeapTuplesPerPage) in 8kB block is 291.
+-- We use a higher number to test tidstore.
+\set maxoffset 512
+
+-- Support functions.
+CREATE FUNCTION make_tid(a bigint, b int2) RETURNS tid
+BEGIN ATOMIC
+RETURN ('(' || a || ', ' || b || ')')::tid;
+END;
+
+-- Lookup test function. Search 1 to (:maxoffset + 5) offset numbers in
+-- 4 blocks, and return TIDS if found in the tidstore.
+CREATE FUNCTION lookup_test() RETURNS SETOF tid
+BEGIN ATOMIC;
+WITH blocks (blk) AS (
+VALUES (0), (2), (:maxblkno - 2), (:maxblkno)
+)
+SELECT t_ctid
+  FROM
+    (SELECT array_agg(make_tid(blk, off::int2)) AS tids
+      FROM blocks, generate_series(1, :maxoffset + 5) off) AS foo,
+    LATERAL test_lookup_tids(foo.tids)
+  WHERE found ORDER BY t_ctid;
+END;
+
+-- Test a local tdistore. A shared tidstore is created by passing true.
+SELECT test_create(false);
+
+-- Test on empty tidstore.
+SELECT *
+    FROM test_lookup_tids(ARRAY[make_tid(0, 1::int2),
+        make_tid(:maxblkno, :maxoffset::int2)]::tid[]);
+SELECT test_is_full();
+
+-- Add tids in out of order.
+WITH blocks (blk) AS(
+VALUES (0), (1), (:maxblkno - 1), (:maxblkno / 2), (:maxblkno)
+),
+offsets (off) AS (
+VALUES (1), (2), (:maxoffset / 2), (:maxoffset - 1), (:maxoffset)
+)
+SELECT test_set_block_offsets(blk, array_agg(offsets.off)::int2[])
+  FROM blocks, offsets
+  GROUP BY blk;
+
+-- Lookup test and dump (sorted) tids.
+SELECT lookup_test();
+SELECT test_is_full();
+SELECT test_dump_tids();
+
+-- cleanup
+SELECT test_destroy();
+DROP FUNCTION lookup_test();
+DROP FUNCTION make_tid(a bigint, b int2);
diff --git a/src/test/modules/test_tidstore/test_tidstore--1.0.sql b/src/test/modules/test_tidstore/test_tidstore--1.0.sql
new file mode 100644
index 0000000000..305459334d
--- /dev/null
+++ b/src/test/modules/test_tidstore/test_tidstore--1.0.sql
@@ -0,0 +1,35 @@
+/* src/test/modules/test_tidstore/test_tidstore--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_tidstore" to load this file. \quit
+
+CREATE FUNCTION test_create(
+shared bool)
+RETURNS void STRICT PARALLEL UNSAFE
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_set_block_offsets(
+blkno bigint,
+offsets int2[])
+RETURNS bigint STRICT PARALLEL UNSAFE
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_dump_tids(
+t_ctid OUT tid)
+RETURNS SETOF tid STRICT PARALLEL UNSAFE
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_lookup_tids(
+t_ctids tid[],
+t_ctid OUT tid,
+found OUT bool)
+RETURNS SETOF record STRICT PARALLEL UNSAFE
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_is_full()
+RETURNS bool STRICT PARALLEL UNSAFE
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_destroy()
+RETURNS void STRICT PARALLEL UNSAFE
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_tidstore/test_tidstore.c b/src/test/modules/test_tidstore/test_tidstore.c
new file mode 100644
index 0000000000..428d6a3fcf
--- /dev/null
+++ b/src/test/modules/test_tidstore/test_tidstore.c
@@ -0,0 +1,245 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_tidstore.c
+ *		Test TidStore data structure.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_tidstore/test_tidstore.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/tidstore.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "storage/block.h"
+#include "storage/itemptr.h"
+#include "storage/lwlock.h"
+#include "utils/array.h"
+#include "utils/memutils.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_create);
+PG_FUNCTION_INFO_V1(test_set_block_offsets);
+PG_FUNCTION_INFO_V1(test_dump_tids);
+PG_FUNCTION_INFO_V1(test_lookup_tids);
+PG_FUNCTION_INFO_V1(test_is_full);
+PG_FUNCTION_INFO_V1(test_destroy);
+
+/* Backend-local state shared by all SQL-callable test functions */
+static TidStore *tidstore = NULL;
+static dsa_area *dsa = NULL;	/* non-NULL only when the store is shared */
+static int64 num_tids = 0;		/* running count of TIDs added to the store */
+static size_t max_bytes = (2 * 1024 * 1024L);	/* 2MB */
+
+/*
+ * Create a TidStore. If shared is false, the tidstore is created
+ * in TopMemoryContext; otherwise it is created in a new DSA area.
+ * Although the tidstore may live in DSA, only this same process can
+ * subsequently use it: the tidstore handle is not shared anywhere.
+ */
+Datum
+test_create(PG_FUNCTION_ARGS)
+{
+	bool		shared = PG_GETARG_BOOL(0);
+	MemoryContext old_ctx;
+
+	/* Any tidstore from a previous test must have been destroyed */
+	Assert(tidstore == NULL);
+	Assert(dsa == NULL);
+
+	/* Allocate in TopMemoryContext so the store outlives this call */
+	old_ctx = MemoryContextSwitchTo(TopMemoryContext);
+
+	if (shared)
+	{
+		int			tranche_id;
+
+		/* Register a named LWLock tranche for the DSA area's locks */
+		tranche_id = LWLockNewTrancheId();
+		LWLockRegisterTranche(tranche_id, "test_tidstore");
+
+		dsa = dsa_create(tranche_id);
+
+		/*
+		 * Remain attached until end of backend or explicitly detached so
+		 * that the same process can use the tidstore in subsequent tests.
+		 */
+		dsa_pin_mapping(dsa);
+
+		tidstore = TidStoreCreate(max_bytes, dsa);
+	}
+	else
+		tidstore = TidStoreCreate(max_bytes, NULL);
+
+	/* Reset the TID counter kept alongside the store */
+	num_tids = 0;
+
+	MemoryContextSwitchTo(old_ctx);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Error out if the input array contains NULL elements or is
+ * multi-dimensional; callers read its data area as a flat C array.
+ */
+static void
+sanity_check_array(ArrayType *ta)
+{
+	if (ARR_HASNULL(ta) && array_contains_nulls(ta))
+		ereport(ERROR,
+				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+				 errmsg("array must not contain nulls")));
+
+	if (ARR_NDIM(ta) > 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_EXCEPTION),
+				 errmsg("argument must be empty or one-dimensional array")));
+}
+
+/* Set the given block and offsets pairs; returns the block number. */
+Datum
+test_set_block_offsets(PG_FUNCTION_ARGS)
+{
+	BlockNumber blkno = PG_GETARG_INT64(0);
+	ArrayType  *ta = PG_GETARG_ARRAYTYPE_P_COPY(1);
+	OffsetNumber *offs;
+	int			noffs;
+
+	sanity_check_array(ta);
+
+	noffs = ArrayGetNItems(ARR_NDIM(ta), ARR_DIMS(ta));
+	/* int2 elements are read in place as OffsetNumber (both 16-bit) */
+	offs = ((OffsetNumber *) ARR_DATA_PTR(ta));
+
+	/* Set TIDs */
+	TidStoreLockExclusive(tidstore);
+	TidStoreSetBlockOffsets(tidstore, blkno, offs, noffs);
+	TidStoreUnlock(tidstore);
+
+	/*
+	 * Update statistics. NOTE(review): this assumes each block is set at
+	 * most once per store; setting the same block again would make
+	 * num_tids overcount -- the SQL tests must honor that.
+	 */
+	num_tids += noffs;
+
+	PG_RETURN_INT64(blkno);
+}
+
+/*
+ * Dump and return TIDs in the tidstore. The output TIDs are ordered,
+ * since TidStore iteration returns entries in TID order.
+ *
+ * Implemented as a value-per-call SRF: on the first call we materialize
+ * all TIDs into an array sized by num_tids, then return one per call.
+ */
+Datum
+test_dump_tids(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	ItemPointerData *tids;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext oldcontext;
+		TidStoreIter *iter;
+		TidStoreIterResult *iter_result;
+		int64		ntids = 0;
+
+		funcctx = SRF_FIRSTCALL_INIT();
+		/* Allocate in multi_call_memory_ctx so the array survives calls */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+		tids = (ItemPointerData *)
+			palloc0(sizeof(ItemPointerData) * num_tids);
+
+		/* Collect TIDs stored in the tidstore */
+		TidStoreLockShare(tidstore);
+		iter = TidStoreBeginIterate(tidstore);
+		while ((iter_result = TidStoreIterateNext(iter)) != NULL)
+		{
+			for (int i = 0; i < iter_result->num_offsets; i++)
+				ItemPointerSet(&(tids[ntids++]), iter_result->blkno,
+							   iter_result->offsets[i]);
+		}
+		TidStoreUnlock(tidstore);
+
+		/* The store must contain exactly as many TIDs as we counted */
+		Assert(ntids == num_tids);
+
+		funcctx->user_fctx = tids;
+		funcctx->max_calls = num_tids;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	tids = (ItemPointerData *) funcctx->user_fctx;
+
+	if (funcctx->call_cntr < funcctx->max_calls)
+	{
+		int			idx;
+
+		/*
+		 * Note that since funcctx->call_cntr is incremented in
+		 * SRF_RETURN_NEXT before return, we need to remember the current
+		 * counter to access the tid array.
+		 */
+		idx = funcctx->call_cntr;
+		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&(tids[idx])));
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Test if the given TIDs exist in the tidstore. Returns one
+ * (tid, found) row per input TID, materialized into a tuplestore.
+ */
+Datum
+test_lookup_tids(PG_FUNCTION_ARGS)
+{
+	ArrayType  *ta = PG_GETARG_ARRAYTYPE_P_COPY(0);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	ItemPointer tids;
+	int			ntids;
+	Datum		values[2];
+	bool		nulls[2] = {false};
+
+	sanity_check_array(ta);
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	ntids = ArrayGetNItems(ARR_NDIM(ta), ARR_DIMS(ta));
+	/* tid array elements are read in place as ItemPointerData */
+	tids = ((ItemPointer) ARR_DATA_PTR(ta));
+
+	for (int i = 0; i < ntids; i++)
+	{
+		bool		found;
+		ItemPointerData tid = tids[i];
+
+		/* Membership check under share lock, one TID at a time */
+		TidStoreLockShare(tidstore);
+		found = TidStoreIsMember(tidstore, &tid);
+		TidStoreUnlock(tidstore);
+
+		values[0] = ItemPointerGetDatum(&tid);
+		values[1] = BoolGetDatum(found);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+							 values, nulls);
+	}
+
+	return (Datum) 0;
+}
+
+/*
+ * Return true if the size of tidstore reached the maximum memory
+ * limit. Note the comparison is strictly greater-than: usage exactly
+ * at max_bytes does not count as full.
+ */
+Datum
+test_is_full(PG_FUNCTION_ARGS)
+{
+	bool		is_full;
+
+	is_full = (TidStoreMemoryUsage(tidstore) > max_bytes);
+
+	PG_RETURN_BOOL(is_full);
+}
+
+/* Free the tidstore and, if it was shared, detach from its DSA area */
+Datum
+test_destroy(PG_FUNCTION_ARGS)
+{
+	TidStoreDestroy(tidstore);
+	tidstore = NULL;
+	num_tids = 0;
+
+	if (dsa)
+		dsa_detach(dsa);
+
+	/*
+	 * Clear the stale pointer; otherwise the Assert(dsa == NULL) in
+	 * test_create() would fire on the next shared-store test, and a
+	 * later destroy would call dsa_detach() on an already-detached area.
+	 */
+	dsa = NULL;
+
+	PG_RETURN_VOID();
+}
diff --git a/src/test/modules/test_tidstore/test_tidstore.control b/src/test/modules/test_tidstore/test_tidstore.control
new file mode 100644
index 0000000000..9b6bd4638f
--- /dev/null
+++ b/src/test/modules/test_tidstore/test_tidstore.control
@@ -0,0 +1,4 @@
+comment = 'Test code for tidstore'
+default_version = '1.0'
+module_pathname = '$libdir/test_tidstore'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d3a7f75b08..a0fa0d2d1f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4056,3 +4056,7 @@ rfile
 ws_options
 ws_file_info
 PathKeyInfo
+TidStore
+TidStoreIter
+TidStoreIterResult
+BlocktableEntry
-- 
2.44.0

Reply via email to