On Wed, Apr 03, 2024 at 10:25:01PM +0300, Nazir Bilal Yavuz wrote:
>
> I realized the same error while working on Jakub's benchmarking results.
> 
> Cause: I was using the nblocks variable to check how many blocks will
> be returned from the streaming API. But I realized that sometimes the
> number returned from BlockSampler_Init() is not equal to the number of
> blocks that BlockSampler_Next() will return, as the BlockSampling
> algorithm decides how many blocks to return on the fly by using a
> random seed.
> 
> There are a couple of solutions I thought of:
> 
> 1- Use BlockSampler_HasMore() instead of nblocks in the main loop in
> the acquire_sample_rows():
> 
> The streaming API uses this function to prefetch block numbers.
> BlockSampler_HasMore() will reach the end first as it is used while
> prefetching, so it will start to return false while there are still
> buffers to return from the streaming API. That will cause some buffers
> at the end to not be processed.
> 
> 2- Expose something (function, variable etc.) from the streaming API
> to understand if the read is finished and there is no buffer to
> return:
> 
> I think this works but I am not sure if the streaming API allows
> something like that.
> 
> 3- Check every buffer returned from the streaming API, if it is
> invalid stop the main loop in the acquire_sample_rows():
> 
> This solves the problem, but there will be two `if` checks for each
> buffer returned:
> - in heapam_scan_analyze_next_block(), to check if the returned buffer
>   is invalid
> - in acquire_sample_rows(), to break the main loop if
>   heapam_scan_analyze_next_block() returns false
> One of the `if` checks can be avoided by moving
> heapam_scan_analyze_next_block()'s code into the main loop in
> acquire_sample_rows().
> 
> I implemented the third solution, here is v6.

I've reviewed the patches inline below and attached a patch that has
some of my ideas on top of your patch.

> From 8d396a42186325f920d5a05e7092d8e1b66f3cdf Mon Sep 17 00:00:00 2001
> From: Nazir Bilal Yavuz <byavu...@gmail.com>
> Date: Wed, 3 Apr 2024 15:14:15 +0300
> Subject: [PATCH v6] Use streaming read API in ANALYZE
> 
> ANALYZE command gets random tuples using BlockSampler algorithm. Use
> streaming reads to get these tuples by using BlockSampler algorithm in
> streaming read API prefetch logic.
> ---
>  src/include/access/heapam.h              |  6 +-
>  src/backend/access/heap/heapam_handler.c | 22 +++---
>  src/backend/commands/analyze.c           | 85 ++++++++----------------
>  3 files changed, 42 insertions(+), 71 deletions(-)
> 
> diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
> index a307fb5f245..633caee9d95 100644
> --- a/src/include/access/heapam.h
> +++ b/src/include/access/heapam.h
> @@ -25,6 +25,7 @@
>  #include "storage/bufpage.h"
>  #include "storage/dsm.h"
>  #include "storage/lockdefs.h"
> +#include "storage/read_stream.h"
>  #include "storage/shm_toc.h"
>  #include "utils/relcache.h"
>  #include "utils/snapshot.h"
> @@ -388,9 +389,8 @@ extern bool HeapTupleIsSurelyDead(HeapTuple htup,
>                                                                 struct 
> GlobalVisState *vistest);
>  
>  /* in heap/heapam_handler.c*/
> -extern void heapam_scan_analyze_next_block(TableScanDesc scan,
> -                                                                             
>    BlockNumber blockno,
> -                                                                             
>    BufferAccessStrategy bstrategy);
> +extern bool heapam_scan_analyze_next_block(TableScanDesc scan,
> +                                                                             
>    ReadStream *stream);
>  extern bool heapam_scan_analyze_next_tuple(TableScanDesc scan,
>                                                                               
>    TransactionId OldestXmin,
>                                                                               
>    double *liverows, double *deadrows,
> diff --git a/src/backend/access/heap/heapam_handler.c 
> b/src/backend/access/heap/heapam_handler.c
> index 0952d4a98eb..d83fbbe6af3 100644
> --- a/src/backend/access/heap/heapam_handler.c
> +++ b/src/backend/access/heap/heapam_handler.c
> @@ -1054,16 +1054,16 @@ heapam_relation_copy_for_cluster(Relation OldHeap, 
> Relation NewHeap,
>  }
>  
>  /*
> - * Prepare to analyze block `blockno` of `scan`.  The scan has been started
> - * with SO_TYPE_ANALYZE option.
> + * Prepare to analyze block returned from streaming object.  If the block 
> returned
> + * from streaming object is valid, true is returned; otherwise false is 
> returned.
> + * The scan has been started with SO_TYPE_ANALYZE option.
>   *
>   * This routine holds a buffer pin and lock on the heap page.  They are held
>   * until heapam_scan_analyze_next_tuple() returns false.  That is until all 
> the
>   * items of the heap page are analyzed.
>   */
> -void
> -heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
> -                                                        BufferAccessStrategy 
> bstrategy)
> +bool
> +heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
>  {
>       HeapScanDesc hscan = (HeapScanDesc) scan;
>  
> @@ -1076,11 +1076,15 @@ heapam_scan_analyze_next_block(TableScanDesc scan, 
> BlockNumber blockno,
>        * doing much work per tuple, the extra lock traffic is probably better
>        * avoided.

Personally I think heapam_scan_analyze_next_block() should be inlined.
It only has a few lines. I would find it clearer inline. At the least,
there is no reason for it (or heapam_scan_analyze_next_tuple()) to take
a TableScanDesc instead of a HeapScanDesc.

>        */
> -     hscan->rs_cblock = blockno;
> -     hscan->rs_cindex = FirstOffsetNumber;
> -     hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM,
> -                                                                             
> blockno, RBM_NORMAL, bstrategy);
> +     hscan->rs_cbuf = read_stream_next_buffer(stream, NULL);
> +     if (hscan->rs_cbuf == InvalidBuffer)
> +             return false;
> +
>       LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
> +
> +     hscan->rs_cblock = BufferGetBlockNumber(hscan->rs_cbuf);
> +     hscan->rs_cindex = FirstOffsetNumber;
> +     return true;
>  }

>  /*
> diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
> index 2fb39f3ede1..764520d5aa2 100644
> --- a/src/backend/commands/analyze.c
> +++ b/src/backend/commands/analyze.c
> @@ -1102,6 +1102,20 @@ examine_attribute(Relation onerel, int attnum, Node 
> *index_expr)
>       return stats;
>  }
>  
> +/*
> + * Prefetch callback function to get next block number while using
> + * BlockSampling algorithm
> + */
> +static BlockNumber
> +block_sampling_streaming_read_next(ReadStream *stream,
> +                                                                void 
> *user_data,
> +                                                                void 
> *per_buffer_data)
> +{
> +     BlockSamplerData *bs = user_data;
> +
> +     return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : 
> InvalidBlockNumber;

I don't see the point of BlockSampler_HasMore() anymore. I removed it in
the attached patch and made BlockSampler_Next() return InvalidBlockNumber
under the same conditions. Is there a reason not to do this? There
aren't other callers. If BlockSampler_Next() weren't part of an API,
we could just make it the streaming read callback, but that might be
weird as it is now.

That and my other ideas are in the attached patch. Let me know what you
think.

- Melanie
>From 960e52eca955086e49d80a1dba20db7db4f43b16 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavu...@gmail.com>
Date: Wed, 3 Apr 2024 15:14:15 +0300
Subject: [PATCH v7 1/2] Use streaming read API in ANALYZE

ANALYZE command gets random tuples using BlockSampler algorithm. Use
streaming reads to get these tuples by using BlockSampler algorithm in
streaming read API prefetch logic.
---
 src/backend/access/heap/heapam_handler.c | 22 +++---
 src/backend/commands/analyze.c           | 85 ++++++++----------------
 src/include/access/heapam.h              |  6 +-
 3 files changed, 42 insertions(+), 71 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 0952d4a98eb..d83fbbe6af3 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1054,16 +1054,16 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 }
 
 /*
- * Prepare to analyze block `blockno` of `scan`.  The scan has been started
- * with SO_TYPE_ANALYZE option.
+ * Prepare to analyze block returned from streaming object.  If the block returned
+ * from streaming object is valid, true is returned; otherwise false is returned.
+ * The scan has been started with SO_TYPE_ANALYZE option.
  *
  * This routine holds a buffer pin and lock on the heap page.  They are held
  * until heapam_scan_analyze_next_tuple() returns false.  That is until all the
  * items of the heap page are analyzed.
  */
-void
-heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
-							   BufferAccessStrategy bstrategy)
+bool
+heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
 {
 	HeapScanDesc hscan = (HeapScanDesc) scan;
 
@@ -1076,11 +1076,15 @@ heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
 	 * doing much work per tuple, the extra lock traffic is probably better
 	 * avoided.
 	 */
-	hscan->rs_cblock = blockno;
-	hscan->rs_cindex = FirstOffsetNumber;
-	hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM,
-										blockno, RBM_NORMAL, bstrategy);
+	hscan->rs_cbuf = read_stream_next_buffer(stream, NULL);
+	if (hscan->rs_cbuf == InvalidBuffer)
+		return false;
+
 	LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
+
+	hscan->rs_cblock = BufferGetBlockNumber(hscan->rs_cbuf);
+	hscan->rs_cindex = FirstOffsetNumber;
+	return true;
 }
 
 /*
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 2fb39f3ede1..764520d5aa2 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1102,6 +1102,20 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
 	return stats;
 }
 
+/*
+ * Prefetch callback function to get next block number while using
+ * BlockSampling algorithm
+ */
+static BlockNumber
+block_sampling_streaming_read_next(ReadStream *stream,
+								   void *user_data,
+								   void *per_buffer_data)
+{
+	BlockSamplerData *bs = user_data;
+
+	return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : InvalidBlockNumber;
+}
+
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the heap
  *
@@ -1154,10 +1168,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	TableScanDesc scan;
 	BlockNumber nblocks;
 	BlockNumber blksdone = 0;
-#ifdef USE_PREFETCH
-	int			prefetch_maximum = 0;	/* blocks to prefetch if enabled */
-	BlockSamplerData prefetch_bs;
-#endif
+	ReadStream *stream;
 
 	Assert(targrows > 0);
 
@@ -1170,13 +1181,6 @@ acquire_sample_rows(Relation onerel, int elevel,
 	randseed = pg_prng_uint32(&pg_global_prng_state);
 	nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed);
 
-#ifdef USE_PREFETCH
-	prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
-	/* Create another BlockSampler, using the same seed, for prefetching */
-	if (prefetch_maximum)
-		(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
-#endif
-
 	/* Report sampling block numbers */
 	pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL,
 								 nblocks);
@@ -1187,59 +1191,21 @@ acquire_sample_rows(Relation onerel, int elevel,
 	scan = heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
 	slot = table_slot_create(onerel, NULL);
 
-#ifdef USE_PREFETCH
-
-	/*
-	 * If we are doing prefetching, then go ahead and tell the kernel about
-	 * the first set of pages we are going to want.  This also moves our
-	 * iterator out ahead of the main one being used, where we will keep it so
-	 * that we're always pre-fetching out prefetch_maximum number of blocks
-	 * ahead.
-	 */
-	if (prefetch_maximum)
-	{
-		for (int i = 0; i < prefetch_maximum; i++)
-		{
-			BlockNumber prefetch_block;
-
-			if (!BlockSampler_HasMore(&prefetch_bs))
-				break;
-
-			prefetch_block = BlockSampler_Next(&prefetch_bs);
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block);
-		}
-	}
-#endif
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+										vac_strategy,
+										scan->rs_rd,
+										MAIN_FORKNUM,
+										block_sampling_streaming_read_next,
+										&bs,
+										0);
 
 	/* Outer loop over blocks to sample */
-	while (BlockSampler_HasMore(&bs))
+	while (true)
 	{
-		BlockNumber targblock = BlockSampler_Next(&bs);
-#ifdef USE_PREFETCH
-		BlockNumber prefetch_targblock = InvalidBlockNumber;
-
-		/*
-		 * Make sure that every time the main BlockSampler is moved forward
-		 * that our prefetch BlockSampler also gets moved forward, so that we
-		 * always stay out ahead.
-		 */
-		if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
-			prefetch_targblock = BlockSampler_Next(&prefetch_bs);
-#endif
-
 		vacuum_delay_point();
 
-		heapam_scan_analyze_next_block(scan, targblock, vac_strategy);
-
-#ifdef USE_PREFETCH
-
-		/*
-		 * When pre-fetching, after we get a block, tell the kernel about the
-		 * next one we will want, if there's any left.
-		 */
-		if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock);
-#endif
+		if (!heapam_scan_analyze_next_block(scan, stream))
+			break;
 
 		while (heapam_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
 		{
@@ -1289,6 +1255,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 		pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_DONE,
 									 ++blksdone);
 	}
+	read_stream_end(stream);
 
 	ExecDropSingleTupleTableSlot(slot);
 	heap_endscan(scan);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index a307fb5f245..633caee9d95 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -388,9 +389,8 @@ extern bool HeapTupleIsSurelyDead(HeapTuple htup,
 								  struct GlobalVisState *vistest);
 
 /* in heap/heapam_handler.c*/
-extern void heapam_scan_analyze_next_block(TableScanDesc scan,
-										   BlockNumber blockno,
-										   BufferAccessStrategy bstrategy);
+extern bool heapam_scan_analyze_next_block(TableScanDesc scan,
+										   ReadStream *stream);
 extern bool heapam_scan_analyze_next_tuple(TableScanDesc scan,
 										   TransactionId OldestXmin,
 										   double *liverows, double *deadrows,
-- 
2.40.1

>From d75cfac7a5ca380ad86569a65b678390b9768d4e Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Wed, 3 Apr 2024 16:28:46 -0400
Subject: [PATCH v7 2/2] some ideas

---
 src/backend/access/heap/heapam_handler.c | 57 +++++-------------------
 src/backend/commands/analyze.c           | 21 +++++----
 src/backend/utils/misc/sampling.c        |  9 +---
 src/include/access/heapam.h              |  4 +-
 src/include/utils/sampling.h             |  1 -
 5 files changed, 27 insertions(+), 65 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d83fbbe6af3..aab392f50e5 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1053,40 +1053,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 	pfree(isnull);
 }
 
-/*
- * Prepare to analyze block returned from streaming object.  If the block returned
- * from streaming object is valid, true is returned; otherwise false is returned.
- * The scan has been started with SO_TYPE_ANALYZE option.
- *
- * This routine holds a buffer pin and lock on the heap page.  They are held
- * until heapam_scan_analyze_next_tuple() returns false.  That is until all the
- * items of the heap page are analyzed.
- */
-bool
-heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
-{
-	HeapScanDesc hscan = (HeapScanDesc) scan;
-
-	/*
-	 * We must maintain a pin on the target page's buffer to ensure that
-	 * concurrent activity - e.g. HOT pruning - doesn't delete tuples out from
-	 * under us.  Hence, pin the page until we are done looking at it.  We
-	 * also choose to hold sharelock on the buffer throughout --- we could
-	 * release and re-acquire sharelock for each tuple, but since we aren't
-	 * doing much work per tuple, the extra lock traffic is probably better
-	 * avoided.
-	 */
-	hscan->rs_cbuf = read_stream_next_buffer(stream, NULL);
-	if (hscan->rs_cbuf == InvalidBuffer)
-		return false;
-
-	LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
-
-	hscan->rs_cblock = BufferGetBlockNumber(hscan->rs_cbuf);
-	hscan->rs_cindex = FirstOffsetNumber;
-	return true;
-}
-
 /*
  * Iterate over tuples in the block selected with
  * heapam_scan_analyze_next_block().  If a tuple that's suitable for sampling
@@ -1098,11 +1064,10 @@ heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
  * tuples.
  */
 bool
-heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
+heapam_scan_analyze_next_tuple(HeapScanDesc scan, TransactionId OldestXmin,
 							   double *liverows, double *deadrows,
 							   TupleTableSlot *slot)
 {
-	HeapScanDesc hscan = (HeapScanDesc) scan;
 	Page		targpage;
 	OffsetNumber maxoffset;
 	BufferHeapTupleTableSlot *hslot;
@@ -1110,17 +1075,17 @@ heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
 	Assert(TTS_IS_BUFFERTUPLE(slot));
 
 	hslot = (BufferHeapTupleTableSlot *) slot;
-	targpage = BufferGetPage(hscan->rs_cbuf);
+	targpage = BufferGetPage(scan->rs_cbuf);
 	maxoffset = PageGetMaxOffsetNumber(targpage);
 
 	/* Inner loop over all tuples on the selected page */
-	for (; hscan->rs_cindex <= maxoffset; hscan->rs_cindex++)
+	for (; scan->rs_cindex <= maxoffset; scan->rs_cindex++)
 	{
 		ItemId		itemid;
 		HeapTuple	targtuple = &hslot->base.tupdata;
 		bool		sample_it = false;
 
-		itemid = PageGetItemId(targpage, hscan->rs_cindex);
+		itemid = PageGetItemId(targpage, scan->rs_cindex);
 
 		/*
 		 * We ignore unused and redirect line pointers.  DEAD line pointers
@@ -1135,14 +1100,14 @@ heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
 			continue;
 		}
 
-		ItemPointerSet(&targtuple->t_self, hscan->rs_cblock, hscan->rs_cindex);
+		ItemPointerSet(&targtuple->t_self, scan->rs_cblock, scan->rs_cindex);
 
-		targtuple->t_tableOid = RelationGetRelid(scan->rs_rd);
+		targtuple->t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
 		targtuple->t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
 		targtuple->t_len = ItemIdGetLength(itemid);
 
 		switch (HeapTupleSatisfiesVacuum(targtuple, OldestXmin,
-										 hscan->rs_cbuf))
+										 scan->rs_cbuf))
 		{
 			case HEAPTUPLE_LIVE:
 				sample_it = true;
@@ -1222,8 +1187,8 @@ heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
 
 		if (sample_it)
 		{
-			ExecStoreBufferHeapTuple(targtuple, slot, hscan->rs_cbuf);
-			hscan->rs_cindex++;
+			ExecStoreBufferHeapTuple(targtuple, slot, scan->rs_cbuf);
+			scan->rs_cindex++;
 
 			/* note that we leave the buffer locked here! */
 			return true;
@@ -1231,8 +1196,8 @@ heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
 	}
 
 	/* Now release the lock and pin on the page */
-	UnlockReleaseBuffer(hscan->rs_cbuf);
-	hscan->rs_cbuf = InvalidBuffer;
+	UnlockReleaseBuffer(scan->rs_cbuf);
+	scan->rs_cbuf = InvalidBuffer;
 
 	/* also prevent old slot contents from having pin on page */
 	ExecClearTuple(slot);
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 764520d5aa2..7002ece8be8 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1111,9 +1111,7 @@ block_sampling_streaming_read_next(ReadStream *stream,
 								   void *user_data,
 								   void *per_buffer_data)
 {
-	BlockSamplerData *bs = user_data;
-
-	return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : InvalidBlockNumber;
+	return BlockSampler_Next(user_data);
 }
 
 /*
@@ -1165,7 +1163,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	BlockSamplerData bs;
 	ReservoirStateData rstate;
 	TupleTableSlot *slot;
-	TableScanDesc scan;
+	HeapScanDesc scan;
 	BlockNumber nblocks;
 	BlockNumber blksdone = 0;
 	ReadStream *stream;
@@ -1188,12 +1186,12 @@ acquire_sample_rows(Relation onerel, int elevel,
 	/* Prepare for sampling rows */
 	reservoir_init_selection_state(&rstate, targrows);
 
-	scan = heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
+	scan = (HeapScanDesc) heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
 	slot = table_slot_create(onerel, NULL);
 
 	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
 										vac_strategy,
-										scan->rs_rd,
+										onerel,
 										MAIN_FORKNUM,
 										block_sampling_streaming_read_next,
 										&bs,
@@ -1204,9 +1202,16 @@ acquire_sample_rows(Relation onerel, int elevel,
 	{
 		vacuum_delay_point();
 
-		if (!heapam_scan_analyze_next_block(scan, stream))
+		scan->rs_cbuf = read_stream_next_buffer(stream, NULL);
+
+		if (!BufferIsValid(scan->rs_cbuf))
 			break;
 
+		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+
+		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
+		scan->rs_cindex = FirstOffsetNumber;
+
 		while (heapam_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
 		{
 			/*
@@ -1258,7 +1263,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	read_stream_end(stream);
 
 	ExecDropSingleTupleTableSlot(slot);
-	heap_endscan(scan);
+	heap_endscan(&scan->rs_base);
 
 	/*
 	 * If we didn't find as many tuples as we wanted then we're done. No sort
diff --git a/src/backend/utils/misc/sampling.c b/src/backend/utils/misc/sampling.c
index 933db06702c..69b73f5b115 100644
--- a/src/backend/utils/misc/sampling.c
+++ b/src/backend/utils/misc/sampling.c
@@ -54,12 +54,6 @@ BlockSampler_Init(BlockSampler bs, BlockNumber nblocks, int samplesize,
 	return Min(bs->n, bs->N);
 }
 
-bool
-BlockSampler_HasMore(BlockSampler bs)
-{
-	return (bs->t < bs->N) && (bs->m < bs->n);
-}
-
 BlockNumber
 BlockSampler_Next(BlockSampler bs)
 {
@@ -68,7 +62,8 @@ BlockSampler_Next(BlockSampler bs)
 	double		p;				/* probability to skip block */
 	double		V;				/* random */
 
-	Assert(BlockSampler_HasMore(bs));	/* hence K > 0 and k > 0 */
+	if (bs->t >= bs->N || bs->m >= bs->n)
+		return InvalidBlockNumber;
 
 	if ((BlockNumber) k >= K)
 	{
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 633caee9d95..f76dd18ad5d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -389,9 +389,7 @@ extern bool HeapTupleIsSurelyDead(HeapTuple htup,
 								  struct GlobalVisState *vistest);
 
 /* in heap/heapam_handler.c*/
-extern bool heapam_scan_analyze_next_block(TableScanDesc scan,
-										   ReadStream *stream);
-extern bool heapam_scan_analyze_next_tuple(TableScanDesc scan,
+extern bool heapam_scan_analyze_next_tuple(HeapScanDesc scan,
 										   TransactionId OldestXmin,
 										   double *liverows, double *deadrows,
 										   TupleTableSlot *slot);
diff --git a/src/include/utils/sampling.h b/src/include/utils/sampling.h
index be48ee52bac..fb5d6820a24 100644
--- a/src/include/utils/sampling.h
+++ b/src/include/utils/sampling.h
@@ -38,7 +38,6 @@ typedef BlockSamplerData *BlockSampler;
 
 extern BlockNumber BlockSampler_Init(BlockSampler bs, BlockNumber nblocks,
 									 int samplesize, uint32 randseed);
-extern bool BlockSampler_HasMore(BlockSampler bs);
 extern BlockNumber BlockSampler_Next(BlockSampler bs);
 
 /* Reservoir sampling methods */
-- 
2.40.1

Reply via email to