On Tue, Apr 2, 2024 at 1:10 PM Heikki Linnakangas <hlinn...@iki.fi> wrote:
>
> On 01/04/2024 22:58, Melanie Plageman wrote:
> > Attached v7 has version 14 of the streaming read API as well as a few
> > small tweaks to comments and code.
>
> I saw benchmarks in this thread to show that there's no regression when
> the data is in cache, but I didn't see any benchmarks demonstrating the
> benefit of this. So I ran this quick test:

Good point! It would be good to show why we would actually want this
patch. Attached v8 is rebased over current master (which now has the
streaming read API).

On the topic of BAS_BULKREAD buffer access strategy, I think the least
we could do is add an assert like this to read_stream_begin_relation()
after calculating max_pinned_buffers.

    Assert(GetAccessStrategyBufferCount(strategy) > max_pinned_buffers);

Perhaps we should do more? I think with a ring size of 16 MB, large
SELECTs are safe for now. But I know future developers will make
changes and it would be good not to assume they will understand that
pinning more buffers than the size of the ring effectively invalidates
the ring.

- Melanie
From cfccafec650a77c53b1d78180b52db31742181ff Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Wed, 27 Mar 2024 20:25:06 -0400
Subject: [PATCH v8 3/3] Sequential scans and TID range scans stream reads

Implementing streaming read support for heap sequential scans and TID
range scans includes three parts:

Allocate the read stream object in heap_beginscan(). On rescan, reset
the stream by releasing all pinned buffers and resetting the prefetch
block.

Implement a callback returning the next block to prefetch to the
read stream infrastructure.

Invoke the read stream API when a new page is needed. When the scan
direction changes, reset the stream.
---
 src/backend/access/heap/heapam.c | 94 ++++++++++++++++++++++++++++----
 src/include/access/heapam.h      | 15 +++++
 2 files changed, 97 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 6b26f5bf8af..3546f637c13 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -221,6 +221,25 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+static BlockNumber
+heap_scan_stream_read_next(ReadStream *pgsr, void *private_data,
+						   void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) private_data;
+
+	if (unlikely(!scan->rs_inited))
+	{
+		scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+		scan->rs_inited = true;
+	}
+	else
+		scan->rs_prefetch_block = heapgettup_advance_block(scan,
+														   scan->rs_prefetch_block,
+														   scan->rs_dir);
+
+	return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -323,6 +342,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
 
+	/*
+	 * Initialize to ForwardScanDirection because it is most common and heap
+	 * scans usually must go forward before going backward.
+	 */
+	scan->rs_dir = ForwardScanDirection;
+	scan->rs_prefetch_block = InvalidBlockNumber;
+
 	/* page-at-a-time fields are always invalid when not rs_inited */
 
 	/*
@@ -459,12 +485,14 @@ heapbuildvis(TableScanDesc sscan)
 /*
  * heapfetchbuf - subroutine for heapgettup()
  *
- * This routine reads the next block of the relation into a buffer and returns
- * with that pinned buffer saved in the scan descriptor.
+ * This routine gets the next block of the relation from the read stream
+ * and saves that pinned buffer in the scan descriptor.
  */
 static inline void
 heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 {
+	Assert(scan->rs_read_stream);
+
 	/* release previous scan buffer, if any */
 	if (BufferIsValid(scan->rs_cbuf))
 	{
@@ -479,19 +507,23 @@ heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	if (unlikely(!scan->rs_inited))
+	/*
+	 * If the scan direction is changing, reset the prefetch block to the
+	 * current block. Otherwise, we will incorrectly prefetch the blocks
+	 * between the prefetch block and the current block again before
+	 * prefetching blocks in the new, correct scan direction.
+	 */
+	if (unlikely(scan->rs_dir != dir))
 	{
-		scan->rs_cblock = heapgettup_initial_block(scan, dir);
-		Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
+		scan->rs_prefetch_block = scan->rs_cblock;
+		read_stream_reset(scan->rs_read_stream);
 	}
-	else
-		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
 
-	/* read block if valid */
-	if (BlockNumberIsValid(scan->rs_cblock))
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
+	scan->rs_dir = dir;
+
+	scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+	if (BufferIsValid(scan->rs_cbuf))
+		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
 
 /*
@@ -820,6 +852,7 @@ continue_page:
 
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 	tuple->t_data = NULL;
 	scan->rs_inited = false;
 }
@@ -910,6 +943,7 @@ continue_page:
 		ReleaseBuffer(scan->rs_cbuf);
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 	tuple->t_data = NULL;
 	scan->rs_inited = false;
 }
@@ -1003,6 +1037,28 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
 	initscan(scan, key, false);
 
+	scan->rs_read_stream = NULL;
+
+	/*
+	 * For sequential scans and TID range scans, we will set up a read stream.
+	 * We do not know the scan direction yet. If the scan does not end up
+	 * being a forward scan, heapfetchbuf() will reset the read stream. The
+	 * stream must be set up after initscan() because initscan() allocates
+	 * the BufferAccessStrategy object.
+	 */
+	if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+		scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+	{
+		scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+														  scan->rs_strategy,
+														  scan->rs_base.rs_rd,
+														  MAIN_FORKNUM,
+														  heap_scan_stream_read_next,
+														  scan,
+														  0);
+	}
+
+
 	return (TableScanDesc) scan;
 }
 
@@ -1037,6 +1093,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	/*
+	 * The read stream is reset on rescan. This must be done before
+	 * initscan(), as some state referred to by read_stream_reset() is reset
+	 * in initscan().
+	 */
+	if (scan->rs_read_stream)
+		read_stream_reset(scan->rs_read_stream);
+
 	/*
 	 * reinitialize scan descriptor
 	 */
@@ -1056,6 +1120,12 @@ heap_endscan(TableScanDesc sscan)
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	/*
+	 * Must free the read stream before freeing the BufferAccessStrategy.
+	 */
+	if (scan->rs_read_stream)
+		read_stream_end(scan->rs_read_stream);
+
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4d324c78e5b..41d32d5d95d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -70,6 +71,20 @@ typedef struct HeapScanDescData
 
 	HeapTupleData rs_ctup;		/* current tuple in scan, if any */
 
+	/* For scans that stream reads */
+	ReadStream *rs_read_stream;
+
+	/*
+	 * For sequential scans and TID range scans to stream reads. The read
+	 * stream is allocated at the beginning of the scan and reset on rescan or
+	 * when the scan direction changes. The scan direction is saved each time
+	 * a new page is requested. If the scan direction changes from one page to
+	 * the next, the read stream releases all previously pinned buffers and
+	 * resets the prefetch block.
+	 */
+	ScanDirection rs_dir;
+	BlockNumber rs_prefetch_block;
+
 	/*
 	 * For parallel scans to store page allocation data.  NULL when not
 	 * performing a parallel scan.
-- 
2.40.1

From 7b474f3b4fad7fcf092bae20e67cb020846be30f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Mon, 29 Jan 2024 11:50:01 -0500
Subject: [PATCH v8 2/3] Replace blocks with buffers in heapgettup control flow

Future commits will introduce the sequential scan streaming read user
which will implement a callback returning the next block to read.
Sequential scans previously looped through the blocks in the relation,
synchronously reading in a block and then processing it. An
InvalidBlockNumber returned by heapgettup_advance_block() meant that the
relation was exhausted and all blocks had been processed.

The streaming read API may exhaust the blocks in a relation (having read
all of them into buffers) before they have all been processed by the
sequential scan. As such, the sequential scan should continue processing
blocks until heapfetchbuf() returns InvalidBuffer.

Note that this commit does not implement the streaming read API user. It
simply restructures heapgettup() and heapgettup_pagemode() to use
buffers instead of blocks for control flow.
---
 src/backend/access/heap/heapam.c | 79 ++++++++++++++------------------
 1 file changed, 35 insertions(+), 44 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index bf2b9b19e72..6b26f5bf8af 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -83,6 +83,9 @@ static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
 static bool heap_acquire_tuplock(Relation relation, ItemPointer tid,
 								 LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool *have_tuple_lock);
+static inline BlockNumber heapgettup_advance_block(HeapScanDesc scan,
+												   BlockNumber block, ScanDirection dir);
+static inline BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir);
 static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 									  uint16 old_infomask2, TransactionId add_to_xmax,
 									  LockTupleMode mode, bool is_update,
@@ -456,14 +459,12 @@ heapbuildvis(TableScanDesc sscan)
 /*
  * heapfetchbuf - subroutine for heapgettup()
  *
- * This routine reads the specified block of the relation into a buffer and
- * returns with that pinned buffer saved in the scan descriptor.
+ * This routine reads the next block of the relation into a buffer and returns
+ * with that pinned buffer saved in the scan descriptor.
  */
 static inline void
-heapfetchbuf(HeapScanDesc scan, BlockNumber block)
+heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 {
-	Assert(block < scan->rs_nblocks);
-
 	/* release previous scan buffer, if any */
 	if (BufferIsValid(scan->rs_cbuf))
 	{
@@ -478,10 +479,19 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	/* read page using selected strategy */
-	scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block,
-									   RBM_NORMAL, scan->rs_strategy);
-	scan->rs_cblock = block;
+	if (unlikely(!scan->rs_inited))
+	{
+		scan->rs_cblock = heapgettup_initial_block(scan, dir);
+		Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
+		scan->rs_inited = true;
+	}
+	else
+		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
+
+	/* read block if valid */
+	if (BlockNumberIsValid(scan->rs_cblock))
+		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
+										   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
 }
 
 /*
@@ -491,7 +501,7 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block)
  * occur with empty tables and in parallel scans when parallel workers get all
  * of the pages before we can get a chance to get our first page.
  */
-static BlockNumber
+BlockNumber
 heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 {
 	Assert(!scan->rs_inited);
@@ -631,7 +641,7 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
  * This also adjusts rs_numblocks when a limit has been imposed by
  * heap_setscanlimits().
  */
-static inline BlockNumber
+BlockNumber
 heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
 {
 	if (ScanDirectionIsForward(dir))
@@ -729,23 +739,13 @@ heapgettup(HeapScanDesc scan,
 		   ScanKey key)
 {
 	HeapTuple	tuple = &(scan->rs_ctup);
-	BlockNumber block;
 	Page		page;
 	OffsetNumber lineoff;
 	int			linesleft;
 
-	if (unlikely(!scan->rs_inited))
-	{
-		block = heapgettup_initial_block(scan, dir);
-		/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-		Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
-	}
-	else
+	if (likely(scan->rs_inited))
 	{
 		/* continue from previously returned page/tuple */
-		block = scan->rs_cblock;
-
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 		page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff);
 		goto continue_page;
@@ -755,9 +755,12 @@ heapgettup(HeapScanDesc scan,
 	 * advance the scan until we find a qualifying tuple or run out of stuff
 	 * to scan
 	 */
-	while (block != InvalidBlockNumber)
+	while (true)
 	{
-		heapfetchbuf(scan, block);
+		heapfetchbuf(scan, dir);
+		if (!BufferIsValid(scan->rs_cbuf))
+			break;
+		Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 		page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
 continue_page:
@@ -779,7 +782,7 @@ continue_page:
 
 			tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
 			tuple->t_len = ItemIdGetLength(lpp);
-			ItemPointerSet(&(tuple->t_self), block, lineoff);
+			ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff);
 
 			visible = HeapTupleSatisfiesVisibility(tuple,
 												   scan->rs_base.rs_snapshot,
@@ -809,9 +812,6 @@ continue_page:
 		 * it's time to move to the next.
 		 */
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-
-		/* get the BlockNumber to scan next */
-		block = heapgettup_advance_block(scan, block, dir);
 	}
 
 	/* end of scan */
@@ -844,22 +844,13 @@ heapgettup_pagemode(HeapScanDesc scan,
 					ScanKey key)
 {
 	HeapTuple	tuple = &(scan->rs_ctup);
-	BlockNumber block;
 	Page		page;
 	int			lineindex;
 	int			linesleft;
 
-	if (unlikely(!scan->rs_inited))
-	{
-		block = heapgettup_initial_block(scan, dir);
-		/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-		Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
-	}
-	else
+	if (likely(scan->rs_inited))
 	{
 		/* continue from previously returned page/tuple */
-		block = scan->rs_cblock;	/* current page */
 		page = BufferGetPage(scan->rs_cbuf);
 
 		lineindex = scan->rs_cindex + dir;
@@ -876,9 +867,12 @@ heapgettup_pagemode(HeapScanDesc scan,
 	 * advance the scan until we find a qualifying tuple or run out of stuff
 	 * to scan
 	 */
-	while (block != InvalidBlockNumber)
+	while (true)
 	{
-		heapfetchbuf(scan, block);
+		heapfetchbuf(scan, dir);
+		if (!BufferIsValid(scan->rs_cbuf))
+			break;
+		Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
 		heapbuildvis((TableScanDesc) scan);
 		page = BufferGetPage(scan->rs_cbuf);
 		linesleft = scan->rs_ntuples;
@@ -898,7 +892,7 @@ continue_page:
 
 			tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
 			tuple->t_len = ItemIdGetLength(lpp);
-			ItemPointerSet(&(tuple->t_self), block, lineoff);
+			ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff);
 
 			/* skip any tuples that don't match the scan key */
 			if (key != NULL &&
@@ -909,9 +903,6 @@ continue_page:
 			scan->rs_cindex = lineindex;
 			return;
 		}
-
-		/* get the BlockNumber to scan next */
-		block = heapgettup_advance_block(scan, block, dir);
 	}
 
 	/* end of scan */
-- 
2.40.1

From 2fb76f871d25a9a6967d700a5dccbe28473c0fc2 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sat, 27 Jan 2024 18:39:37 -0500
Subject: [PATCH v8 1/3] Split heapgetpage into two parts

heapgetpage(), a per-block utility function used in heap scans, read a
passed-in block into a buffer and then, if page-at-a-time processing was
enabled, pruned the page and built an array of its visible tuples. This
was used for sequential and sample scans.

Future commits will add support for read streams. The streaming read API
will read in the blocks specified by a callback, but any significant
per-page processing should be done synchronously on the buffer yielded
by the streaming read API. To support this, separate the logic in
heapgetpage() to read a block into a buffer from that which prunes the
page and builds an array of the visible tuples. The former is now
heapfetchbuf() and the latter is now heapbuildvis().

Future commits will push the logic for selecting the next block into
heapfetchbuf() in cases when streaming reads are not supported (such as
backwards sequential scans). Because this logic differs for sample scans
and sequential scans, inline the code to read the block into a buffer
for sample scans.

This has the added benefit of allowing for a bit of refactoring in
heapam_scan_sample_next_block(), including unpinning the previous buffer
before invoking the callback to select the next block.
---
 src/backend/access/heap/heapam.c         | 74 ++++++++++++++----------
 src/backend/access/heap/heapam_handler.c | 40 +++++++++----
 src/include/access/heapam.h              |  2 +-
 3 files changed, 72 insertions(+), 44 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a9d5b109a5e..bf2b9b19e72 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -360,17 +360,18 @@ heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlk
 }
 
 /*
- * heapgetpage - subroutine for heapgettup()
+ * heapbuildvis - Utility function for heap scans.
  *
- * This routine reads and pins the specified page of the relation.
- * In page-at-a-time mode it performs additional work, namely determining
- * which tuples on the page are visible.
+ * Given a page residing in a buffer saved in the scan descriptor, prune the
+ * page and determine which of its tuples are visible, saving their offsets
+ * in an array in the scan descriptor.
  */
 void
-heapgetpage(TableScanDesc sscan, BlockNumber block)
+heapbuildvis(TableScanDesc sscan)
 {
 	HeapScanDesc scan = (HeapScanDesc) sscan;
-	Buffer		buffer;
+	Buffer		buffer = scan->rs_cbuf;
+	BlockNumber block = scan->rs_cblock;
 	Snapshot	snapshot;
 	Page		page;
 	int			lines;
@@ -378,31 +379,8 @@ heapgetpage(TableScanDesc sscan, BlockNumber block)
 	OffsetNumber lineoff;
 	bool		all_visible;
 
-	Assert(block < scan->rs_nblocks);
+	Assert(BufferGetBlockNumber(buffer) == block);
 
-	/* release previous scan buffer, if any */
-	if (BufferIsValid(scan->rs_cbuf))
-	{
-		ReleaseBuffer(scan->rs_cbuf);
-		scan->rs_cbuf = InvalidBuffer;
-	}
-
-	/*
-	 * Be sure to check for interrupts at least once per page.  Checks at
-	 * higher code levels won't be able to stop a seqscan that encounters many
-	 * pages' worth of consecutive dead tuples.
-	 */
-	CHECK_FOR_INTERRUPTS();
-
-	/* read page using selected strategy */
-	scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block,
-									   RBM_NORMAL, scan->rs_strategy);
-	scan->rs_cblock = block;
-
-	if (!(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE))
-		return;
-
-	buffer = scan->rs_cbuf;
 	snapshot = scan->rs_base.rs_snapshot;
 
 	/*
@@ -475,6 +453,37 @@ heapgetpage(TableScanDesc sscan, BlockNumber block)
 	scan->rs_ntuples = ntup;
 }
 
+/*
+ * heapfetchbuf - subroutine for heapgettup()
+ *
+ * This routine reads the specified block of the relation into a buffer and
+ * returns with that pinned buffer saved in the scan descriptor.
+ */
+static inline void
+heapfetchbuf(HeapScanDesc scan, BlockNumber block)
+{
+	Assert(block < scan->rs_nblocks);
+
+	/* release previous scan buffer, if any */
+	if (BufferIsValid(scan->rs_cbuf))
+	{
+		ReleaseBuffer(scan->rs_cbuf);
+		scan->rs_cbuf = InvalidBuffer;
+	}
+
+	/*
+	 * Be sure to check for interrupts at least once per page.  Checks at
+	 * higher code levels won't be able to stop a seqscan that encounters many
+	 * pages' worth of consecutive dead tuples.
+	 */
+	CHECK_FOR_INTERRUPTS();
+
+	/* read page using selected strategy */
+	scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block,
+									   RBM_NORMAL, scan->rs_strategy);
+	scan->rs_cblock = block;
+}
+
 /*
  * heapgettup_initial_block - return the first BlockNumber to scan
  *
@@ -748,7 +757,7 @@ heapgettup(HeapScanDesc scan,
 	 */
 	while (block != InvalidBlockNumber)
 	{
-		heapgetpage((TableScanDesc) scan, block);
+		heapfetchbuf(scan, block);
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 		page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
 continue_page:
@@ -869,7 +878,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 	 */
 	while (block != InvalidBlockNumber)
 	{
-		heapgetpage((TableScanDesc) scan, block);
+		heapfetchbuf(scan, block);
+		heapbuildvis((TableScanDesc) scan);
 		page = BufferGetPage(scan->rs_cbuf);
 		linesleft = scan->rs_ntuples;
 		lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 0952d4a98eb..f4f670e9b24 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2352,11 +2352,14 @@ heapam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate)
 	if (hscan->rs_nblocks == 0)
 		return false;
 
-	if (tsm->NextSampleBlock)
+	if (BufferIsValid(hscan->rs_cbuf))
 	{
-		blockno = tsm->NextSampleBlock(scanstate, hscan->rs_nblocks);
-		hscan->rs_cblock = blockno;
+		ReleaseBuffer(hscan->rs_cbuf);
+		hscan->rs_cbuf = InvalidBuffer;
 	}
+
+	if (tsm->NextSampleBlock)
+		blockno = tsm->NextSampleBlock(scanstate, hscan->rs_nblocks);
 	else
 	{
 		/* scanning table sequentially */
@@ -2398,20 +2401,35 @@ heapam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate)
 		}
 	}
 
-	if (!BlockNumberIsValid(blockno))
+	hscan->rs_cblock = blockno;
+
+	if (!BlockNumberIsValid(hscan->rs_cblock))
 	{
-		if (BufferIsValid(hscan->rs_cbuf))
-			ReleaseBuffer(hscan->rs_cbuf);
-		hscan->rs_cbuf = InvalidBuffer;
-		hscan->rs_cblock = InvalidBlockNumber;
 		hscan->rs_inited = false;
-
 		return false;
 	}
 
-	heapgetpage(scan, blockno);
-	hscan->rs_inited = true;
+	Assert(hscan->rs_cblock < hscan->rs_nblocks);
+
+	/*
+	 * We may scan multiple pages before finding tuples to yield or finishing
+	 * the scan. Since we want to check for interrupts at least once per page,
+	 * do so here.
+	 */
+	CHECK_FOR_INTERRUPTS();
+
+	/* Read page using selected strategy */
+	hscan->rs_cbuf = ReadBufferExtended(hscan->rs_base.rs_rd, MAIN_FORKNUM,
+										hscan->rs_cblock, RBM_NORMAL, hscan->rs_strategy);
 
+	/*
+	 * If pagemode is allowed, prune the page and build an array of visible
+	 * tuple offsets.
+	 */
+	if (hscan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)
+		heapbuildvis(scan);
+
+	hscan->rs_inited = true;
 	return true;
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index a307fb5f245..4d324c78e5b 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -267,7 +267,7 @@ extern TableScanDesc heap_beginscan(Relation relation, Snapshot snapshot,
 									uint32 flags);
 extern void heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk,
 							   BlockNumber numBlks);
-extern void heapgetpage(TableScanDesc sscan, BlockNumber block);
+extern void heapbuildvis(TableScanDesc sscan);
 extern void heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 						bool allow_strat, bool allow_sync, bool allow_pagemode);
 extern void heap_endscan(TableScanDesc sscan);
-- 
2.40.1

Reply via email to