On Wed, Jan 24, 2024 at 3:20 PM Melanie Plageman
<melanieplage...@gmail.com> wrote:
>
> On Wed, Jan 24, 2024 at 4:19 AM Tomas Vondra
> <tomas.von...@enterprisedb.com> wrote:
> >
> > On 1/24/24 01:51, Melanie Plageman wrote:
> > >> But I'm not sure what to do about optimizations that are more specific
> > >> to the access path. Consider for example the index-only scans. We don't
> > >> want to prefetch all the pages, we need to inspect the VM and prefetch
> > >> just the not-all-visible ones. And then pass the info to the index scan,
> > >> so that it does not need to check the VM again. It's not clear to me how
> > >> to do this with this approach.
> > >
> > > Yea, this is an issue I'll need to think about. To really spell out
> > > the problem: the callback dequeues a TID from the tid_queue and looks
> > > up its block in the VM. It's all visible. So, it shouldn't return that
> > > block to the streaming read API to fetch from the heap because it
> > > doesn't need to be read. But, where does the callback put the TID so
> > > that the caller can get it? I'm going to think more about this.
> > >
> >
> > Yes, that's the problem for index-only scans. I'd generalize it so that
> > it's about the callback being able to (a) decide if it needs to read the
> > heap page, and (b) store some custom info for the TID.
>
> Actually, I think this is no big deal. See attached. I just don't
> enqueue tids whose blocks are all visible. I had to switch the order
> from fetch heap then fill queue to fill queue then fetch heap.
>
> While doing this I noticed some wrong results in the regression tests
> (like in the alter table test), so I suspect I have some kind of
> control flow issue. Perhaps I should fix the resource leak so I can
> actually see the failing tests :)

Attached is a patch which implements a real queue and fixes some of
the issues with the previous version. It doesn't pass tests yet and
has issues. Some are bugs in my implementation I need to fix. Some are
issues we would need to solve in the streaming read API. Some are
issues with index prefetching generally.

Note that these two patches have to be applied on top of a commit
before 21d9c3ee4e, because Thomas hasn't yet posted a rebased version
of the streaming read API patches.

Issues
---
- kill prior tuple

This optimization doesn't work with index prefetching in the current
design. Kill prior tuple relies on alternating between fetching a
single index tuple and visiting the heap. After visiting the heap we
can potentially kill the immediately preceding index tuple. Once we
fetch multiple index tuples, enqueue their TIDs, and later visit the
heap, the next index page we visit may not contain all of the index
tuples deemed killable by our visit to the heap.

In our case, we could try to fix this by prefetching only heap blocks
referred to by index tuples on the same index page. Or we could try to
keep a pool of index pages pinned and go back and kill index tuples on
those pages.
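
To illustrate the second idea, here is a rough sketch of the extra
bookkeeping each TID queue entry might need so that we could go back
and kill index tuples later (these names are made up for illustration
and are not from the attached patch):

typedef struct TidQueueItem
{
	ItemPointerData heap_tid;	/* TID to prefetch and visit in the heap */
	BlockNumber index_page; 	/* leaf page the index tuple came from */
	OffsetNumber index_offnum;	/* its offset on that leaf page */
	bool		kill;			/* set after the heap visit finds it dead */
} TidQueueItem;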

Disabling kill_prior_tuple is why the mvcc test fails. Perhaps
there is an easier way to fix this, as I don't think the mvcc test
failed on Tomas' version.

- switching scan directions

If the index scan switches directions on a given invocation of
IndexNext(), heap blocks containing tuples beyond the point at which
we want to switch directions may already have been prefetched and read.

We could fix this by having some kind of streaming read "reset"
callback to drop all of the buffers which have been prefetched which
are now no longer needed. We'd have to go backwards from the last TID
which was yielded to the caller and figure out which buffers in the
pgsr buffer ranges are associated with all of the TIDs which were
prefetched after that TID. The TIDs are in the per_buffer_data
associated with each buffer in pgsr. The issue would be searching
through those efficiently.
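
For concreteness, the simplest possible version of such a reset -- one
that throws away all lookahead rather than only the buffers past the
switch point -- could look something like the sketch below. It would
have to live in streaming_read.c since it pokes at the struct, and it
is not part of the attached patches:

void
pg_streaming_read_reset(PgStreamingRead *pgsr)
{
	Buffer		buffer;

	/* Stop invoking the callback for further lookahead. */
	pgsr->finished = true;

	/* Unpin everything that was prefetched but not yet consumed. */
	while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
		ReleaseBuffer(buffer);

	/* Let lookahead resume from whatever the callback returns next. */
	pgsr->finished = false;
}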

The other issue is that the streaming read API does not currently
support backwards scans. So, if we switch to a backwards scan from a
forward scan, we would need to fall back to the non-streaming-read
method. We could do this by just setting the TID queue size to 1
(which is what I have currently implemented). Or we could add
backwards scan support to the streaming read API.

- mark and restore

Similar to the issue with switching the scan direction, mark and
restore requires us to reset the TID queue and streaming read queue.
For now, I've hacked something into the PlannerInfo and Plan to set
the TID queue size to 1 for plans containing a merge join (yikes).

- multiple executions

For reasons I don't entirely understand yet, multiple executions (not
rescan -- see ExecutorRun(...execute_once)) do not work. As in Tomas'
patch, I have disabled prefetching (and made the TID queue size 1)
when execute_once is false.

- Index Only Scans need to return IndexTuples

Because index-only scans return either the IndexTuple pointed to by
IndexScanDesc->xs_itup or the HeapTuple pointed to by
IndexScanDesc->xs_hitup -- both of which are populated by the index
AM -- we have to save copies of those IndexTupleData and HeapTupleData
structures for every TID whose block we prefetch.

This might be okay, but it is a bit sad to have to make copies of those tuples.

In this patch, I still haven't figured out the memory management part.
I copy over the tuples when enqueuing a TID queue item and then copy
them back again when the streaming read API returns the
per_buffer_data to us. Something is still not quite right here. I
suspect this is part of the reason why some of the other tests are
failing.
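
For reference, the copy at enqueue time amounts to something like the
following sketch (itup_copy and queue_cxt are illustrative names, not
the ones in the patch):

if (scandesc->xs_itup != NULL)
{
	Size		itupsz = IndexTupleSize(scandesc->xs_itup);

	/* The AM owns xs_itup and may clobber it on the next amgettuple call. */
	item->itup_copy = MemoryContextAlloc(queue_cxt, itupsz);
	memcpy(item->itup_copy, scandesc->xs_itup, itupsz);
}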

Other issues/gaps in my implementation:

Determining where to allocate the memory for the streaming read object
and the TID queue is an outstanding TODO. To implement a fallback
method for cases in which streaming read doesn't work, I set the queue
size to 1. This is obviously not good.

Right now, I allocate the TID queue and streaming read objects in
IndexNext() and IndexOnlyNext(). This doesn't seem ideal. Doing it in
index_beginscan() (and index_beginscan_parallel()) is tricky though
because we don't know the scan direction at that point (and the scan
direction can change). There are also callers of index_beginscan() who
do not call Index[Only]Next() (like systable_getnext() which calls
index_getnext_slot() directly).
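
Roughly, the kind of lazy setup I mean in the scan node is sketched
below (ioss_pgsr, the per-item size, and the callback name are
placeholders, not the identifiers in the attached patch):

if (node->ioss_pgsr == NULL)
{
	MemoryContext oldcxt;

	oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
	node->ioss_pgsr =
		pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
									   node,
									   sizeof(TidQueueItem),
									   NULL,
									   BMR_REL(node->ss.ss_currentRelation),
									   MAIN_FORKNUM,
									   index_only_prefetch_next_cb);
	MemoryContextSwitchTo(oldcxt);
}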

Also, my implementation does not yet have the optimization from
Tomas' patch that skips prefetching recently prefetched blocks. As he
has said, it
probably makes sense to add something to do this in a lower layer --
such as in the streaming read API or even in bufmgr.c (maybe in
PrefetchSharedBuffer()).
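
Something along these lines could work, wherever it ends up living --
a tiny ring of recently requested block numbers (entirely illustrative;
the slots would need to start out as InvalidBlockNumber):

#define RECENT_PREFETCH_SLOTS 8

typedef struct RecentPrefetch
{
	BlockNumber blocks[RECENT_PREFETCH_SLOTS];
	int			next;
} RecentPrefetch;

/* Returns true if blkno was requested recently; otherwise remembers it. */
static bool
recently_prefetched(RecentPrefetch *rp, BlockNumber blkno)
{
	for (int i = 0; i < RECENT_PREFETCH_SLOTS; i++)
	{
		if (rp->blocks[i] == blkno)
			return true;
	}
	rp->blocks[rp->next] = blkno;
	rp->next = (rp->next + 1) % RECENT_PREFETCH_SLOTS;
	return false;
}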

- Melanie
From 550e3a4b55eb0f3edc0f8c4f691cff134b256371 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 22 Jul 2023 17:31:54 +1200
Subject: [PATCH v4 1/2] Streaming Read API

---
 contrib/pg_prewarm/pg_prewarm.c          |  40 +-
 src/backend/access/transam/xlogutils.c   |   2 +-
 src/backend/postmaster/bgwriter.c        |   8 +-
 src/backend/postmaster/checkpointer.c    |  15 +-
 src/backend/storage/Makefile             |   2 +-
 src/backend/storage/aio/Makefile         |  14 +
 src/backend/storage/aio/meson.build      |   5 +
 src/backend/storage/aio/streaming_read.c | 435 ++++++++++++++++++
 src/backend/storage/buffer/bufmgr.c      | 560 +++++++++++++++--------
 src/backend/storage/buffer/localbuf.c    |  14 +-
 src/backend/storage/meson.build          |   1 +
 src/backend/storage/smgr/smgr.c          |  49 +-
 src/include/storage/bufmgr.h             |  22 +
 src/include/storage/smgr.h               |   4 +-
 src/include/storage/streaming_read.h     |  45 ++
 src/include/utils/rel.h                  |   6 -
 src/tools/pgindent/typedefs.list         |   2 +
 17 files changed, 986 insertions(+), 238 deletions(-)
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/streaming_read.c
 create mode 100644 src/include/storage/streaming_read.h

diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index 8541e4d6e4..9617bf130b 100644
--- a/contrib/pg_prewarm/pg_prewarm.c
+++ b/contrib/pg_prewarm/pg_prewarm.c
@@ -20,6 +20,7 @@
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
+#include "storage/streaming_read.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -38,6 +39,25 @@ typedef enum
 
 static PGIOAlignedBlock blockbuffer;
 
+struct pg_prewarm_streaming_read_private
+{
+	BlockNumber blocknum;
+	int64		last_block;
+};
+
+static BlockNumber
+pg_prewarm_streaming_read_next(PgStreamingRead *pgsr,
+							   void *pgsr_private,
+							   void *per_buffer_data)
+{
+	struct pg_prewarm_streaming_read_private *p = pgsr_private;
+
+	if (p->blocknum <= p->last_block)
+		return p->blocknum++;
+
+	return InvalidBlockNumber;
+}
+
 /*
  * pg_prewarm(regclass, mode text, fork text,
  *			  first_block int8, last_block int8)
@@ -183,18 +203,36 @@ pg_prewarm(PG_FUNCTION_ARGS)
 	}
 	else if (ptype == PREWARM_BUFFER)
 	{
+		struct pg_prewarm_streaming_read_private p;
+		PgStreamingRead *pgsr;
+
 		/*
 		 * In buffer mode, we actually pull the data into shared_buffers.
 		 */
+
+		/* Set up the private state for our streaming buffer read callback. */
+		p.blocknum = first_block;
+		p.last_block = last_block;
+
+		pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+											  &p,
+											  0,
+											  NULL,
+											  BMR_REL(rel),
+											  forkNumber,
+											  pg_prewarm_streaming_read_next);
+
 		for (block = first_block; block <= last_block; ++block)
 		{
 			Buffer		buf;
 
 			CHECK_FOR_INTERRUPTS();
-			buf = ReadBufferExtended(rel, forkNumber, block, RBM_NORMAL, NULL);
+			buf = pg_streaming_read_buffer_get_next(pgsr, NULL);
 			ReleaseBuffer(buf);
 			++blocks_done;
 		}
+		Assert(pg_streaming_read_buffer_get_next(pgsr, NULL) == InvalidBuffer);
+		pg_streaming_read_free(pgsr);
 	}
 
 	/* Close relation, release lock. */
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index aa8667abd1..8775b5789b 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -657,7 +657,7 @@ XLogDropDatabase(Oid dbid)
 	 * This is unnecessarily heavy-handed, as it will close SMgrRelation
 	 * objects for other databases as well. DROP DATABASE occurs seldom enough
 	 * that it's not worth introducing a variant of smgrclose for just this
-	 * purpose. XXX: Or should we rather leave the smgr entries dangling?
+	 * purpose.
 	 */
 	smgrcloseall();
 
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index d7d6cc0cd7..13e5376619 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -246,10 +246,12 @@ BackgroundWriterMain(void)
 		if (FirstCallSinceLastCheckpoint())
 		{
 			/*
-			 * After any checkpoint, close all smgr files.  This is so we
-			 * won't hang onto smgr references to deleted files indefinitely.
+			 * After any checkpoint, free all smgr objects.  Otherwise we
+			 * would never do so for dropped relations, as the bgwriter does
+			 * not process shared invalidation messages or call
+			 * AtEOXact_SMgr().
 			 */
-			smgrcloseall();
+			smgrdestroyall();
 		}
 
 		/*
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 5e949fc885..5d843b6142 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -469,10 +469,12 @@ CheckpointerMain(void)
 				ckpt_performed = CreateRestartPoint(flags);
 
 			/*
-			 * After any checkpoint, close all smgr files.  This is so we
-			 * won't hang onto smgr references to deleted files indefinitely.
+			 * After any checkpoint, free all smgr objects.  Otherwise we
+			 * would never do so for dropped relations, as the checkpointer
+			 * does not process shared invalidation messages or call
+			 * AtEOXact_SMgr().
 			 */
-			smgrcloseall();
+			smgrdestroyall();
 
 			/*
 			 * Indicate checkpoint completion to any waiting backends.
@@ -958,11 +960,8 @@ RequestCheckpoint(int flags)
 		 */
 		CreateCheckPoint(flags | CHECKPOINT_IMMEDIATE);
 
-		/*
-		 * After any checkpoint, close all smgr files.  This is so we won't
-		 * hang onto smgr references to deleted files indefinitely.
-		 */
-		smgrcloseall();
+		/* Free all smgr objects, as CheckpointerMain() normally would. */
+		smgrdestroyall();
 
 		return;
 	}
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..eec03f6f2b 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 0000000000..bcab44c802
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 0000000000..39aef2a84a
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 0000000000..19605090fe
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,435 @@
+#include "postgres.h"
+
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+
+/*
+ * Element type for PgStreamingRead's circular array of block ranges.
+ *
+ * For hits, need_to_complete is false and there is just one block per
+ * range, already pinned and ready for use.
+ *
+ * For misses, need_to_complete is true and buffers[] holds a range of
+ * blocks that are contiguous in storage (though the buffers may not be
+ * contiguous in memory), so we can complete them with a single call to
+ * CompleteReadBuffers().
+ */
+typedef struct PgStreamingReadRange
+{
+	bool		advice_issued;
+	bool		need_complete;
+	BlockNumber blocknum;
+	int			nblocks;
+	int			per_buffer_data_index[MAX_BUFFERS_PER_TRANSFER];
+	Buffer		buffers[MAX_BUFFERS_PER_TRANSFER];
+} PgStreamingReadRange;
+
+struct PgStreamingRead
+{
+	int			max_ios;
+	int			ios_in_progress;
+	int			ios_in_progress_trigger;
+	int			max_pinned_buffers;
+	int			pinned_buffers;
+	int			pinned_buffers_trigger;
+	int			next_tail_buffer;
+	bool		finished;
+	void	   *pgsr_private;
+	PgStreamingReadBufferCB callback;
+	BufferAccessStrategy strategy;
+	BufferManagerRelation bmr;
+	ForkNumber	forknum;
+
+	bool		advice_enabled;
+
+	/* Next expected block, for detecting sequential access. */
+	BlockNumber seq_blocknum;
+
+	/* Space for optional per-buffer private data. */
+	size_t		per_buffer_data_size;
+	void	   *per_buffer_data;
+	int			per_buffer_data_next;
+
+	/* Circular buffer of ranges. */
+	int			size;
+	int			head;
+	int			tail;
+	PgStreamingReadRange ranges[FLEXIBLE_ARRAY_MEMBER];
+};
+
+static PgStreamingRead *
+pg_streaming_read_buffer_alloc_internal(int flags,
+										void *pgsr_private,
+										size_t per_buffer_data_size,
+										BufferAccessStrategy strategy)
+{
+	PgStreamingRead *pgsr;
+	int			size;
+	int			max_ios;
+	uint32		max_pinned_buffers;
+
+
+	/*
+	 * Decide how many assumed I/Os we will allow to run concurrently.  That
+	 * is, advice to the kernel to tell it that we will soon read.  This
+	 * number also affects how far we look ahead for opportunities to start
+	 * more I/Os.
+	 */
+	if (flags & PGSR_FLAG_MAINTENANCE)
+		max_ios = maintenance_io_concurrency;
+	else
+		max_ios = effective_io_concurrency;
+
+	/*
+	 * The desired level of I/O concurrency controls how far ahead we are
+	 * willing to look.  We also clamp it to at least
+	 * MAX_BUFFERS_PER_TRANSFER so that we can have a chance to build up a
+	 * full sized read, even when max_ios is zero.
+	 */
+	max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER);
+
+	/*
+	 * With the *_io_concurrency GUCs, max_ios might be 0.  We want to allow
+	 * at least one, to keep our gating logic simple.
+	 */
+	max_ios = Max(max_ios, 1);
+
+	/*
+	 * Don't allow this backend to pin too many buffers.  For now we'll apply
+	 * the limit for the shared buffer pool and the local buffer pool, without
+	 * worrying which it is.
+	 */
+	LimitAdditionalPins(&max_pinned_buffers);
+	LimitAdditionalLocalPins(&max_pinned_buffers);
+	Assert(max_pinned_buffers > 0);
+
+	/*
+	 * pgsr->ranges is a circular buffer.  When it is empty, head == tail.
+	 * When it is full, there is an empty element between head and tail.  Head
+	 * can also be empty (nblocks == 0), therefore we need two extra elements
+	 * for non-occupied ranges, on top of max_pinned_buffers to allow for the
+	 * maximum possible number of occupied ranges of the smallest possible
+	 * size of one.
+	 */
+	size = max_pinned_buffers + 2;
+
+	pgsr = (PgStreamingRead *)
+		palloc0(offsetof(PgStreamingRead, ranges) +
+				sizeof(pgsr->ranges[0]) * size);
+
+	pgsr->max_ios = max_ios;
+	pgsr->per_buffer_data_size = per_buffer_data_size;
+	pgsr->max_pinned_buffers = max_pinned_buffers;
+	pgsr->pgsr_private = pgsr_private;
+	pgsr->strategy = strategy;
+	pgsr->size = size;
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * This system supports prefetching advice.  As long as direct I/O isn't
+	 * enabled, and the caller hasn't promised sequential access, we can use
+	 * it.
+	 */
+	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+		(flags & PGSR_FLAG_SEQUENTIAL) == 0)
+		pgsr->advice_enabled = true;
+#endif
+
+	/*
+	 * We want to avoid creating ranges that are smaller than they could be
+	 * just because we hit max_pinned_buffers.  We only look ahead when the
+	 * number of pinned buffers falls below this trigger number, or put
+	 * another way, we stop looking ahead when we wouldn't be able to build a
+	 * "full sized" range.
+	 */
+	pgsr->pinned_buffers_trigger =
+		Max(1, (int) max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER);
+
+	/* Space for the callback to store extra data along with each block. */
+	if (per_buffer_data_size)
+		pgsr->per_buffer_data = palloc(per_buffer_data_size * max_pinned_buffers);
+
+	return pgsr;
+}
+
+/*
+ * Create a new streaming read object that can be used to perform the
+ * equivalent of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead.
+ */
+PgStreamingRead *
+pg_streaming_read_buffer_alloc(int flags,
+							   void *pgsr_private,
+							   size_t per_buffer_data_size,
+							   BufferAccessStrategy strategy,
+							   BufferManagerRelation bmr,
+							   ForkNumber forknum,
+							   PgStreamingReadBufferCB next_block_cb)
+{
+	PgStreamingRead *result;
+
+	result = pg_streaming_read_buffer_alloc_internal(flags,
+													 pgsr_private,
+													 per_buffer_data_size,
+													 strategy);
+	result->callback = next_block_cb;
+	result->bmr = bmr;
+	result->forknum = forknum;
+
+	return result;
+}
+
+/*
+ * Start building a new range.  This is called after the previous one
+ * reached maximum size, or the callback's next block can't be merged with it.
+ *
+ * Since the previous head range has now reached its full potential size, this
+ * is also a good time to issue 'prefetch' advice, because we know that'll
+ * soon be reading.  In future, we could start an actual I/O here.
+ */
+static PgStreamingReadRange *
+pg_streaming_read_new_range(PgStreamingRead *pgsr)
+{
+	PgStreamingReadRange *head_range;
+
+	head_range = &pgsr->ranges[pgsr->head];
+	Assert(head_range->nblocks > 0);
+
+	/*
+	 * If a call to CompleteReadBuffers() will be needed, we can issue advice
+	 * to the kernel to get the read started.  We suppress it if the access
+	 * pattern appears to be completely sequential, though, because on some
+	 * systems that interferes with the kernel's own sequential read ahead
+	 * heuristics and hurts performance.
+	 */
+	if (pgsr->advice_enabled)
+	{
+		BlockNumber blocknum = head_range->blocknum;
+		int			nblocks = head_range->nblocks;
+
+		if (head_range->need_complete && blocknum != pgsr->seq_blocknum)
+		{
+			SMgrRelation smgr =
+				pgsr->bmr.smgr ? pgsr->bmr.smgr :
+				RelationGetSmgr(pgsr->bmr.rel);
+
+			Assert(!head_range->advice_issued);
+
+			smgrprefetch(smgr, pgsr->forknum, blocknum, nblocks);
+
+			/*
+			 * Count this as an I/O that is concurrently in progress, though
+			 * we don't really know if the kernel generates a physical I/O.
+			 */
+			head_range->advice_issued = true;
+			pgsr->ios_in_progress++;
+		}
+
+		/* Remember the block after this range, for sequence detection. */
+		pgsr->seq_blocknum = blocknum + nblocks;
+	}
+
+	/* Create a new head range.  There must be space. */
+	Assert(pgsr->size > pgsr->max_pinned_buffers);
+	Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
+	if (++pgsr->head == pgsr->size)
+		pgsr->head = 0;
+	head_range = &pgsr->ranges[pgsr->head];
+	head_range->nblocks = 0;
+
+	return head_range;
+}
+
+static void
+pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
+{
+	/*
+	 * If we're finished or can't start more I/O, then don't look ahead.
+	 */
+	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+		return;
+
+	/*
+	 * We'll also wait until the number of pinned buffers falls below our
+	 * trigger level, so that we have the chance to create a full range.
+	 */
+	if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+		return;
+
+	do
+	{
+		BufferManagerRelation bmr;
+		ForkNumber	forknum;
+		BlockNumber blocknum;
+		Buffer		buffer;
+		bool		found;
+		bool		need_complete;
+		PgStreamingReadRange *head_range;
+		void	   *per_buffer_data;
+
+		/* Do we have a full-sized range? */
+		head_range = &pgsr->ranges[pgsr->head];
+		if (head_range->nblocks == lengthof(head_range->buffers))
+		{
+			Assert(head_range->need_complete);
+			head_range = pg_streaming_read_new_range(pgsr);
+
+			/*
+			 * Give up now if I/O is saturated, or we wouldn't be able to form
+			 * another full range after this due to the pin limit.
+			 */
+			if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger ||
+				pgsr->ios_in_progress == pgsr->max_ios)
+				break;
+		}
+
+		per_buffer_data = (char *) pgsr->per_buffer_data +
+			pgsr->per_buffer_data_size * pgsr->per_buffer_data_next;
+
+		/* Find out which block the callback wants to read next. */
+		blocknum = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data);
+		if (blocknum == InvalidBlockNumber)
+		{
+			pgsr->finished = true;
+			break;
+		}
+		bmr = pgsr->bmr;
+		forknum = pgsr->forknum;
+
+		Assert(pgsr->pinned_buffers < pgsr->max_pinned_buffers);
+
+		buffer = PrepareReadBuffer(bmr,
+								   forknum,
+								   blocknum,
+								   pgsr->strategy,
+								   &found);
+		pgsr->pinned_buffers++;
+
+		need_complete = !found;
+
+		/* Is there a head range that we can't extend? */
+		head_range = &pgsr->ranges[pgsr->head];
+		if (head_range->nblocks > 0 &&
+			(!need_complete ||
+			 !head_range->need_complete ||
+			 head_range->blocknum + head_range->nblocks != blocknum))
+		{
+			/* Yes, time to start building a new one. */
+			head_range = pg_streaming_read_new_range(pgsr);
+			Assert(head_range->nblocks == 0);
+		}
+
+		if (head_range->nblocks == 0)
+		{
+			/* Initialize a new range beginning at this block. */
+			head_range->blocknum = blocknum;
+			head_range->need_complete = need_complete;
+			head_range->advice_issued = false;
+		}
+		else
+		{
+			/* We can extend an existing range by one block. */
+			Assert(head_range->blocknum + head_range->nblocks == blocknum);
+			Assert(head_range->need_complete);
+		}
+
+		head_range->per_buffer_data_index[head_range->nblocks] = pgsr->per_buffer_data_next++;
+		head_range->buffers[head_range->nblocks] = buffer;
+		head_range->nblocks++;
+
+		if (pgsr->per_buffer_data_next == pgsr->max_pinned_buffers)
+			pgsr->per_buffer_data_next = 0;
+
+	} while (pgsr->pinned_buffers < pgsr->max_pinned_buffers &&
+			 pgsr->ios_in_progress < pgsr->max_ios);
+
+	if (pgsr->ranges[pgsr->head].nblocks > 0)
+		pg_streaming_read_new_range(pgsr);
+}
+
+Buffer
+pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data)
+{
+	pg_streaming_read_look_ahead(pgsr);
+
+	/* See if we have one buffer to return. */
+	while (pgsr->tail != pgsr->head)
+	{
+		PgStreamingReadRange *tail_range;
+
+		tail_range = &pgsr->ranges[pgsr->tail];
+
+		/*
+		 * Do we need to perform an I/O before returning the buffers from this
+		 * range?
+		 */
+		if (tail_range->need_complete)
+		{
+			CompleteReadBuffers(pgsr->bmr,
+								tail_range->buffers,
+								pgsr->forknum,
+								tail_range->blocknum,
+								tail_range->nblocks,
+								false,
+								pgsr->strategy);
+			tail_range->need_complete = false;
+
+			/*
+			 * We don't really know if the kernel generated a physical I/O
+			 * when we issued advice, let alone when it finished, but it has
+			 * certainly finished after a read call returns.
+			 */
+			if (tail_range->advice_issued)
+				pgsr->ios_in_progress--;
+		}
+
+		/* Are there more buffers available in this range? */
+		if (pgsr->next_tail_buffer < tail_range->nblocks)
+		{
+			int			buffer_index;
+			Buffer		buffer;
+
+			buffer_index = pgsr->next_tail_buffer++;
+			buffer = tail_range->buffers[buffer_index];
+
+			Assert(BufferIsValid(buffer));
+
+			/* We are giving away ownership of this pinned buffer. */
+			Assert(pgsr->pinned_buffers > 0);
+			pgsr->pinned_buffers--;
+
+			if (per_buffer_data)
+				*per_buffer_data = (char *) pgsr->per_buffer_data +
+					tail_range->per_buffer_data_index[buffer_index] *
+					pgsr->per_buffer_data_size;
+
+			return buffer;
+		}
+
+		/* Advance tail to next range, if there is one. */
+		if (++pgsr->tail == pgsr->size)
+			pgsr->tail = 0;
+		pgsr->next_tail_buffer = 0;
+	}
+
+	Assert(pgsr->pinned_buffers == 0);
+
+	return InvalidBuffer;
+}
+
+void
+pg_streaming_read_free(PgStreamingRead *pgsr)
+{
+	Buffer		buffer;
+
+	/* Stop looking ahead, and unpin anything that wasn't consumed. */
+	pgsr->finished = true;
+	while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
+		ReleaseBuffer(buffer);
+
+	if (pgsr->per_buffer_data)
+		pfree(pgsr->per_buffer_data);
+	pfree(pgsr);
+}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7d601bef6d..2157a97b97 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -472,7 +472,7 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
 )
 
 
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(BufferManagerRelation bmr,
 								ForkNumber forkNum, BlockNumber blockNum,
 								ReadBufferMode mode, BufferAccessStrategy strategy,
 								bool *hit);
@@ -501,7 +501,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
 						  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
-static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
 							  uint32 set_flag_bits, bool forget_owner);
 static void AbortBufferIO(Buffer buffer);
@@ -795,15 +795,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot access temporary tables of other sessions")));
 
-	/*
-	 * Read the buffer, and update pgstat counters to reflect a cache hit or
-	 * miss.
-	 */
-	pgstat_count_buffer_read(reln);
-	buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
+	buf = ReadBuffer_common(BMR_REL(reln),
 							forkNum, blockNum, mode, strategy, &hit);
-	if (hit)
-		pgstat_count_buffer_hit(reln);
+
 	return buf;
 }
 
@@ -827,8 +821,9 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
 
 	SMgrRelation smgr = smgropen(rlocator, InvalidBackendId);
 
-	return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
-							 RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
+	return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+									  RELPERSISTENCE_UNLOGGED),
+							 forkNum, blockNum,
 							 mode, strategy, &hit);
 }
 
@@ -1002,7 +997,7 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
 		bool		hit;
 
 		Assert(extended_by == 0);
-		buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
+		buffer = ReadBuffer_common(bmr,
 								   fork, extend_to - 1, mode, strategy,
 								   &hit);
 	}
@@ -1016,18 +1011,11 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
  * *hit is set to true if the request was satisfied from shared buffer cache.
  */
 static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum,
 				  BlockNumber blockNum, ReadBufferMode mode,
 				  BufferAccessStrategy strategy, bool *hit)
 {
-	BufferDesc *bufHdr;
-	Block		bufBlock;
-	bool		found;
-	IOContext	io_context;
-	IOObject	io_object;
-	bool		isLocalBuf = SmgrIsTemp(smgr);
-
-	*hit = false;
+	Buffer		buffer;
 
 	/*
 	 * Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1046,175 +1034,339 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
 			flags |= EB_LOCK_FIRST;
 
-		return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
-								 forkNum, strategy, flags);
+		*hit = false;
+
+		return ExtendBufferedRel(bmr, forkNum, strategy, flags);
 	}
 
-	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
-									   smgr->smgr_rlocator.locator.spcOid,
-									   smgr->smgr_rlocator.locator.dbOid,
-									   smgr->smgr_rlocator.locator.relNumber,
-									   smgr->smgr_rlocator.backend);
+	buffer = PrepareReadBuffer(bmr,
+							   forkNum,
+							   blockNum,
+							   strategy,
+							   hit);
+
+	/* At this point we do NOT hold any locks. */
 
+	if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
+	{
+		/* if we just want zeroes and a lock, we're done */
+		ZeroBuffer(buffer, mode);
+	}
+	else if (!*hit)
+	{
+		/* we might need to perform I/O */
+		CompleteReadBuffers(bmr,
+							&buffer,
+							forkNum,
+							blockNum,
+							1,
+							mode == RBM_ZERO_ON_ERROR,
+							strategy);
+	}
+
+	return buffer;
+}
+
+/*
+ * Prepare to read a block.  The buffer is pinned.  If this is a 'hit', then
+ * the returned buffer can be used immediately.  Otherwise, a physical read
+ * should be completed with CompleteReadBuffers(), or the buffer should be
+ * zeroed with ZeroBuffer().  PrepareReadBuffer() followed by
+ * CompleteReadBuffers() or ZeroBuffer() is equivalent to ReadBuffer(), but
+ * the caller has the opportunity to combine reads of multiple neighboring
+ * blocks into one CompleteReadBuffers() call.
+ *
+ * *foundPtr is set to true for a hit, and false for a miss.
+ */
+Buffer
+PrepareReadBuffer(BufferManagerRelation bmr,
+				  ForkNumber forkNum,
+				  BlockNumber blockNum,
+				  BufferAccessStrategy strategy,
+				  bool *foundPtr)
+{
+	BufferDesc *bufHdr;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	Assert(blockNum != P_NEW);
+
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
 	if (isLocalBuf)
 	{
-		/*
-		 * We do not use a BufferAccessStrategy for I/O of temporary tables.
-		 * However, in some cases, the "strategy" may not be NULL, so we can't
-		 * rely on IOContextForStrategy() to set the right IOContext for us.
-		 * This may happen in cases like CREATE TEMPORARY TABLE AS...
-		 */
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
-		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
-		if (found)
-			pgBufferUsage.local_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.local_blks_read++;
 	}
 	else
 	{
-		/*
-		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
-		 * not currently in memory.
-		 */
 		io_context = IOContextForStrategy(strategy);
 		io_object = IOOBJECT_RELATION;
-		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
-							 strategy, &found, io_context);
-		if (found)
-			pgBufferUsage.shared_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.shared_blks_read++;
 	}
 
-	/* At this point we do NOT hold any locks. */
+	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+									   bmr.smgr->smgr_rlocator.locator.spcOid,
+									   bmr.smgr->smgr_rlocator.locator.dbOid,
+									   bmr.smgr->smgr_rlocator.locator.relNumber,
+									   bmr.smgr->smgr_rlocator.backend);
 
-	/* if it was already in the buffer pool, we're done */
-	if (found)
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	if (isLocalBuf)
+	{
+		bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr);
+		if (*foundPtr)
+			pgBufferUsage.local_blks_hit++;
+	}
+	else
+	{
+		bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
+							 strategy, foundPtr, io_context);
+		if (*foundPtr)
+			pgBufferUsage.shared_blks_hit++;
+	}
+	if (bmr.rel)
+	{
+		/*
+		 * While pgBufferUsage's "read" counter isn't bumped unless we reach
+		 * CompleteReadBuffers() (so, not for hits, and not for buffers that
+		 * are zeroed instead), the per-relation stats always count them.
+		 */
+		pgstat_count_buffer_read(bmr.rel);
+		if (*foundPtr)
+			pgstat_count_buffer_hit(bmr.rel);
+	}
+	if (*foundPtr)
 	{
-		/* Just need to update stats before we exit */
-		*hit = true;
 		VacuumPageHit++;
 		pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageHit;
 
 		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  found);
+										  bmr.smgr->smgr_rlocator.locator.spcOid,
+										  bmr.smgr->smgr_rlocator.locator.dbOid,
+										  bmr.smgr->smgr_rlocator.locator.relNumber,
+										  bmr.smgr->smgr_rlocator.backend,
+										  true);
+	}
 
-		/*
-		 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
-		 * on return.
-		 */
-		if (!isLocalBuf)
-		{
-			if (mode == RBM_ZERO_AND_LOCK)
-				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-							  LW_EXCLUSIVE);
-			else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-				LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-		}
+	return BufferDescriptorGetBuffer(bufHdr);
+}
 
-		return BufferDescriptorGetBuffer(bufHdr);
+static inline bool
+CompleteReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+	if (BufferIsLocal(buffer))
+	{
+		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+
+		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
 	}
+	else
+		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+}
 
-	/*
-	 * if we have gotten to this point, we have allocated a buffer for the
-	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
-	 * if it's a shared buffer.
-	 */
-	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));	/* spinlock not needed */
+/*
+ * Complete a set of reads prepared with PrepareReadBuffer().  The buffers must
+ * cover a cluster of neighboring block numbers.
+ *
+ * Typically this performs one physical vector read covering the block range,
+ * but if some of the buffers have already been read in the meantime by any
+ * backend, zero or multiple reads may be performed.
+ */
+void
+CompleteReadBuffers(BufferManagerRelation bmr,
+					Buffer *buffers,
+					ForkNumber forknum,
+					BlockNumber blocknum,
+					int nblocks,
+					bool zero_on_error,
+					BufferAccessStrategy strategy)
+{
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
 
-	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
+	if (isLocalBuf)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(strategy);
+		io_object = IOOBJECT_RELATION;
+	}
 
 	/*
-	 * Read in the page, unless the caller intends to overwrite it and just
-	 * wants us to allocate a buffer.
+	 * We count all these blocks as read by this backend.  This is traditional
+	 * behavior, but might turn out to be not true if we find that someone
+	 * else has beaten us and completed the read of some of these blocks.  In
+	 * that case the system globally double-counts, but we traditionally don't
+	 * count this as a "hit", and we don't have a separate counter for "miss,
+	 * but another backend completed the read".
 	 */
-	if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-		MemSet((char *) bufBlock, 0, BLCKSZ);
+	if (isLocalBuf)
+		pgBufferUsage.local_blks_read += nblocks;
 	else
+		pgBufferUsage.shared_blks_read += nblocks;
+
+	for (int i = 0; i < nblocks; ++i)
 	{
-		instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
+		int			io_buffers_len;
+		Buffer		io_buffers[MAX_BUFFERS_PER_TRANSFER];
+		void	   *io_pages[MAX_BUFFERS_PER_TRANSFER];
+		instr_time	io_start;
+		BlockNumber io_first_block;
 
-		smgrread(smgr, forkNum, blockNum, bufBlock);
+#ifdef USE_ASSERT_CHECKING
 
-		pgstat_count_io_op_time(io_object, io_context,
-								IOOP_READ, io_start, 1);
+		/*
+		 * We could get all the information from buffer headers, but it can be
+		 * expensive to access buffer header cache lines so we make the caller
+		 * provide all the information we need, and assert that it is
+		 * consistent.
+		 */
+		{
+			RelFileLocator xlocator;
+			ForkNumber	xforknum;
+			BlockNumber xblocknum;
+
+			BufferGetTag(buffers[i], &xlocator, &xforknum, &xblocknum);
+			Assert(RelFileLocatorEquals(bmr.smgr->smgr_rlocator.locator, xlocator));
+			Assert(xforknum == forknum);
+			Assert(xblocknum == blocknum + i);
+		}
+#endif
+
+		/*
+		 * Skip this block if someone else has already completed it.  If an
+		 * I/O is already in progress in another backend, this will wait for
+		 * the outcome: either done, or something went wrong and we will
+		 * retry.
+		 */
+		if (!CompleteReadBuffersCanStartIO(buffers[i], false))
+		{
+			/*
+			 * Report this as a 'hit' for this backend, even though it must
+			 * have started out as a miss in PrepareReadBuffer().
+			 */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  true);
+			continue;
+		}
+
+		/* We found a buffer that we need to read in. */
+		io_buffers[0] = buffers[i];
+		io_pages[0] = BufferGetBlock(buffers[i]);
+		io_first_block = blocknum + i;
+		io_buffers_len = 1;
 
-		/* check for garbage data */
-		if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
-									PIV_LOG_WARNING | PIV_REPORT_STAT))
+		/*
+		 * How many neighboring-on-disk blocks can we scatter-read into
+		 * other buffers at the same time?  In this case we don't wait if we
+		 * see an I/O already in progress.  We already hold BM_IO_IN_PROGRESS
+		 * for the head block, so we should get on with that I/O as soon as
+		 * possible.  We'll come back to this block again, above.
+		 */
+		while ((i + 1) < nblocks &&
+			   CompleteReadBuffersCanStartIO(buffers[i + 1], true))
+		{
+			/* Must be consecutive block numbers. */
+			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+				   BufferGetBlockNumber(buffers[i]) + 1);
+
+			io_buffers[io_buffers_len] = buffers[++i];
+			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+		}
+
+		io_start = pgstat_prepare_io_time(track_io_timing);
+		smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
+								io_buffers_len);
+
+		/* Verify each block we read, and terminate the I/O. */
+		for (int j = 0; j < io_buffers_len; ++j)
 		{
-			if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
+			BufferDesc *bufHdr;
+			Block		bufBlock;
+
+			if (isLocalBuf)
 			{
-				ereport(WARNING,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s; zeroing out page",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-				MemSet((char *) bufBlock, 0, BLCKSZ);
+				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+				bufBlock = LocalBufHdrGetBlock(bufHdr);
 			}
 			else
-				ereport(ERROR,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-		}
-	}
-
-	/*
-	 * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
-	 * content lock before marking the page as valid, to make sure that no
-	 * other backend sees the zeroed page before the caller has had a chance
-	 * to initialize it.
-	 *
-	 * Since no-one else can be looking at the page contents yet, there is no
-	 * difference between an exclusive lock and a cleanup-strength lock. (Note
-	 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
-	 * they assert that the buffer is already valid.)
-	 */
-	if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
-		!isLocalBuf)
-	{
-		LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
-	}
+			{
+				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+				bufBlock = BufHdrGetBlock(bufHdr);
+			}
 
-	if (isLocalBuf)
-	{
-		/* Only need to adjust flags */
-		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+			/* check for garbage data */
+			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+										PIV_LOG_WARNING | PIV_REPORT_STAT))
+			{
+				if (zero_on_error || zero_damaged_pages)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s; zeroing out page",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+					memset(bufBlock, 0, BLCKSZ);
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+			}
 
-		buf_state |= BM_VALID;
-		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-	}
-	else
-	{
-		/* Set BM_VALID, terminate IO, and wake up any waiters */
-		TerminateBufferIO(bufHdr, false, BM_VALID, true);
-	}
+			/* Terminate I/O and set BM_VALID. */
+			if (isLocalBuf)
+			{
+				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
 
-	VacuumPageMiss++;
-	if (VacuumCostActive)
-		VacuumCostBalance += VacuumCostPageMiss;
+				buf_state |= BM_VALID;
+				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+			}
+			else
+			{
+				/* Set BM_VALID, terminate IO, and wake up any waiters */
+				TerminateBufferIO(bufHdr, false, BM_VALID, true);
+			}
 
-	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-									  smgr->smgr_rlocator.locator.spcOid,
-									  smgr->smgr_rlocator.locator.dbOid,
-									  smgr->smgr_rlocator.locator.relNumber,
-									  smgr->smgr_rlocator.backend,
-									  found);
+			/* Report I/Os as completing individually. */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  false);
+		}
 
-	return BufferDescriptorGetBuffer(bufHdr);
+		VacuumPageMiss += io_buffers_len;
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	}
 }
 
 /*
@@ -1228,11 +1380,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  *
  * The returned buffer is pinned and is already marked as holding the
  * desired page.  If it already did have the desired page, *foundPtr is
- * set true.  Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true.  Otherwise, *foundPtr is set false.  A read should be
+ * performed with CompleteReadBuffers().
  *
  * io_context is passed as an output parameter to avoid calling
  * IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1291,19 +1440,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called PrepareReadBuffer() but not yet CompleteReadBuffers().
 			 */
-			if (StartBufferIO(buf, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return buf;
@@ -1368,19 +1508,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called PrepareReadBuffer() but not yet CompleteReadBuffers().
 			 */
-			if (StartBufferIO(existing_buf_hdr, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return existing_buf_hdr;
@@ -1412,15 +1543,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	LWLockRelease(newPartitionLock);
 
 	/*
-	 * Buffer contents are currently invalid.  Try to obtain the right to
-	 * start I/O.  If StartBufferIO returns false, then someone else managed
-	 * to read it before we did, so there's nothing left for BufferAlloc() to
-	 * do.
+	 * Buffer contents are currently invalid.
 	 */
-	if (StartBufferIO(victim_buf_hdr, true))
-		*foundPtr = false;
-	else
-		*foundPtr = true;
+	*foundPtr = false;
 
 	return victim_buf_hdr;
 }
@@ -1774,7 +1899,7 @@ again:
  * pessimistic, but outside of toy-sized shared_buffers it should allow
  * sufficient pins.
  */
-static void
+void
 LimitAdditionalPins(uint32 *additional_pins)
 {
 	uint32		max_backends;
@@ -2043,7 +2168,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 
 				buf_state &= ~BM_VALID;
 				UnlockBufHdr(existing_hdr, buf_state);
-			} while (!StartBufferIO(existing_hdr, true));
+			} while (!StartBufferIO(existing_hdr, true, false));
 		}
 		else
 		{
@@ -2066,7 +2191,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			LWLockRelease(partition_lock);
 
 			/* XXX: could combine the locked operations in it with the above */
-			StartBufferIO(victim_buf_hdr, true);
+			StartBufferIO(victim_buf_hdr, true, false);
 		}
 	}
 
@@ -2381,7 +2506,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 	else
 	{
 		/*
-		 * If we previously pinned the buffer, it must surely be valid.
+		 * If we previously pinned the buffer, it is likely to be valid, but
+		 * it may not be if PrepareReadBuffer() was called and
+		 * CompleteReadBuffers() hasn't been called yet.  We'll check by
+		 * loading the flags without locking.  This is racy, but it's OK to
+		 * return false spuriously: when CompleteReadBuffers() calls
+		 * StartBufferIO(), it'll see that it's now valid.
 		 *
 		 * Note: We deliberately avoid a Valgrind client request here.
 		 * Individual access methods can optionally superimpose buffer page
@@ -2390,7 +2520,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 		 * that the buffer page is legitimately non-accessible here.  We
 		 * cannot meddle with that.
 		 */
-		result = true;
+		result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
 	}
 
 	ref->refcount++;
@@ -3458,7 +3588,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false))
+	if (!StartBufferIO(buf, false, false))
 		return;
 
 	/* Setup error traceback support for ereport() */
@@ -4845,6 +4975,46 @@ ConditionalLockBuffer(Buffer buffer)
 									LW_EXCLUSIVE);
 }
 
+/*
+ * Zero a buffer, and lock it as RBM_ZERO_AND_LOCK or
+ * RBM_ZERO_AND_CLEANUP_LOCK would.  The buffer must already be pinned.  It
+ * does not have to be valid, but it is valid and locked on return.
+ */
+void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+{
+	BufferDesc *bufHdr;
+	uint32		buf_state;
+
+	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+	if (BufferIsLocal(buffer))
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+	else
+	{
+		bufHdr = GetBufferDescriptor(buffer - 1);
+		if (mode == RBM_ZERO_AND_LOCK)
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		else
+			LockBufferForCleanup(buffer);
+	}
+
+	memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state |= BM_VALID;
+		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	}
+	else
+	{
+		buf_state = LockBufHdr(bufHdr);
+		buf_state |= BM_VALID;
+		UnlockBufHdr(bufHdr, buf_state);
+	}
+}
+
 /*
  * Verify that this backend is pinning the buffer exactly once.
  *
@@ -5197,9 +5367,15 @@ WaitIO(BufferDesc *buf)
  *
  * Returns true if we successfully marked the buffer as I/O busy,
  * false if someone else already did the work.
+ *
+ * If nowait is true, then we don't wait for an I/O to be finished by another
+ * backend.  In that case, false indicates either that the I/O was already
+ * finished, or is still in progress.  This is useful for callers that want to
+ * find out if they can perform the I/O as part of a larger operation, without
+ * waiting for the answer or distinguishing the reasons why not.
  */
 static bool
-StartBufferIO(BufferDesc *buf, bool forInput)
+StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 {
 	uint32		buf_state;
 
@@ -5212,6 +5388,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
 		UnlockBufHdr(buf, buf_state);
+		if (nowait)
+			return false;
 		WaitIO(buf);
 	}
 
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 1be4f4f8da..717b8f58da 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -109,10 +109,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
  * LocalBufferAlloc -
  *	  Find or create a local buffer for the given page of the given relation.
  *
- * API is similar to bufmgr.c's BufferAlloc, except that we do not need
- * to do any locking since this is all local.   Also, IO_IN_PROGRESS
- * does not get set.  Lastly, we support only default access strategy
- * (hence, usage_count is always advanced).
+ * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
+ * any locking since this is all local.  We support only default access
+ * strategy (hence, usage_count is always advanced).
  */
 BufferDesc *
 LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
@@ -288,7 +287,7 @@ GetLocalVictimBuffer(void)
 }
 
 /* see LimitAdditionalPins() */
-static void
+void
 LimitAdditionalLocalPins(uint32 *additional_pins)
 {
 	uint32		max_pins;
@@ -298,9 +297,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins)
 
 	/*
 	 * In contrast to LimitAdditionalPins() other backends don't play a role
-	 * here. We can allow up to NLocBuffer pins in total.
+	 * here. We can allow up to NLocBuffer pins in total, but it might not be
+	 * initialized yet so read num_temp_buffers.
 	 */
-	max_pins = (NLocBuffer - NLocalPinnedBuffers);
+	max_pins = (num_temp_buffers - NLocalPinnedBuffers);
 
 	if (*additional_pins >= max_pins)
 		*additional_pins = max_pins;
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca2..739d13293f 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 563a0be5c7..0d7272e796 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -147,7 +147,9 @@ smgrshutdown(int code, Datum arg)
 /*
  * smgropen() -- Return an SMgrRelation object, creating it if need be.
  *
- * This does not attempt to actually open the underlying file.
+ * This does not attempt to actually open the underlying files.  The returned
+ * object remains valid at least until AtEOXact_SMgr() is called, or until
+ * smgrdestroy() is called in non-transaction backends.
  */
 SMgrRelation
 smgropen(RelFileLocator rlocator, BackendId backend)
@@ -259,10 +261,10 @@ smgrexists(SMgrRelation reln, ForkNumber forknum)
 }
 
 /*
- * smgrclose() -- Close and delete an SMgrRelation object.
+ * smgrdestroy() -- Delete an SMgrRelation object.
  */
 void
-smgrclose(SMgrRelation reln)
+smgrdestroy(SMgrRelation reln)
 {
 	SMgrRelation *owner;
 	ForkNumber	forknum;
@@ -289,12 +291,14 @@ smgrclose(SMgrRelation reln)
 }
 
 /*
- * smgrrelease() -- Release all resources used by this object.
+ * smgrclose() -- Release all resources used by this object.
  *
- * The object remains valid.
+ * The object remains valid, but is moved to the unowned list where it will
+ * be destroyed by AtEOXact_SMgr().  It may be re-owned if it is accessed by a
+ * relation before then.
  */
 void
-smgrrelease(SMgrRelation reln)
+smgrclose(SMgrRelation reln)
 {
 	for (ForkNumber forknum = 0; forknum <= MAX_FORKNUM; forknum++)
 	{
@@ -302,15 +306,20 @@ smgrrelease(SMgrRelation reln)
 		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
 	}
 	reln->smgr_targblock = InvalidBlockNumber;
+
+	if (reln->smgr_owner)
+	{
+		*reln->smgr_owner = NULL;
+		reln->smgr_owner = NULL;
+		dlist_push_tail(&unowned_relns, &reln->node);
+	}
 }
 
 /*
- * smgrreleaseall() -- Release resources used by all objects.
- *
- * This is called for PROCSIGNAL_BARRIER_SMGRRELEASE.
+ * smgrcloseall() -- Close all objects.
  */
 void
-smgrreleaseall(void)
+smgrcloseall(void)
 {
 	HASH_SEQ_STATUS status;
 	SMgrRelation reln;
@@ -322,14 +331,17 @@ smgrreleaseall(void)
 	hash_seq_init(&status, SMgrRelationHash);
 
 	while ((reln = (SMgrRelation) hash_seq_search(&status)) != NULL)
-		smgrrelease(reln);
+		smgrclose(reln);
 }
 
 /*
- * smgrcloseall() -- Close all existing SMgrRelation objects.
+ * smgrdestroyall() -- Destroy all SMgrRelation objects.
+ *
+ * It must be known that there are no pointers to SMgrRelations, other than
+ * those registered with smgrsetowner().
  */
 void
-smgrcloseall(void)
+smgrdestroyall(void)
 {
 	HASH_SEQ_STATUS status;
 	SMgrRelation reln;
@@ -341,7 +353,7 @@ smgrcloseall(void)
 	hash_seq_init(&status, SMgrRelationHash);
 
 	while ((reln = (SMgrRelation) hash_seq_search(&status)) != NULL)
-		smgrclose(reln);
+		smgrdestroy(reln);
 }
 
 /*
@@ -733,7 +745,8 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
  * AtEOXact_SMgr
  *
  * This routine is called during transaction commit or abort (it doesn't
- * particularly care which).  All transient SMgrRelation objects are closed.
+ * particularly care which).  All transient SMgrRelation objects are
+ * destroyed.
  *
  * We do this as a compromise between wanting transient SMgrRelations to
  * live awhile (to amortize the costs of blind writes of multiple blocks)
@@ -747,7 +760,7 @@ AtEOXact_SMgr(void)
 	dlist_mutable_iter iter;
 
 	/*
-	 * Zap all unowned SMgrRelations.  We rely on smgrclose() to remove each
+	 * Zap all unowned SMgrRelations.  We rely on smgrdestroy() to remove each
 	 * one from the list.
 	 */
 	dlist_foreach_modify(iter, &unowned_relns)
@@ -757,7 +770,7 @@ AtEOXact_SMgr(void)
 
 		Assert(rel->smgr_owner == NULL);
 
-		smgrclose(rel);
+		smgrdestroy(rel);
 	}
 }
 
@@ -768,6 +781,6 @@ AtEOXact_SMgr(void)
 bool
 ProcessBarrierSmgrRelease(void)
 {
-	smgrreleaseall();
+	smgrcloseall();
 	return true;
 }
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index d51d46d335..a38f1acb37 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,7 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "port/pg_iovec.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 #define BUFFER_LOCK_SHARE		1
 #define BUFFER_LOCK_EXCLUSIVE	2
 
+/*
+ * Maximum number of buffers for multi-buffer I/O functions.  This is set to
+ * allow 128kB transfers, unless BLCKSZ and IOV_MAX imply a smaller maximum.
+ */
+#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ)
 
 /*
  * prototypes for functions in bufmgr.c
@@ -177,6 +183,18 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
 										ForkNumber forkNum, BlockNumber blockNum,
 										ReadBufferMode mode, BufferAccessStrategy strategy,
 										bool permanent);
+extern Buffer PrepareReadBuffer(BufferManagerRelation bmr,
+								ForkNumber forkNum,
+								BlockNumber blockNum,
+								BufferAccessStrategy strategy,
+								bool *foundPtr);
+extern void CompleteReadBuffers(BufferManagerRelation bmr,
+								Buffer *buffers,
+								ForkNumber forknum,
+								BlockNumber blocknum,
+								int nblocks,
+								bool zero_on_error,
+								BufferAccessStrategy strategy);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern bool BufferIsExclusiveLocked(Buffer buffer);
@@ -247,9 +265,13 @@ extern void LockBufferForCleanup(Buffer buffer);
 extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
+extern void ZeroBuffer(Buffer buffer, ReadBufferMode mode);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern void LimitAdditionalPins(uint32 *additional_pins);
+extern void LimitAdditionalLocalPins(uint32 *additional_pins);
+
 /* in buf_init.c */
 extern void InitBufferPool(void);
 extern Size BufferShmemSize(void);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 527cd2a056..d8ffe397fa 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -85,8 +85,8 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln);
 extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrcloserellocator(RelFileLocatorBackend rlocator);
-extern void smgrrelease(SMgrRelation reln);
-extern void smgrreleaseall(void);
+extern void smgrdestroy(SMgrRelation reln);
+extern void smgrdestroyall(void);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 0000000000..40c3408c54
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,45 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define PGSR_FLAG_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users.
+ */
+#define PGSR_FLAG_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected.  Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define PGSR_FLAG_SEQUENTIAL 0x02
+
+struct PgStreamingRead;
+typedef struct PgStreamingRead PgStreamingRead;
+
+/* Callback that returns the next block number to read. */
+typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr,
+												void *pgsr_private,
+												void *per_buffer_private);
+
+extern PgStreamingRead *pg_streaming_read_buffer_alloc(int flags,
+													   void *pgsr_private,
+													   size_t per_buffer_private_size,
+													   BufferAccessStrategy strategy,
+													   BufferManagerRelation bmr,
+													   ForkNumber forknum,
+													   PgStreamingReadBufferCB next_block_cb);
+
+extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
+extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
+extern void pg_streaming_read_free(PgStreamingRead *pgsr);
+
+#endif
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index a584b1ddff..6636cc82c0 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -561,12 +561,6 @@ typedef struct ViewOptions
  *
  * Very little code is authorized to touch rel->rd_smgr directly.  Instead
  * use this function to fetch its value.
- *
- * Note: since a relcache flush can cause the file handle to be closed again,
- * it's unwise to hold onto the pointer returned by this function for any
- * long period.  Recommended practice is to just re-execute RelationGetSmgr
- * each time you need to access the SMgrRelation.  It's quite cheap in
- * comparison to whatever an smgr function is going to do.
  */
 static inline SMgrRelation
 RelationGetSmgr(Relation rel)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 7e866e3c3d..0e34145187 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2093,6 +2093,8 @@ PgStat_TableCounts
 PgStat_TableStatus
 PgStat_TableXactStatus
 PgStat_WalStats
+PgStreamingRead
+PgStreamingReadRange
 PgXmlErrorContext
 PgXmlStrictness
 Pg_finfo_record
-- 
2.37.2

From ec809e08fe59ffc3eaee772d2269cb47f365c0a6 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Wed, 7 Feb 2024 12:47:51 -0500
Subject: [PATCH v4 2/2] Index scans prefetch with streaming read API

---
 src/backend/access/heap/heapam_handler.c |  55 +++++++-
 src/backend/access/index/indexam.c       | 104 ++++++++++++++-
 src/backend/executor/execMain.c          |   4 +
 src/backend/executor/nodeIndexonlyscan.c | 153 ++++++++++++++++-------
 src/backend/executor/nodeIndexscan.c     |  61 ++++++++-
 src/backend/optimizer/plan/createplan.c  |   1 +
 src/backend/optimizer/plan/planner.c     |   3 +
 src/backend/storage/aio/streaming_read.c |  11 +-
 src/include/access/genam.h               |  41 ++++++
 src/include/access/relscan.h             |  11 ++
 src/include/nodes/execnodes.h            |   1 +
 src/include/nodes/pathnodes.h            |   1 +
 src/include/nodes/plannodes.h            |   1 +
 src/include/storage/streaming_read.h     |   2 +
 src/tools/pgindent/typedefs.list         |   2 +
 15 files changed, 401 insertions(+), 50 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d15a02b2be..03e6b522a8 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -108,6 +108,12 @@ heapam_index_fetch_end(IndexFetchTableData *scan)
 	pfree(hscan);
 }
 
+/*
+ * When the scan uses the streaming read API, tid is an output parameter set
+ * to the latest TID obtained from the stream. For non-streaming read users,
+ * tid is an input parameter identifying the next heap block to be read.
+ */
 static bool
 heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 						 ItemPointer tid,
@@ -127,9 +133,52 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 		/* Switch to correct buffer if we don't have it already */
 		Buffer		prev_buf = hscan->xs_cbuf;
 
-		hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
-											  hscan->xs_base.rel,
-											  ItemPointerGetBlockNumber(tid));
+		if (scan->pgsr)
+		{
+			TIDQueueItem *result;
+
+			if (BufferIsValid(hscan->xs_cbuf))
+				ReleaseBuffer(hscan->xs_cbuf);
+
+			hscan->xs_cbuf = pg_streaming_read_buffer_get_next(scan->pgsr, (void **) &result);
+			if (!BufferIsValid(hscan->xs_cbuf))
+			{
+				/*
+				 * Invalidate the item pointer to allow the caller to
+				 * distinguish between index_fetch_heap() returning false
+				 * because the tuple is not visible and because the streaming
+				 * read callback ran out of queue items.
+				 */
+				ItemPointerSetInvalid(tid);
+				return false;
+			}
+
+			/* Set this for use below */
+			*tid = result->tid;
+
+			scan->tid = result->tid;
+			scan->recheck = result->recheck;
+
+			if (scan->itup)
+				pfree(scan->itup);
+			scan->itup = NULL;
+
+			if (scan->htup)
+				pfree(scan->htup);
+			scan->htup = NULL;
+
+			if (result->itup)
+				scan->itup = CopyIndexTuple(result->itup);
+			if (result->htup)
+				scan->htup = heap_copytuple(result->htup);
+		}
+		else
+		{
+			hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
+												  hscan->xs_base.rel,
+												  ItemPointerGetBlockNumber(tid));
+		}
+
 
 		/*
 		 * Prune page, but only if we weren't already on this page
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index bbd499abcf..0bf50bcd83 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -248,6 +248,91 @@ index_insert_cleanup(Relation indexRelation,
 		indexRelation->rd_indam->aminsertcleanup(indexInfo);
 }
 
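+/*
+ * Reset the TID queue to empty without freeing its storage.
+ */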
+void
+tid_queue_reset(TIDQueue *q)
+{
+	q->head = q->tail = 0;
+}
+
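+/*
+ * Allocate a TID queue with room for size items.
+ */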
+TIDQueue *
+tid_queue_alloc(int size)
+{
+	TIDQueue   *result;
+
+	result = palloc(sizeof(TIDQueue) + (sizeof(TIDQueueItem) * size));
+	result->size = size;
+	tid_queue_reset(result);
+	return result;
+}
+
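+/*
+ * Remove and return the item at the head of the queue.  The queue must not
+ * be empty.
+ */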
+static TIDQueueItem
+index_tid_dequeue(TIDQueue *tid_queue)
+{
+	TIDQueueItem result;
+
+	Assert(tid_queue->tail > tid_queue->head);
+	result = tid_queue->data[tid_queue->head % tid_queue->size];
+	tid_queue->head++;
+
+	return result;
+}
+
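+/*
+ * Append a TID, its recheck flag, and (optionally) copies of the associated
+ * heap and index tuples to the tail of the queue.  The queue must not be
+ * full.
+ */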
+void
+index_tid_enqueue(TIDQueue *tid_queue, ItemPointer tid, bool recheck,
+				  HeapTuple htup, IndexTuple itup)
+{
+	TIDQueueItem *cur;
+
+	Assert(tid_queue->tail >= tid_queue->head);
+	Assert(!TID_QUEUE_FULL(tid_queue));
+	cur = &tid_queue->data[tid_queue->tail % tid_queue->size];
+	ItemPointerSet(&cur->tid,
+				   ItemPointerGetBlockNumber(tid), ItemPointerGetOffsetNumber(tid));
+	cur->recheck = recheck;
+	cur->itup = NULL;
+	cur->htup = NULL;
+
+	if (itup)
+		cur->itup = CopyIndexTuple(itup);
+
+	if (htup)
+		cur->htup = heap_copytuple(htup);
+
+	tid_queue->tail++;
+}
+
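+/*
+ * Streaming read callback for index scans.  Dequeues the next TID from the
+ * scan's TID queue, stashes the dequeued item in the per-buffer data, and
+ * returns its block number, or InvalidBlockNumber if the queue is empty.
+ * Also resets kill_prior_tuple and xs_heap_continue for the new tuple.
+ */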
+static BlockNumber
+index_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private, void *per_buffer_data)
+{
+	IndexScanDesc scan = pgsr_private;
+	TIDQueueItem *result = per_buffer_data;
+
+	scan->kill_prior_tuple = false;
+	scan->xs_heap_continue = false;
+
+	if (TID_QUEUE_EMPTY(scan->tid_queue))
+		return InvalidBlockNumber;
+
+	*result = index_tid_dequeue(scan->tid_queue);
+	return ItemPointerGetBlockNumber(&result->tid);
+}
+
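+/*
+ * Set up (or replace) the streaming read object used to prefetch and read
+ * the heap blocks referenced by TIDs in the scan's TID queue.  The
+ * per-buffer data holds the TIDQueueItem dequeued by the callback, and the
+ * stream is marked resumable so it can continue after the queue is refilled.
+ */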
+void
+index_pgsr_alloc(IndexScanDesc scan)
+{
+	if (scan->xs_heapfetch->pgsr)
+		pg_streaming_read_free(scan->xs_heapfetch->pgsr);
+	scan->xs_heapfetch->pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+															  scan,
+															  sizeof(TIDQueueItem),
+															  NULL,
+															  BMR_REL(scan->heapRelation),
+															  MAIN_FORKNUM,
+															  index_pgsr_next_single);
+
+	pg_streaming_read_set_resumable(scan->xs_heapfetch->pgsr);
+}
+
 /*
  * index_beginscan - start a scan of an index with amgettuple
  *
@@ -333,6 +418,8 @@ index_beginscan_internal(Relation indexRelation,
 	/* Initialize information for parallel scan. */
 	scan->parallel_scan = pscan;
 	scan->xs_temp_snap = temp_snap;
+	scan->index_done = false;
+	scan->tid_queue = NULL;
 
 	return scan;
 }
@@ -364,8 +451,12 @@ index_rescan(IndexScanDesc scan,
 	if (scan->xs_heapfetch)
 		table_index_fetch_reset(scan->xs_heapfetch);
 
+	scan->index_done = false;
+
 	scan->kill_prior_tuple = false; /* for safety */
 	scan->xs_heap_continue = false;
+	if (scan->tid_queue)
+		tid_queue_reset(scan->tid_queue);
 
 	scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys,
 											orderbys, norderbys);
@@ -384,10 +475,17 @@ index_endscan(IndexScanDesc scan)
 	/* Release resources (like buffer pins) from table accesses */
 	if (scan->xs_heapfetch)
 	{
+		if (scan->xs_heapfetch->pgsr)
+			pg_streaming_read_free(scan->xs_heapfetch->pgsr);
+		scan->xs_heapfetch->pgsr = NULL;
 		table_index_fetch_end(scan->xs_heapfetch);
 		scan->xs_heapfetch = NULL;
 	}
 
+	if (scan->tid_queue)
+		pfree(scan->tid_queue);
+	scan->tid_queue = NULL;
+
 	/* End the AM's scan */
 	scan->indexRelation->rd_indam->amendscan(scan);
 
@@ -530,6 +628,9 @@ index_parallelrescan(IndexScanDesc scan)
 	if (scan->xs_heapfetch)
 		table_index_fetch_reset(scan->xs_heapfetch);
 
+	if (scan->tid_queue)
+		tid_queue_reset(scan->tid_queue);
+
 	/* amparallelrescan is optional; assume no-op if not provided by AM */
 	if (scan->indexRelation->rd_indam->amparallelrescan != NULL)
 		scan->indexRelation->rd_indam->amparallelrescan(scan);
@@ -603,6 +704,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 		if (scan->xs_heapfetch)
 			table_index_fetch_reset(scan->xs_heapfetch);
 
+		scan->index_done = true;
 		return NULL;
 	}
 	Assert(ItemPointerIsValid(&scan->xs_heaptid));
@@ -651,7 +753,7 @@ index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 	 * recovery because it may violate MVCC to do so.  See comments in
 	 * RelationGetIndexScan().
 	 */
-	if (!scan->xactStartedInRecovery)
+	if (!scan->xactStartedInRecovery && !scan->xs_heapfetch->pgsr)
 		scan->kill_prior_tuple = all_dead;
 
 	return found;
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 13a9b7da83..9e951a69ab 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1652,6 +1652,10 @@ ExecutePlan(EState *estate,
 	if (!execute_once)
 		use_parallel_mode = false;
 
+	estate->es_use_prefetching = execute_once;
+	if (!planstate->plan->allow_prefetch)
+		estate->es_use_prefetching = false;
+
 	estate->es_use_parallel_mode = use_parallel_mode;
 	if (use_parallel_mode)
 		EnterParallelMode();
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 2c2c9c10b5..e3824c25e0 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -113,61 +113,109 @@ IndexOnlyNext(IndexOnlyScanState *node)
 						 node->ioss_NumOrderByKeys);
 	}
 
+	if (!scandesc->tid_queue)
+	{
+		/* Fall back to a queue size of 1 for now */
+		int			queue_size = 1;
+
+		if (estate->es_use_prefetching && ScanDirectionIsForward(direction))
+			queue_size = TID_QUEUE_SIZE;
+		scandesc->tid_queue = tid_queue_alloc(queue_size);
+		index_pgsr_alloc(scandesc);
+	}
+
 	/*
 	 * OK, now that we have what we need, fetch the next tuple.
 	 */
-	while ((tid = index_getnext_tid(scandesc, direction)) != NULL)
+	for (;;)
 	{
-		bool		tuple_from_heap = false;
+		bool		tuple_from_heap = true;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/*
-		 * We can skip the heap fetch if the TID references a heap page on
-		 * which all tuples are known visible to everybody.  In any case,
-		 * we'll use the index tuple not the heap tuple as the data source.
-		 *
-		 * Note on Memory Ordering Effects: visibilitymap_get_status does not
-		 * lock the visibility map buffer, and therefore the result we read
-		 * here could be slightly stale.  However, it can't be stale enough to
-		 * matter.
-		 *
-		 * We need to detect clearing a VM bit due to an insert right away,
-		 * because the tuple is present in the index page but not visible. The
-		 * reading of the TID by this scan (using a shared lock on the index
-		 * buffer) is serialized with the insert of the TID into the index
-		 * (using an exclusive lock on the index buffer). Because the VM bit
-		 * is cleared before updating the index, and locking/unlocking of the
-		 * index page acts as a full memory barrier, we are sure to see the
-		 * cleared bit if we see a recently-inserted TID.
-		 *
-		 * Deletes do not update the index page (only VACUUM will clear out
-		 * the TID), so the clearing of the VM bit by a delete is not
-		 * serialized with this test below, and we may see a value that is
-		 * significantly stale. However, we don't care about the delete right
-		 * away, because the tuple is still visible until the deleting
-		 * transaction commits or the statement ends (if it's our
-		 * transaction). In either case, the lock on the VM buffer will have
-		 * been released (acting as a write barrier) after clearing the bit.
-		 * And for us to have a snapshot that includes the deleting
-		 * transaction (making the tuple invisible), we must have acquired
-		 * ProcArrayLock after that time, acting as a read barrier.
-		 *
-		 * It's worth going through this complexity to avoid needing to lock
-		 * the VM buffer, which could cause significant contention.
-		 */
-		if (!VM_ALL_VISIBLE(scandesc->heapRelation,
-							ItemPointerGetBlockNumber(tid),
-							&node->ioss_VMBuffer))
+		if (!scandesc->index_done)
+		{
+			while (!TID_QUEUE_FULL(scandesc->tid_queue))
+			{
+				if ((tid = index_getnext_tid(scandesc, direction)) == NULL)
+				{
+					scandesc->index_done = true;
+					break;
+				}
+
+				/*
+				 * We can skip the heap fetch if the TID references a heap
+				 * page on which all tuples are known visible to everybody. In
+				 * any case, we'll use the index tuple not the heap tuple as
+				 * the data source.
+				 *
+				 * Note on Memory Ordering Effects: visibilitymap_get_status
+				 * does not lock the visibility map buffer, and therefore the
+				 * result we read here could be slightly stale.  However, it
+				 * can't be stale enough to matter.
+				 *
+				 * We need to detect clearing a VM bit due to an insert right
+				 * away, because the tuple is present in the index page but
+				 * not visible. The reading of the TID by this scan (using a
+				 * shared lock on the index buffer) is serialized with the
+				 * insert of the TID into the index (using an exclusive lock
+				 * on the index buffer). Because the VM bit is cleared before
+				 * updating the index, and locking/unlocking of the index page
+				 * acts as a full memory barrier, we are sure to see the
+				 * cleared bit if we see a recently-inserted TID.
+				 *
+				 * Deletes do not update the index page (only VACUUM will
+				 * clear out the TID), so the clearing of the VM bit by a
+				 * delete is not serialized with this test below, and we may
+				 * see a value that is significantly stale. However, we don't
+				 * care about the delete right away, because the tuple is
+				 * still visible until the deleting transaction commits or the
+				 * statement ends (if it's our transaction). In either case,
+				 * the lock on the VM buffer will have been released (acting
+				 * as a write barrier) after clearing the bit. And for us to
+				 * have a snapshot that includes the deleting transaction
+				 * (making the tuple invisible), we must have acquired
+				 * ProcArrayLock after that time, acting as a read barrier.
+				 *
+				 * It's worth going through this complexity to avoid needing
+				 * to lock the VM buffer, which could cause significant
+				 * contention.
+				 */
+
+				if (VM_ALL_VISIBLE(scandesc->heapRelation,
+								   ItemPointerGetBlockNumber(tid),
+								   &node->ioss_VMBuffer))
+				{
+					tuple_from_heap = false;
+					break;
+				}
+
+				index_tid_enqueue(scandesc->tid_queue, tid, scandesc->xs_recheck,
+								  scandesc->xs_hitup, scandesc->xs_itup);
+			}
+		}
+
+		if (tuple_from_heap)
 		{
 			/*
 			 * Rats, we have to visit the heap to check visibility.
 			 */
 			InstrCountTuples2(node, 1);
+
 			if (!index_fetch_heap(scandesc, node->ioss_TableSlot))
-				continue;		/* no visible tuple, try next index entry */
+			{
+				/*
+				 * Either there is no visible tuple or the streaming read ran
+				 * out of queue items and it is time to add more.
+				 */
+				if (ItemPointerIsValid(&scandesc->xs_heaptid))
+					continue;
 
-			ExecClearTuple(node->ioss_TableSlot);
+				if (!scandesc->index_done)
+					continue;
+
+				break;
+			}
 
 			/*
 			 * Only MVCC snapshots are supported here, so there should be no
@@ -185,15 +233,32 @@ IndexOnlyNext(IndexOnlyScanState *node)
 			 * entry might require a visit to the same heap page.
 			 */
 
-			tuple_from_heap = true;
+			/*
+			 * If we visit the underlying table, we need to reset the
+			 * IndexScanDesc's fields to match the per-tuple state returned by
+			 * the streaming read API. The most recent index tuple fetched
+			 * will not necessarily match the current TID being processed
+			 * after returning from index_fetch_heap().
+			 */
+			scandesc->xs_recheck = scandesc->xs_heapfetch->recheck;
+			scandesc->xs_heaptid = scandesc->xs_heapfetch->tid;
+
+			scandesc->xs_hitup = scandesc->xs_heapfetch->htup;
+			scandesc->xs_itup = scandesc->xs_heapfetch->itup;
 		}
 
 		/*
 		 * Fill the scan tuple slot with data from the index.  This might be
-		 * provided in either HeapTuple or IndexTuple format.  Conceivably an
+		 * provided in either HeapTuple or IndexTuple format. Conceivably an
 		 * index AM might fill both fields, in which case we prefer the heap
-		 * format, since it's probably a bit cheaper to fill a slot from.
+		 * format, since it's probably a bit cheaper to fill a slot from. As
+		 * soon as we encounter a tuple from an all-visible block, we stop
+		 * prefetching and yield the tuple. As such, we can use the IndexTuple
+		 * and HeapTuple that the index AM filled in the scan descriptor
+		 * instead of having to get them from the per-tuple state yielded by
+		 * the streaming read API.
 		 */
+		ExecClearTuple(node->ioss_TableSlot);
 		if (scandesc->xs_hitup)
 		{
 			/*
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 03142b4a94..aa72479df1 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -125,13 +125,72 @@ IndexNext(IndexScanState *node)
 						 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
 	}
 
+	if (!scandesc->tid_queue)
+	{
+		/* Fall back to a queue size of 1 for now */
+		int			queue_size = 1;
+
+		if (estate->es_use_prefetching && ScanDirectionIsForward(direction))
+			queue_size = TID_QUEUE_SIZE;
+		scandesc->tid_queue = tid_queue_alloc(queue_size);
+		index_pgsr_alloc(scandesc);
+	}
+
 	/*
 	 * ok, now that we have what we need, fetch the next tuple.
 	 */
-	while (index_getnext_slot(scandesc, direction, slot))
+	for (;;)
 	{
+		ItemPointerData last_tid = scandesc->xs_heaptid;
+
+		/*
+		 * If we haven't exhausted TIDs from the index, then fill the queue
+		 * with TIDs from the index until the queue is full. Mark the index as
+		 * exhausted if we reach the end of it.
+		 */
+		if (!scandesc->index_done)
+		{
+			while (!TID_QUEUE_FULL(scandesc->tid_queue))
+			{
+				ItemPointer tid;
+
+				if ((tid = index_getnext_tid(scandesc, direction)) == NULL)
+				{
+					scandesc->index_done = true;
+					break;
+				}
+
+				index_tid_enqueue(scandesc->tid_queue, tid, scandesc->xs_recheck,
+								  NULL, NULL);
+			}
+		}
+
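+		/*
+		 * index_getnext_tid() may have overwritten xs_heaptid while refilling
+		 * the queue.  If the previous heap fetch must continue with the same
+		 * TID, restore the value saved at the top of the loop.
+		 */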
+		if (scandesc->xs_heap_continue)
+			scandesc->xs_heaptid = last_tid;
+
+		/*
+		 * index_fetch_heap() returns false either when the tuple isn't
+		 * visible or when there is nothing more to read.
+		 */
+		if (!index_fetch_heap(scandesc, slot))
+		{
+			if (ItemPointerIsValid(&scandesc->xs_heaptid))
+				continue;
+
+			if (!scandesc->index_done)
+				continue;
+
+			if (scandesc->xs_heap_continue)
+				continue;
+
+			break;
+		}
+
 		CHECK_FOR_INTERRUPTS();
 
+		scandesc->xs_recheck = scandesc->xs_heapfetch->recheck;
+		scandesc->xs_heaptid = scandesc->xs_heapfetch->tid;
+
 		/*
 		 * If the index was lossy, we have to recheck the index quals using
 		 * the fetched tuple.
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 610f4a56d6..3360da8288 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -4719,6 +4719,7 @@ create_mergejoin_plan(PlannerInfo *root,
 
 	/* Costs of sort and material steps are included in path cost already */
 	copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
+	root->allow_prefetch = false;
 
 	return join_plan;
 }
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 2e2458b128..3be133f757 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -413,11 +413,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	root = subquery_planner(glob, parse, NULL,
 							false, tuple_fraction);
 
+	root->allow_prefetch = true;
+
 	/* Select best Path and turn it into a Plan */
 	final_rel = fetch_upper_rel(root, UPPERREL_FINAL, NULL);
 	best_path = get_cheapest_fractional_path(final_rel, tuple_fraction);
 
 	top_plan = create_plan(root, best_path);
+	top_plan->allow_prefetch = root->allow_prefetch;
 
 	/*
 	 * If creating a plan for a scrollable cursor, make sure it can run
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 19605090fe..aaccf25a7f 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -34,6 +34,7 @@ struct PgStreamingRead
 	int			pinned_buffers_trigger;
 	int			next_tail_buffer;
 	bool		finished;
+	bool		resumable;
 	void	   *pgsr_private;
 	PgStreamingReadBufferCB callback;
 	BufferAccessStrategy strategy;
@@ -245,10 +246,12 @@ pg_streaming_read_new_range(PgStreamingRead *pgsr)
 static void
 pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 {
+	bool		done = pgsr->finished && !pgsr->resumable;
+
 	/*
 	 * If we're finished or can't start more I/O, then don't look ahead.
 	 */
-	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+	if (done || pgsr->ios_in_progress == pgsr->max_ios)
 		return;
 
 	/*
@@ -433,3 +436,9 @@ pg_streaming_read_free(PgStreamingRead *pgsr)
 		pfree(pgsr->per_buffer_data);
 	pfree(pgsr);
 }
+
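+/*
+ * Mark the stream as resumable: the callback returning InvalidBlockNumber is
+ * not treated as a permanent end of stream, so look-ahead can continue once
+ * the caller has supplied more blocks.
+ */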
+void
+pg_streaming_read_set_resumable(PgStreamingRead *pgsr)
+{
+	pgsr->resumable = true;
+}
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 8026c2b36d..13d6fab318 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -14,6 +14,7 @@
 #ifndef GENAM_H
 #define GENAM_H
 
+#include "access/relscan.h"
 #include "access/sdir.h"
 #include "access/skey.h"
 #include "nodes/tidbitmap.h"
@@ -24,6 +25,22 @@
 /* We don't want this file to depend on execnodes.h. */
 struct IndexInfo;
 
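+/*
+ * A fixed-size circular queue of TIDs waiting to be passed to the streaming
+ * read API, along with the recheck flag and optional copies of the index and
+ * heap tuples for each TID.  head and tail increase monotonically; slots are
+ * addressed modulo size.
+ */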
+typedef struct TIDQueueItem
+{
+	ItemPointerData tid;
+	bool		recheck;
+	IndexTuple	itup;
+	HeapTuple	htup;
+} TIDQueueItem;
+
+typedef struct TIDQueue
+{
+	uint64		head;
+	uint64		tail;
+	int			size;
+	TIDQueueItem data[FLEXIBLE_ARRAY_MEMBER /* size */ ];
+} TIDQueue;
+
 /*
  * Struct for statistics returned by ambuild
  */
@@ -175,6 +192,30 @@ extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
 											  ParallelIndexScanDesc pscan);
 extern ItemPointer index_getnext_tid(IndexScanDesc scan,
 									 ScanDirection direction);
+
+extern void index_pgsr_alloc(IndexScanDesc scan);
+
+extern void tid_queue_reset(TIDQueue *q);
+
+extern void index_tid_enqueue(TIDQueue *tid_queue, ItemPointer tid, bool recheck,
+							  HeapTuple htup, IndexTuple itup);
+
+#define TID_QUEUE_SIZE 6
+
+extern TIDQueue *tid_queue_alloc(int size);
+
+static inline bool
+TID_QUEUE_FULL(TIDQueue *q)
+{
+	return q->tail - q->head == q->size;
+}
+
+static inline bool
+TID_QUEUE_EMPTY(TIDQueue *q)
+{
+	return q->head == q->tail;
+}
+
 struct TupleTableSlot;
 extern bool index_fetch_heap(IndexScanDesc scan, struct TupleTableSlot *slot);
 extern bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction,
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 521043304a..66b3d90c83 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -18,6 +18,7 @@
 #include "access/itup.h"
 #include "port/atomics.h"
 #include "storage/buf.h"
+#include "storage/streaming_read.h"
 #include "storage/spin.h"
 #include "utils/relcache.h"
 
@@ -104,8 +105,16 @@ typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker;
 typedef struct IndexFetchTableData
 {
 	Relation	rel;
+	PgStreamingRead *pgsr;
+
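+	/*
+	 * When the scan uses the streaming read API, these fields hold the TID,
+	 * recheck flag, and copies of the index and heap tuples associated with
+	 * the most recently returned buffer.
+	 */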
+	ItemPointerData tid;
+	bool		recheck;
+	IndexTuple	itup;
+	HeapTuple	htup;
 } IndexFetchTableData;
 
+typedef struct TIDQueue TIDQueue;
+
 /*
  * We use the same IndexScanDescData structure for both amgettuple-based
  * and amgetbitmap-based index scans.  Some fields are only relevant in
@@ -148,6 +157,8 @@ typedef struct IndexScanDescData
 	bool		xs_heap_continue;	/* T if must keep walking, potential
 									 * further results */
 	IndexFetchTableData *xs_heapfetch;
+	TIDQueue   *tid_queue;		/* TIDs waiting to be fetched from the heap */
+	bool		index_done;		/* no more TIDs to read from the index? */
 
 	bool		xs_recheck;		/* T means scan keys must be rechecked */
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 444a5f0fd5..a8042abf95 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -697,6 +697,7 @@ typedef struct EState
 	struct EPQState *es_epq_active;
 
 	bool		es_use_parallel_mode;	/* can we use parallel workers? */
+	bool		es_use_prefetching;	/* can index scans prefetch heap pages? */
 
 	/* The per-query shared memory area to use for parallel execution. */
 	struct dsa_area *es_query_dsa;
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 534692bee1..171066b14e 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -203,6 +203,7 @@ struct PlannerInfo
 
 	/* 1 at the outermost Query */
 	Index		query_level;
+	bool		allow_prefetch;
 
 	/* NULL at outermost Query */
 	PlannerInfo *parent_root pg_node_attr(read_write_ignore);
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index b4ef6bc44c..317de2d781 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -169,6 +169,7 @@ typedef struct Plan
 	 */
 	Bitmapset  *extParam;
 	Bitmapset  *allParam;
+	bool		allow_prefetch;
 } Plan;
 
 /* ----------------
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index 40c3408c54..2288b7b5eb 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -42,4 +42,6 @@ extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
 extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
 extern void pg_streaming_read_free(PgStreamingRead *pgsr);
 
+extern void pg_streaming_read_set_resumable(PgStreamingRead *pgsr);
+
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 0e34145187..ecc910ff35 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2726,6 +2726,8 @@ TBMSharedIteratorState
 TBMStatus
 TBlockState
 TIDBitmap
+TIDQueue
+TIDQueueItem
 TM_FailureData
 TM_IndexDelete
 TM_IndexDeleteOp
-- 
2.37.2
