On Wed, Jan 24, 2024 at 4:19 AM Tomas Vondra
<tomas.von...@enterprisedb.com> wrote:
>
> On 1/24/24 01:51, Melanie Plageman wrote:
>
> >>> There are also table AM layering violations in my sketch which would
> >>> have to be worked out (not to mention some resource leakage I didn't
> >>> bother investigating [which causes it to fail tests]).
> >>>
> >>> 0001 is all of Thomas' streaming read API code that isn't yet in
> >>> master and 0002 is my rough sketch of index prefetching using the
> >>> streaming read API
> >>>
> >>> There are also numerous optimizations that your index prefetching
> >>> patch set does that would need to be added in some way. I haven't
> >>> thought much about it yet. I wanted to see what you thought of this
> >>> approach first. Basically, is it workable?
> >>
> >> It seems workable, yes. I'm not sure it's much simpler than my patch
> >> (considering a lot of the code is in the optimizations, which are
> >> missing from this patch).
> >>
> >> I think the question is where should the optimizations happen. I suppose
> >> some of them might/should happen in the StreamingRead API itself - like
> >> the detection of sequential patterns, recently prefetched blocks, ...
> >
> > So, the streaming read API does detection of sequential patterns and
> > not prefetching things that are in shared buffers. It doesn't handle
> > avoiding prefetching recently prefetched blocks yet AFAIK. But I
> > daresay this would be relevant for other streaming read users and
> > could certainly be implemented there.
> >
>
> Yes, the "recently prefetched stuff" cache seems like a fairly natural
> complement to the pattern detection and shared-buffers check.
>
> FWIW I wonder if we should make some of this customizable, so that
> systems with customized storage (e.g. neon or with direct I/O) can e.g.
> disable some of these checks. Or replace them with their version.

That's a promising idea.

> >> But I'm not sure what to do about optimizations that are more specific
> >> to the access path. Consider for example the index-only scans. We don't
> >> want to prefetch all the pages, we need to inspect the VM and prefetch
> >> just the not-all-visible ones. And then pass the info to the index scan,
> >> so that it does not need to check the VM again. It's not clear to me how
> >> to do this with this approach.
> >
> > Yea, this is an issue I'll need to think about. To really spell out
> > the problem: the callback dequeues a TID from the tid_queue and looks
> > up its block in the VM. It's all visible. So, it shouldn't return that
> > block to the streaming read API to fetch from the heap because it
> > doesn't need to be read. But, where does the callback put the TID so
> > that the caller can get it? I'm going to think more about this.
> >
>
> Yes, that's the problem for index-only scans. I'd generalize it so that
> it's about the callback being able to (a) decide if it needs to read the
> heap page, and (b) store some custom info for the TID.

Actually, I think this is no big deal. See attached. I just don't
enqueue TIDs whose blocks are all visible. I had to switch the order
from "fetch heap, then fill queue" to "fill queue, then fetch heap".
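
To make the filter concrete, here is a minimal standalone sketch of the
"fill queue" step. It is not code from the attached patch: the toy
visibility map, the ToyTid/ToyTidQueue types and the toy_* function names
are all hypothetical stand-ins, and a real implementation would consult the
actual visibility map rather than a bool array.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins; none of these names come from the patch. */
#define TOY_INVALID_BLOCK UINT32_MAX
#define TOY_QUEUE_SIZE 8

typedef struct ToyTid
{
    uint32_t block;     /* heap block the index entry points at */
    uint16_t offset;    /* item offset within that block */
} ToyTid;

typedef struct ToyTidQueue
{
    ToyTid items[TOY_QUEUE_SIZE];
    size_t head;        /* next slot to dequeue */
    size_t count;       /* number of queued TIDs */
} ToyTidQueue;

/*
 * Fill step: walk the TIDs coming from the index and enqueue only those
 * whose heap block is not all-visible in the toy visibility map.  Returns
 * the number of TIDs consumed; *nskipped counts the all-visible ones.
 */
static size_t
toy_fill_queue(ToyTidQueue *q, const bool *vm,
               const ToyTid *tids, size_t ntids, size_t *nskipped)
{
    size_t consumed = 0;

    *nskipped = 0;
    while (consumed < ntids && q->count < TOY_QUEUE_SIZE)
    {
        ToyTid tid = tids[consumed++];

        if (vm[tid.block])
        {
            /* All visible: the heap page never needs to be read. */
            (*nskipped)++;
            continue;
        }
        q->items[(q->head + q->count) % TOY_QUEUE_SIZE] = tid;
        q->count++;
    }
    return consumed;
}

/*
 * Callback side: hand the next heap block to the reader, or
 * TOY_INVALID_BLOCK once the queue is drained.
 */
static uint32_t
toy_next_block(ToyTidQueue *q)
{
    uint32_t block;

    if (q->count == 0)
        return TOY_INVALID_BLOCK;
    block = q->items[q->head].block;
    q->head = (q->head + 1) % TOY_QUEUE_SIZE;
    q->count--;
    return block;
}
```

Because the queue is filled before any heap fetch, the callback only ever
sees blocks that really need to be read.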

While doing this I noticed some wrong results in the regression tests
(like in the alter table test), so I suspect I have some kind of
control flow issue. Perhaps I should fix the resource leak so I can
actually see the failing tests :)

As for your a) and b) above.

Regarding a): We discussed allowing speculative prefetching and
separating the logic for prefetching from actually reading blocks (so
you can prefetch blocks you ultimately don't read). We decided this
may not belong in a streaming read API. What do you think?

Regarding b): We can store per-buffer data for anything that actually
goes down through the streaming read API, but, in the index-only case,
we don't want the streaming read API to know about blocks that it
doesn't actually need to read.
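
For illustration, per-buffer data can be pictured as a ring of fixed-size
slots: the producer gets a slot when it queues a block, and the consumer
gets a pointer to the same slot when that block comes back. The sketch
below is a standalone toy under that assumption; SlotStream and the
slot_stream_* functions are hypothetical names, not the patch's API. A
block that is never queued never gets a slot, which is exactly the
limitation for all-visible blocks.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical toy stream; not the PgStreamingRead struct from the patch. */
typedef struct SlotStream
{
    size_t    slot_size;    /* bytes of caller data per queued block */
    int       nslots;       /* capacity, playing the role of a pin limit */
    int       head;         /* next slot to fill */
    int       tail;         /* next slot to hand back */
    int       used;         /* blocks queued but not yet returned */
    uint32_t *blocks;       /* block number stored per slot */
    char     *data;         /* nslots * slot_size bytes of caller data */
} SlotStream;

/* Returns 1 on success, 0 if allocation failed. */
static int
slot_stream_init(SlotStream *s, int nslots, size_t slot_size)
{
    s->slot_size = slot_size;
    s->nslots = nslots;
    s->head = s->tail = s->used = 0;
    s->blocks = malloc(sizeof(uint32_t) * (size_t) nslots);
    s->data = malloc(slot_size * (size_t) nslots);
    return s->blocks != NULL && s->data != NULL;
}

/* Producer: queue a block and return its data slot, or NULL if full. */
static void *
slot_stream_put(SlotStream *s, uint32_t block)
{
    void *slot;

    if (s->used == s->nslots)
        return NULL;
    slot = s->data + s->slot_size * (size_t) s->head;
    s->blocks[s->head] = block;
    s->head = (s->head + 1) % s->nslots;
    s->used++;
    return slot;
}

/*
 * Consumer: return the next block in queue order and, if asked, a pointer
 * to the slot the producer filled for it.  The pointer is only valid until
 * the slot is reused by a later put.  Returns UINT32_MAX when empty.
 */
static uint32_t
slot_stream_get(SlotStream *s, void **per_block_data)
{
    uint32_t block;

    if (s->used == 0)
        return UINT32_MAX;
    block = s->blocks[s->tail];
    if (per_block_data)
        *per_block_data = s->data + s->slot_size * (size_t) s->tail;
    s->tail = (s->tail + 1) % s->nslots;
    s->used--;
    return block;
}

static void
slot_stream_free(SlotStream *s)
{
    free(s->blocks);
    free(s->data);
}
```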

- Melanie
From f6cb591ba520351ab7f0e7cbf9d6df3dacda6b44 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 22 Jul 2023 17:31:54 +1200
Subject: [PATCH v3 1/2] Streaming Read API

---
 contrib/pg_prewarm/pg_prewarm.c          |  40 +-
 src/backend/access/transam/xlogutils.c   |   2 +-
 src/backend/postmaster/bgwriter.c        |   8 +-
 src/backend/postmaster/checkpointer.c    |  15 +-
 src/backend/storage/Makefile             |   2 +-
 src/backend/storage/aio/Makefile         |  14 +
 src/backend/storage/aio/meson.build      |   5 +
 src/backend/storage/aio/streaming_read.c | 435 ++++++++++++++++++
 src/backend/storage/buffer/bufmgr.c      | 560 +++++++++++++++--------
 src/backend/storage/buffer/localbuf.c    |  14 +-
 src/backend/storage/meson.build          |   1 +
 src/backend/storage/smgr/smgr.c          |  49 +-
 src/include/storage/bufmgr.h             |  22 +
 src/include/storage/smgr.h               |   4 +-
 src/include/storage/streaming_read.h     |  45 ++
 src/include/utils/rel.h                  |   6 -
 src/tools/pgindent/typedefs.list         |   2 +
 17 files changed, 986 insertions(+), 238 deletions(-)
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/streaming_read.c
 create mode 100644 src/include/storage/streaming_read.h

diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index 8541e4d6e4..9617bf130b 100644
--- a/contrib/pg_prewarm/pg_prewarm.c
+++ b/contrib/pg_prewarm/pg_prewarm.c
@@ -20,6 +20,7 @@
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
+#include "storage/streaming_read.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -38,6 +39,25 @@ typedef enum
 
 static PGIOAlignedBlock blockbuffer;
 
+struct pg_prewarm_streaming_read_private
+{
+       BlockNumber blocknum;
+       int64           last_block;
+};
+
+static BlockNumber
+pg_prewarm_streaming_read_next(PgStreamingRead *pgsr,
+                                                          void *pgsr_private,
+                                                          void *per_buffer_data)
+{
+       struct pg_prewarm_streaming_read_private *p = pgsr_private;
+
+       if (p->blocknum <= p->last_block)
+               return p->blocknum++;
+
+       return InvalidBlockNumber;
+}
+
 /*
  * pg_prewarm(regclass, mode text, fork text,
  *                       first_block int8, last_block int8)
@@ -183,18 +203,36 @@ pg_prewarm(PG_FUNCTION_ARGS)
        }
        else if (ptype == PREWARM_BUFFER)
        {
+               struct pg_prewarm_streaming_read_private p;
+               PgStreamingRead *pgsr;
+
                /*
                 * In buffer mode, we actually pull the data into shared_buffers.
                 */
+
+               /* Set up the private state for our streaming buffer read callback. */
+               p.blocknum = first_block;
+               p.last_block = last_block;
+
+               pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+                                                     &p,
+                                                     0,
+                                                     NULL,
+                                                     BMR_REL(rel),
+                                                     forkNumber,
+                                                     pg_prewarm_streaming_read_next);
+
                for (block = first_block; block <= last_block; ++block)
                {
                        Buffer          buf;
 
                        CHECK_FOR_INTERRUPTS();
-                       buf = ReadBufferExtended(rel, forkNumber, block, RBM_NORMAL, NULL);
+                       buf = pg_streaming_read_buffer_get_next(pgsr, NULL);
                        ReleaseBuffer(buf);
                        ++blocks_done;
                }
+               Assert(pg_streaming_read_buffer_get_next(pgsr, NULL) == InvalidBuffer);
+               pg_streaming_read_free(pgsr);
        }
 
        /* Close relation, release lock. */
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index aa8667abd1..8775b5789b 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -657,7 +657,7 @@ XLogDropDatabase(Oid dbid)
         * This is unnecessarily heavy-handed, as it will close SMgrRelation
         * objects for other databases as well. DROP DATABASE occurs seldom enough
         * that it's not worth introducing a variant of smgrclose for just this
-        * purpose. XXX: Or should we rather leave the smgr entries dangling?
+        * purpose.
         */
        smgrcloseall();
 
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index d7d6cc0cd7..13e5376619 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -246,10 +246,12 @@ BackgroundWriterMain(void)
                if (FirstCallSinceLastCheckpoint())
                {
                        /*
-                        * After any checkpoint, close all smgr files.  This is so we
-                        * won't hang onto smgr references to deleted files indefinitely.
+                        * After any checkpoint, free all smgr objects.  Otherwise we
+                        * would never do so for dropped relations, as the bgwriter does
+                        * not process shared invalidation messages or call
+                        * AtEOXact_SMgr().
                         */
-                       smgrcloseall();
+                       smgrdestroyall();
                }
 
                /*
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 5e949fc885..5d843b6142 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -469,10 +469,12 @@ CheckpointerMain(void)
                                ckpt_performed = CreateRestartPoint(flags);
 
                        /*
-                        * After any checkpoint, close all smgr files.  This is so we
-                        * won't hang onto smgr references to deleted files indefinitely.
+                        * After any checkpoint, free all smgr objects.  Otherwise we
+                        * would never do so for dropped relations, as the checkpointer
+                        * does not process shared invalidation messages or call
+                        * AtEOXact_SMgr().
                         */
-                       smgrcloseall();
+                       smgrdestroyall();
 
                        /*
                         * Indicate checkpoint completion to any waiting backends.
@@ -958,11 +960,8 @@ RequestCheckpoint(int flags)
                 */
                CreateCheckPoint(flags | CHECKPOINT_IMMEDIATE);
 
-               /*
-                * After any checkpoint, close all smgr files.  This is so we won't
-                * hang onto smgr references to deleted files indefinitely.
-                */
-               smgrcloseall();
+               /* Free all smgr objects, as CheckpointerMain() normally would. */
+               smgrdestroyall();
 
                return;
        }
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..eec03f6f2b 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 0000000000..bcab44c802
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+       streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 0000000000..39aef2a84a
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 0000000000..19605090fe
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,435 @@
+#include "postgres.h"
+
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+
+/*
+ * Element type for PgStreamingRead's circular array of block ranges.
+ *
+ * For hits, need_complete is false and there is just one block per
+ * range, already pinned and ready for use.
+ *
+ * For misses, need_complete is true and buffers[] holds a range of
+ * blocks that are contiguous in storage (though the buffers may not be
+ * contiguous in memory), so we can complete them with a single call to
+ * CompleteReadBuffers().
+ */
+typedef struct PgStreamingReadRange
+{
+       bool            advice_issued;
+       bool            need_complete;
+       BlockNumber blocknum;
+       int                     nblocks;
+       int                     per_buffer_data_index[MAX_BUFFERS_PER_TRANSFER];
+       Buffer          buffers[MAX_BUFFERS_PER_TRANSFER];
+} PgStreamingReadRange;
+
+struct PgStreamingRead
+{
+       int                     max_ios;
+       int                     ios_in_progress;
+       int                     ios_in_progress_trigger;
+       int                     max_pinned_buffers;
+       int                     pinned_buffers;
+       int                     pinned_buffers_trigger;
+       int                     next_tail_buffer;
+       bool            finished;
+       void       *pgsr_private;
+       PgStreamingReadBufferCB callback;
+       BufferAccessStrategy strategy;
+       BufferManagerRelation bmr;
+       ForkNumber      forknum;
+
+       bool            advice_enabled;
+
+       /* Next expected block, for detecting sequential access. */
+       BlockNumber seq_blocknum;
+
+       /* Space for optional per-buffer private data. */
+       size_t          per_buffer_data_size;
+       void       *per_buffer_data;
+       int                     per_buffer_data_next;
+
+       /* Circular buffer of ranges. */
+       int                     size;
+       int                     head;
+       int                     tail;
+       PgStreamingReadRange ranges[FLEXIBLE_ARRAY_MEMBER];
+};
+
+static PgStreamingRead *
+pg_streaming_read_buffer_alloc_internal(int flags,
+                                        void *pgsr_private,
+                                        size_t per_buffer_data_size,
+                                        BufferAccessStrategy strategy)
+{
+       PgStreamingRead *pgsr;
+       int                     size;
+       int                     max_ios;
+       uint32          max_pinned_buffers;
+
+
+       /*
+        * Decide how many assumed I/Os we will allow to run concurrently.  That
+        * is, advice to the kernel to tell it that we will soon read.  This
+        * number also affects how far we look ahead for opportunities to start
+        * more I/Os.
+        */
+       if (flags & PGSR_FLAG_MAINTENANCE)
+               max_ios = maintenance_io_concurrency;
+       else
+               max_ios = effective_io_concurrency;
+
+       /*
+        * The desired level of I/O concurrency controls how far ahead we are
+        * willing to look.  We also clamp it to at least
+        * MAX_BUFFERS_PER_TRANSFER so that we can have a chance to build up a full
+        * sized read, even when max_ios is zero.
+        */
+       max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER);
+
+       /*
+        * The *_io_concurrency GUCs might be set to 0.  We want to allow at least
+        * one, to keep our gating logic simple.
+        */
+       max_ios = Max(max_ios, 1);
+
+       /*
+        * Don't allow this backend to pin too many buffers.  For now we'll apply
+        * the limit for the shared buffer pool and the local buffer pool, without
+        * worrying which it is.
+        */
+       LimitAdditionalPins(&max_pinned_buffers);
+       LimitAdditionalLocalPins(&max_pinned_buffers);
+       Assert(max_pinned_buffers > 0);
+
+       /*
+        * pgsr->ranges is a circular buffer.  When it is empty, head == tail.
+        * When it is full, there is an empty element between head and tail.  Head
+        * can also be empty (nblocks == 0), therefore we need two extra elements
+        * for non-occupied ranges, on top of max_pinned_buffers to allow for the
+        * maximum possible number of occupied ranges of the smallest possible
+        * size of one.
+        */
+       size = max_pinned_buffers + 2;
+
+       pgsr = (PgStreamingRead *)
+               palloc0(offsetof(PgStreamingRead, ranges) +
+                               sizeof(pgsr->ranges[0]) * size);
+
+       pgsr->max_ios = max_ios;
+       pgsr->per_buffer_data_size = per_buffer_data_size;
+       pgsr->max_pinned_buffers = max_pinned_buffers;
+       pgsr->pgsr_private = pgsr_private;
+       pgsr->strategy = strategy;
+       pgsr->size = size;
+
+#ifdef USE_PREFETCH
+
+       /*
+        * This system supports prefetching advice.  As long as direct I/O isn't
+        * enabled, and the caller hasn't promised sequential access, we can use
+        * it.
+        */
+       if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+               (flags & PGSR_FLAG_SEQUENTIAL) == 0)
+               pgsr->advice_enabled = true;
+#endif
+
+       /*
+        * We want to avoid creating ranges that are smaller than they could be
+        * just because we hit max_pinned_buffers.  We only look ahead when the
+        * number of pinned buffers falls below this trigger number, or put
+        * another way, we stop looking ahead when we wouldn't be able to build a
+        * "full sized" range.
+        */
+       pgsr->pinned_buffers_trigger =
+               Max(1, (int) max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER);
+
+       /* Space for the callback to store extra data along with each block. */
+       if (per_buffer_data_size)
+               pgsr->per_buffer_data = palloc(per_buffer_data_size * max_pinned_buffers);
+
+       return pgsr;
+}
+
+/*
+ * Create a new streaming read object that can be used to perform the
+ * equivalent of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead.
+ */
+PgStreamingRead *
+pg_streaming_read_buffer_alloc(int flags,
+                               void *pgsr_private,
+                               size_t per_buffer_data_size,
+                               BufferAccessStrategy strategy,
+                               BufferManagerRelation bmr,
+                               ForkNumber forknum,
+                               PgStreamingReadBufferCB next_block_cb)
+{
+       PgStreamingRead *result;
+
+       result = pg_streaming_read_buffer_alloc_internal(flags,
+                                                        pgsr_private,
+                                                        per_buffer_data_size,
+                                                        strategy);
+       result->callback = next_block_cb;
+       result->bmr = bmr;
+       result->forknum = forknum;
+
+       return result;
+}
+
+/*
+ * Start building a new range.  This is called after the previous one
+ * reached maximum size, or the callback's next block can't be merged with it.
+ *
+ * Since the previous head range has now reached its full potential size, this
+ * is also a good time to issue 'prefetch' advice, because we know that we'll
+ * soon be reading.  In future, we could start an actual I/O here.
+ */
+static PgStreamingReadRange *
+pg_streaming_read_new_range(PgStreamingRead *pgsr)
+{
+       PgStreamingReadRange *head_range;
+
+       head_range = &pgsr->ranges[pgsr->head];
+       Assert(head_range->nblocks > 0);
+
+       /*
+        * If a call to CompleteReadBuffers() will be needed, we can issue
+        * advice to the kernel to get the read started.  We suppress it if the
+        * access pattern appears to be completely sequential, though, because on
+        * some systems that interferes with the kernel's own sequential read ahead
+        * heuristics and hurts performance.
+        */
+       if (pgsr->advice_enabled)
+       {
+               BlockNumber blocknum = head_range->blocknum;
+               int                     nblocks = head_range->nblocks;
+
+               if (head_range->need_complete && blocknum != pgsr->seq_blocknum)
+               {
+                       SMgrRelation smgr =
+                               pgsr->bmr.smgr ? pgsr->bmr.smgr :
+                               RelationGetSmgr(pgsr->bmr.rel);
+
+                       Assert(!head_range->advice_issued);
+
+                       smgrprefetch(smgr, pgsr->forknum, blocknum, nblocks);
+
+                       /*
+                        * Count this as an I/O that is concurrently in progress, though
+                        * we don't really know if the kernel generates a physical I/O.
+                        */
+                       head_range->advice_issued = true;
+                       pgsr->ios_in_progress++;
+               }
+
+               /* Remember the block after this range, for sequence detection. */
+               pgsr->seq_blocknum = blocknum + nblocks;
+       }
+
+       /* Create a new head range.  There must be space. */
+       Assert(pgsr->size > pgsr->max_pinned_buffers);
+       Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
+       if (++pgsr->head == pgsr->size)
+               pgsr->head = 0;
+       head_range = &pgsr->ranges[pgsr->head];
+       head_range->nblocks = 0;
+
+       return head_range;
+}
+
+static void
+pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
+{
+       /*
+        * If we're finished or can't start more I/O, then don't look ahead.
+        */
+       if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+               return;
+
+       /*
+        * We'll also wait until the number of pinned buffers falls below our
+        * trigger level, so that we have the chance to create a full range.
+        */
+       if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+               return;
+
+       do
+       {
+               BufferManagerRelation bmr;
+               ForkNumber      forknum;
+               BlockNumber blocknum;
+               Buffer          buffer;
+               bool            found;
+               bool            need_complete;
+               PgStreamingReadRange *head_range;
+               void       *per_buffer_data;
+
+               /* Do we have a full-sized range? */
+               head_range = &pgsr->ranges[pgsr->head];
+               if (head_range->nblocks == lengthof(head_range->buffers))
+               {
+                       Assert(head_range->need_complete);
+                       head_range = pg_streaming_read_new_range(pgsr);
+
+                       /*
+                        * Give up now if I/O is saturated, or we wouldn't be able to form
+                        * another full range after this due to the pin limit.
+                        */
+                       if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger ||
+                               pgsr->ios_in_progress == pgsr->max_ios)
+                               break;
+               }
+
+               per_buffer_data = (char *) pgsr->per_buffer_data +
+                       pgsr->per_buffer_data_size * pgsr->per_buffer_data_next;
+
+               /* Find out which block the callback wants to read next. */
+               blocknum = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data);
+               if (blocknum == InvalidBlockNumber)
+               {
+                       pgsr->finished = true;
+                       break;
+               }
+               bmr = pgsr->bmr;
+               forknum = pgsr->forknum;
+
+               Assert(pgsr->pinned_buffers < pgsr->max_pinned_buffers);
+
+               buffer = PrepareReadBuffer(bmr,
+                                                                  forknum,
+                                                                  blocknum,
+                                                                  pgsr->strategy,
+                                                                  &found);
+               pgsr->pinned_buffers++;
+
+               need_complete = !found;
+
+               /* Is there a head range that we can't extend? */
+               head_range = &pgsr->ranges[pgsr->head];
+               if (head_range->nblocks > 0 &&
+                       (!need_complete ||
+                        !head_range->need_complete ||
+                        head_range->blocknum + head_range->nblocks != blocknum))
+               {
+                       /* Yes, time to start building a new one. */
+                       head_range = pg_streaming_read_new_range(pgsr);
+                       Assert(head_range->nblocks == 0);
+               }
+
+               if (head_range->nblocks == 0)
+               {
+                       /* Initialize a new range beginning at this block. */
+                       head_range->blocknum = blocknum;
+                       head_range->need_complete = need_complete;
+                       head_range->advice_issued = false;
+               }
+               else
+               {
+                       /* We can extend an existing range by one block. */
+                       Assert(head_range->blocknum + head_range->nblocks == blocknum);
+                       Assert(head_range->need_complete);
+               }
+
+               head_range->per_buffer_data_index[head_range->nblocks] = pgsr->per_buffer_data_next++;
+               head_range->buffers[head_range->nblocks] = buffer;
+               head_range->nblocks++;
+
+               if (pgsr->per_buffer_data_next == pgsr->max_pinned_buffers)
+                       pgsr->per_buffer_data_next = 0;
+
+       } while (pgsr->pinned_buffers < pgsr->max_pinned_buffers &&
+                        pgsr->ios_in_progress < pgsr->max_ios);
+
+       if (pgsr->ranges[pgsr->head].nblocks > 0)
+               pg_streaming_read_new_range(pgsr);
+}
+
+Buffer
+pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data)
+{
+       pg_streaming_read_look_ahead(pgsr);
+
+       /* See if we have one buffer to return. */
+       while (pgsr->tail != pgsr->head)
+       {
+               PgStreamingReadRange *tail_range;
+
+               tail_range = &pgsr->ranges[pgsr->tail];
+
+               /*
+                * Do we need to perform an I/O before returning the buffers from this
+                * range?
+                */
+               if (tail_range->need_complete)
+               {
+                       CompleteReadBuffers(pgsr->bmr,
+                                           tail_range->buffers,
+                                           pgsr->forknum,
+                                           tail_range->blocknum,
+                                           tail_range->nblocks,
+                                           false,
+                                           pgsr->strategy);
+                       tail_range->need_complete = false;
+
+                       /*
+                        * We don't really know if the kernel generated a physical I/O
+                        * when we issued advice, let alone when it finished, but it has
+                        * certainly finished after a read call returns.
+                        */
+                       if (tail_range->advice_issued)
+                               pgsr->ios_in_progress--;
+               }
+
+               /* Are there more buffers available in this range? */
+               if (pgsr->next_tail_buffer < tail_range->nblocks)
+               {
+                       int                     buffer_index;
+                       Buffer          buffer;
+
+                       buffer_index = pgsr->next_tail_buffer++;
+                       buffer = tail_range->buffers[buffer_index];
+
+                       Assert(BufferIsValid(buffer));
+
+                       /* We are giving away ownership of this pinned buffer. */
+                       Assert(pgsr->pinned_buffers > 0);
+                       pgsr->pinned_buffers--;
+
+                       if (per_buffer_data)
+                               *per_buffer_data = (char *) pgsr->per_buffer_data +
+                                       tail_range->per_buffer_data_index[buffer_index] *
+                                       pgsr->per_buffer_data_size;
+
+                       return buffer;
+               }
+
+               /* Advance tail to next range, if there is one. */
+               if (++pgsr->tail == pgsr->size)
+                       pgsr->tail = 0;
+               pgsr->next_tail_buffer = 0;
+       }
+
+       Assert(pgsr->pinned_buffers == 0);
+
+       return InvalidBuffer;
+}
+
+void
+pg_streaming_read_free(PgStreamingRead *pgsr)
+{
+       Buffer          buffer;
+
+       /* Stop looking ahead, and unpin anything that wasn't consumed. */
+       pgsr->finished = true;
+       while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
+               ReleaseBuffer(buffer);
+
+       if (pgsr->per_buffer_data)
+               pfree(pgsr->per_buffer_data);
+       pfree(pgsr);
+}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7d601bef6d..2157a97b97 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -472,7 +472,7 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
 )
 
 
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(BufferManagerRelation bmr,
                                                                ForkNumber forkNum, BlockNumber blockNum,
                                                                ReadBufferMode mode, BufferAccessStrategy strategy,
                                                                bool *hit);
@@ -501,7 +501,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int     SyncOneBuffer(int buf_id, bool skip_recently_used,
                                                  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
-static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
                                                          uint32 set_flag_bits, bool forget_owner);
 static void AbortBufferIO(Buffer buffer);
@@ -795,15 +795,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("cannot access temporary tables of other sessions")));
 
-       /*
-        * Read the buffer, and update pgstat counters to reflect a cache hit or
-        * miss.
-        */
-       pgstat_count_buffer_read(reln);
-       buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
+       buf = ReadBuffer_common(BMR_REL(reln),
                                                        forkNum, blockNum, mode, strategy, &hit);
-       if (hit)
-               pgstat_count_buffer_hit(reln);
+
        return buf;
 }
 
@@ -827,8 +821,9 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
 
        SMgrRelation smgr = smgropen(rlocator, InvalidBackendId);
 
-       return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
-                                                        RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
+       return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+                                                                         RELPERSISTENCE_UNLOGGED),
+                                                        forkNum, blockNum,
                                                         mode, strategy, &hit);
 }
 
@@ -1002,7 +997,7 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
                bool            hit;
 
                Assert(extended_by == 0);
-               buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
+               buffer = ReadBuffer_common(bmr,
                                                                   fork, extend_to - 1, mode, strategy,
                                                                   &hit);
        }
@@ -1016,18 +1011,11 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
  * *hit is set to true if the request was satisfied from shared buffer cache.
  */
 static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum,
                                  BlockNumber blockNum, ReadBufferMode mode,
                                  BufferAccessStrategy strategy, bool *hit)
 {
-       BufferDesc *bufHdr;
-       Block           bufBlock;
-       bool            found;
-       IOContext       io_context;
-       IOObject        io_object;
-       bool            isLocalBuf = SmgrIsTemp(smgr);
-
-       *hit = false;
+       Buffer          buffer;
 
        /*
         * Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1046,175 +1034,339 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
                        flags |= EB_LOCK_FIRST;
 
-               return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
-                                                                forkNum, strategy, flags);
+               *hit = false;
+
+               return ExtendBufferedRel(bmr, forkNum, strategy, flags);
        }
 
-       TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
-                                                                          smgr->smgr_rlocator.locator.spcOid,
-                                                                          smgr->smgr_rlocator.locator.dbOid,
-                                                                          smgr->smgr_rlocator.locator.relNumber,
-                                                                          smgr->smgr_rlocator.backend);
+       buffer = PrepareReadBuffer(bmr,
+                                                          forkNum,
+                                                          blockNum,
+                                                          strategy,
+                                                          hit);
+
+       /* At this point we do NOT hold any locks. */
 
+       if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
+       {
+               /* if we just want zeroes and a lock, we're done */
+               ZeroBuffer(buffer, mode);
+       }
+       else if (!*hit)
+       {
+               /* we might need to perform I/O */
+               CompleteReadBuffers(bmr,
+                                                       &buffer,
+                                                       forkNum,
+                                                       blockNum,
+                                                       1,
+                                                       mode == RBM_ZERO_ON_ERROR,
+                                                       strategy);
+       }
+
+       return buffer;
+}
+
+/*
+ * Prepare to read a block.  The buffer is pinned.  If this is a 'hit', then
+ * the returned buffer can be used immediately.  Otherwise, a physical read
+ * should be completed with CompleteReadBuffers(), or the buffer should be
+ * zeroed with ZeroBuffer().  PrepareReadBuffer() followed by
+ * CompleteReadBuffers() or ZeroBuffer() is equivalent to ReadBuffer(), but
+ * the caller has the opportunity to combine reads of multiple neighboring
+ * blocks into one CompleteReadBuffers() call.
+ *
+ * *foundPtr is set to true for a hit, and false for a miss.
+ */
+Buffer
+PrepareReadBuffer(BufferManagerRelation bmr,
+                                 ForkNumber forkNum,
+                                 BlockNumber blockNum,
+                                 BufferAccessStrategy strategy,
+                                 bool *foundPtr)
+{
+       BufferDesc *bufHdr;
+       bool            isLocalBuf;
+       IOContext       io_context;
+       IOObject        io_object;
+
+       Assert(blockNum != P_NEW);
+
+       if (bmr.rel)
+       {
+               bmr.smgr = RelationGetSmgr(bmr.rel);
+               bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+       }
+
+       isLocalBuf = SmgrIsTemp(bmr.smgr);
        if (isLocalBuf)
        {
-               /*
-                * We do not use a BufferAccessStrategy for I/O of temporary tables.
-                * However, in some cases, the "strategy" may not be NULL, so we can't
-                * rely on IOContextForStrategy() to set the right IOContext for us.
-                * This may happen in cases like CREATE TEMPORARY TABLE AS...
-                */
                io_context = IOCONTEXT_NORMAL;
                io_object = IOOBJECT_TEMP_RELATION;
-               bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
-               if (found)
-                       pgBufferUsage.local_blks_hit++;
-               else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-                                mode == RBM_ZERO_ON_ERROR)
-                       pgBufferUsage.local_blks_read++;
        }
        else
        {
-               /*
-                * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
-                * not currently in memory.
-                */
                io_context = IOContextForStrategy(strategy);
                io_object = IOOBJECT_RELATION;
-               bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
-                                                        strategy, &found, io_context);
-               if (found)
-                       pgBufferUsage.shared_blks_hit++;
-               else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-                                mode == RBM_ZERO_ON_ERROR)
-                       pgBufferUsage.shared_blks_read++;
        }
 
-       /* At this point we do NOT hold any locks. */
+       TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+                                                                          bmr.smgr->smgr_rlocator.locator.spcOid,
+                                                                          bmr.smgr->smgr_rlocator.locator.dbOid,
+                                                                          bmr.smgr->smgr_rlocator.locator.relNumber,
+                                                                          bmr.smgr->smgr_rlocator.backend);
 
-       /* if it was already in the buffer pool, we're done */
-       if (found)
+       ResourceOwnerEnlarge(CurrentResourceOwner);
+       if (isLocalBuf)
+       {
+               bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr);
+               if (*foundPtr)
+                       pgBufferUsage.local_blks_hit++;
+       }
+       else
+       {
+               bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
+                                                        strategy, foundPtr, io_context);
+               if (*foundPtr)
+                       pgBufferUsage.shared_blks_hit++;
+       }
+       if (bmr.rel)
+       {
+               /*
+                * While pgBufferUsage's "read" counter isn't bumped unless we reach
+                * CompleteReadBuffers() (so, not for hits, and not for buffers that
+                * are zeroed instead), the per-relation stats always count them.
+                */
+               pgstat_count_buffer_read(bmr.rel);
+               if (*foundPtr)
+                       pgstat_count_buffer_hit(bmr.rel);
+       }
+       if (*foundPtr)
        {
-               /* Just need to update stats before we exit */
-               *hit = true;
                VacuumPageHit++;
                pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
                if (VacuumCostActive)
                        VacuumCostBalance += VacuumCostPageHit;
 
                TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-                                                                                 smgr->smgr_rlocator.locator.spcOid,
-                                                                                 smgr->smgr_rlocator.locator.dbOid,
-                                                                                 smgr->smgr_rlocator.locator.relNumber,
-                                                                                 smgr->smgr_rlocator.backend,
-                                                                                 found);
+                                                                                 bmr.smgr->smgr_rlocator.locator.spcOid,
+                                                                                 bmr.smgr->smgr_rlocator.locator.dbOid,
+                                                                                 bmr.smgr->smgr_rlocator.locator.relNumber,
+                                                                                 bmr.smgr->smgr_rlocator.backend,
+                                                                                 true);
+       }
 
-               /*
-                * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
-                * on return.
-                */
-               if (!isLocalBuf)
-               {
-                       if (mode == RBM_ZERO_AND_LOCK)
-                               LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-                                                         LW_EXCLUSIVE);
-                       else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-                               LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-               }
+       return BufferDescriptorGetBuffer(bufHdr);
+}
 
-               return BufferDescriptorGetBuffer(bufHdr);
+static inline bool
+CompleteReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+       if (BufferIsLocal(buffer))
+       {
+               BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+
+               return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
        }
+       else
+               return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+}
 
-       /*
-        * if we have gotten to this point, we have allocated a buffer for the
-        * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
-        * if it's a shared buffer.
-        */
-       Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));       /* spinlock not needed */
+/*
+ * Complete a set of reads prepared with PrepareReadBuffer().  The buffers must
+ * cover a cluster of neighboring block numbers.
+ *
+ * Typically this performs one physical vector read covering the block range,
+ * but if some of the buffers have already been read in the meantime by any
+ * backend, zero or multiple reads may be performed.
+ */
+void
+CompleteReadBuffers(BufferManagerRelation bmr,
+                                       Buffer *buffers,
+                                       ForkNumber forknum,
+                                       BlockNumber blocknum,
+                                       int nblocks,
+                                       bool zero_on_error,
+                                       BufferAccessStrategy strategy)
+{
+       bool            isLocalBuf;
+       IOContext       io_context;
+       IOObject        io_object;
 
-       bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+       if (bmr.rel)
+       {
+               bmr.smgr = RelationGetSmgr(bmr.rel);
+               bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+       }
+
+       isLocalBuf = SmgrIsTemp(bmr.smgr);
+       if (isLocalBuf)
+       {
+               io_context = IOCONTEXT_NORMAL;
+               io_object = IOOBJECT_TEMP_RELATION;
+       }
+       else
+       {
+               io_context = IOContextForStrategy(strategy);
+               io_object = IOOBJECT_RELATION;
+       }
 
        /*
-        * Read in the page, unless the caller intends to overwrite it and just
-        * wants us to allocate a buffer.
+        * We count all these blocks as read by this backend.  This is traditional
+        * behavior, but might turn out to be not true if we find that someone
+        * else has beaten us and completed the read of some of these blocks.  In
+        * that case the system globally double-counts, but we traditionally don't
+        * count this as a "hit", and we don't have a separate counter for "miss,
+        * but another backend completed the read".
         */
-       if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-               MemSet((char *) bufBlock, 0, BLCKSZ);
+       if (isLocalBuf)
+               pgBufferUsage.local_blks_read += nblocks;
        else
+               pgBufferUsage.shared_blks_read += nblocks;
+
+       for (int i = 0; i < nblocks; ++i)
        {
-               instr_time      io_start = pgstat_prepare_io_time(track_io_timing);
+               int                     io_buffers_len;
+               Buffer          io_buffers[MAX_BUFFERS_PER_TRANSFER];
+               void       *io_pages[MAX_BUFFERS_PER_TRANSFER];
+               instr_time      io_start;
+               BlockNumber io_first_block;
 
-               smgrread(smgr, forkNum, blockNum, bufBlock);
+#ifdef USE_ASSERT_CHECKING
 
-               pgstat_count_io_op_time(io_object, io_context,
-                                                               IOOP_READ, io_start, 1);
+               /*
+                * We could get all the information from buffer headers, but it can be
+                * expensive to access buffer header cache lines so we make the caller
+                * provide all the information we need, and assert that it is
+                * consistent.
+                */
+               {
+                       RelFileLocator xlocator;
+                       ForkNumber      xforknum;
+                       BlockNumber xblocknum;
+
+                       BufferGetTag(buffers[i], &xlocator, &xforknum, &xblocknum);
+                       Assert(RelFileLocatorEquals(bmr.smgr->smgr_rlocator.locator, xlocator));
+                       Assert(xforknum == forknum);
+                       Assert(xblocknum == blocknum + i);
+               }
+#endif
+
+               /*
+                * Skip this block if someone else has already completed it.  If an
+                * I/O is already in progress in another backend, this will wait for
+                * the outcome: either done, or something went wrong and we will
+                * retry.
+                */
+               if (!CompleteReadBuffersCanStartIO(buffers[i], false))
+               {
+                       /*
+                        * Report this as a 'hit' for this backend, even though it must
+                        * have started out as a miss in PrepareReadBuffer().
+                        */
+                       TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+                                                                                         bmr.smgr->smgr_rlocator.locator.spcOid,
+                                                                                         bmr.smgr->smgr_rlocator.locator.dbOid,
+                                                                                         bmr.smgr->smgr_rlocator.locator.relNumber,
+                                                                                         bmr.smgr->smgr_rlocator.backend,
+                                                                                         true);
+                       continue;
+               }
+
+               /* We found a buffer that we need to read in. */
+               io_buffers[0] = buffers[i];
+               io_pages[0] = BufferGetBlock(buffers[i]);
+               io_first_block = blocknum + i;
+               io_buffers_len = 1;
 
-               /* check for garbage data */
-               if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
-                                                                       PIV_LOG_WARNING | PIV_REPORT_STAT))
+               /*
+                * How many neighboring-on-disk blocks can we scatter-read into
+                * other buffers at the same time?  In this case we don't wait if we
+                * see an I/O already in progress.  We already hold BM_IO_IN_PROGRESS
+                * for the head block, so we should get on with that I/O as soon as
+                * possible.  We'll come back to this block again, above.
+                */
+               while ((i + 1) < nblocks &&
+                          CompleteReadBuffersCanStartIO(buffers[i + 1], true))
+               {
+                       /* Must be consecutive block numbers. */
+                       Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+                                  BufferGetBlockNumber(buffers[i]) + 1);
+
+                       io_buffers[io_buffers_len] = buffers[++i];
+                       io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+               }
+
+               io_start = pgstat_prepare_io_time(track_io_timing);
+               smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
+               pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
+                                                               io_buffers_len);
+
+               /* Verify each block we read, and terminate the I/O. */
+               for (int j = 0; j < io_buffers_len; ++j)
                {
-                       if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
+                       BufferDesc *bufHdr;
+                       Block           bufBlock;
+
+                       if (isLocalBuf)
                        {
-                               ereport(WARNING,
-                                               (errcode(ERRCODE_DATA_CORRUPTED),
-                                                errmsg("invalid page in block %u of relation %s; zeroing out page",
-                                                               blockNum,
-                                                               relpath(smgr->smgr_rlocator, forkNum))));
-                               MemSet((char *) bufBlock, 0, BLCKSZ);
+                               bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+                               bufBlock = LocalBufHdrGetBlock(bufHdr);
                        }
                        else
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_DATA_CORRUPTED),
-                                                errmsg("invalid page in block %u of relation %s",
-                                                               blockNum,
-                                                               relpath(smgr->smgr_rlocator, forkNum))));
-               }
-       }
-
-       /*
-        * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
-        * content lock before marking the page as valid, to make sure that no
-        * other backend sees the zeroed page before the caller has had a chance
-        * to initialize it.
-        *
-        * Since no-one else can be looking at the page contents yet, there is no
-        * difference between an exclusive lock and a cleanup-strength lock. (Note
-        * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
-        * they assert that the buffer is already valid.)
-        */
-       if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
-               !isLocalBuf)
-       {
-               LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
-       }
+                       {
+                               bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+                               bufBlock = BufHdrGetBlock(bufHdr);
+                       }
 
-       if (isLocalBuf)
-       {
-               /* Only need to adjust flags */
-               uint32          buf_state = pg_atomic_read_u32(&bufHdr->state);
+                       /* check for garbage data */
+                       if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+                                                                               PIV_LOG_WARNING | PIV_REPORT_STAT))
+                       {
+                               if (zero_on_error || zero_damaged_pages)
+                               {
+                                       ereport(WARNING,
+                                                       (errcode(ERRCODE_DATA_CORRUPTED),
+                                                        errmsg("invalid page in block %u of relation %s; zeroing out page",
+                                                                       io_first_block + j,
+                                                                       relpath(bmr.smgr->smgr_rlocator, forknum))));
+                                       memset(bufBlock, 0, BLCKSZ);
+                               }
+                               else
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_DATA_CORRUPTED),
+                                                        errmsg("invalid page in block %u of relation %s",
+                                                                       io_first_block + j,
+                                                                       relpath(bmr.smgr->smgr_rlocator, forknum))));
+                       }
 
-               buf_state |= BM_VALID;
-               pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-       }
-       else
-       {
-               /* Set BM_VALID, terminate IO, and wake up any waiters */
-               TerminateBufferIO(bufHdr, false, BM_VALID, true);
-       }
+                       /* Terminate I/O and set BM_VALID. */
+                       if (isLocalBuf)
+                       {
+                               uint32          buf_state = pg_atomic_read_u32(&bufHdr->state);
 
-       VacuumPageMiss++;
-       if (VacuumCostActive)
-               VacuumCostBalance += VacuumCostPageMiss;
+                               buf_state |= BM_VALID;
+                               pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+                       }
+                       else
+                       {
+                               /* Set BM_VALID, terminate IO, and wake up any waiters */
+                               TerminateBufferIO(bufHdr, false, BM_VALID, true);
+                       }
 
-       TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-                                                                         smgr->smgr_rlocator.locator.spcOid,
-                                                                         smgr->smgr_rlocator.locator.dbOid,
-                                                                         smgr->smgr_rlocator.locator.relNumber,
-                                                                         smgr->smgr_rlocator.backend,
-                                                                         found);
+                       /* Report I/Os as completing individually. */
+                       TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+                                                                                         bmr.smgr->smgr_rlocator.locator.spcOid,
+                                                                                         bmr.smgr->smgr_rlocator.locator.dbOid,
+                                                                                         bmr.smgr->smgr_rlocator.locator.relNumber,
+                                                                                         bmr.smgr->smgr_rlocator.backend,
+                                                                                         false);
+               }
 
-       return BufferDescriptorGetBuffer(bufHdr);
+               VacuumPageMiss += io_buffers_len;
+               if (VacuumCostActive)
+                       VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+       }
 }
 
 /*
@@ -1228,11 +1380,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  *
  * The returned buffer is pinned and is already marked as holding the
  * desired page.  If it already did have the desired page, *foundPtr is
- * set true.  Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true.  Otherwise, *foundPtr is set false.  A read should be
+ * performed with CompleteReadBuffers().
  *
  * io_context is passed as an output parameter to avoid calling
  * IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1291,19 +1440,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                {
                        /*
                         * We can only get here if (a) someone else is still reading in
-                        * the page, or (b) a previous read attempt failed.  We have to
-                        * wait for any active read attempt to finish, and then set up our
-                        * own read attempt if the page is still not BM_VALID.
-                        * StartBufferIO does it all.
+                        * the page, (b) a previous read attempt failed, or (c) someone
+                        * called PrepareReadBuffer() but not yet CompleteReadBuffers().
                         */
-                       if (StartBufferIO(buf, true))
-                       {
-                               /*
-                                * If we get here, previous attempts to read the buffer must
-                                * have failed ... but we shall bravely try again.
-                                */
-                               *foundPtr = false;
-                       }
+                       *foundPtr = false;
                }
 
                return buf;
@@ -1368,19 +1508,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                {
                        /*
                         * We can only get here if (a) someone else is still reading in
-                        * the page, or (b) a previous read attempt failed.  We have to
-                        * wait for any active read attempt to finish, and then set up our
-                        * own read attempt if the page is still not BM_VALID.
-                        * StartBufferIO does it all.
+                        * the page, (b) a previous read attempt failed, or (c) someone
+                        * called PrepareReadBuffer() but not yet CompleteReadBuffers().
                         */
-                       if (StartBufferIO(existing_buf_hdr, true))
-                       {
-                               /*
-                                * If we get here, previous attempts to read the buffer must
-                                * have failed ... but we shall bravely try again.
-                                */
-                               *foundPtr = false;
-                       }
+                       *foundPtr = false;
                }
 
                return existing_buf_hdr;
@@ -1412,15 +1543,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
        LWLockRelease(newPartitionLock);
 
        /*
-        * Buffer contents are currently invalid.  Try to obtain the right to
-        * start I/O.  If StartBufferIO returns false, then someone else managed
-        * to read it before we did, so there's nothing left for BufferAlloc() to
-        * do.
+        * Buffer contents are currently invalid.
         */
-       if (StartBufferIO(victim_buf_hdr, true))
-               *foundPtr = false;
-       else
-               *foundPtr = true;
+       *foundPtr = false;
 
        return victim_buf_hdr;
 }
@@ -1774,7 +1899,7 @@ again:
  * pessimistic, but outside of toy-sized shared_buffers it should allow
  * sufficient pins.
  */
-static void
+void
 LimitAdditionalPins(uint32 *additional_pins)
 {
        uint32          max_backends;
@@ -2043,7 +2168,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 
                                buf_state &= ~BM_VALID;
                                UnlockBufHdr(existing_hdr, buf_state);
-                       } while (!StartBufferIO(existing_hdr, true));
+                       } while (!StartBufferIO(existing_hdr, true, false));
                }
                else
                {
@@ -2066,7 +2191,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
                        LWLockRelease(partition_lock);
 
                        /* XXX: could combine the locked operations in it with the above */
-                       StartBufferIO(victim_buf_hdr, true);
+                       StartBufferIO(victim_buf_hdr, true, false);
                }
        }
 
@@ -2381,7 +2506,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
        else
        {
                /*
-                * If we previously pinned the buffer, it must surely be valid.
+                * If we previously pinned the buffer, it is likely to be valid, but
+                * it may not be if PrepareReadBuffer() was called and
+                * CompleteReadBuffers() hasn't been called yet.  We'll check by
+                * loading the flags without locking.  This is racy, but it's OK to
+                * return false spuriously: when CompleteReadBuffers() calls
+                * StartBufferIO(), it'll see that it's now valid.
                 *
                 * Note: We deliberately avoid a Valgrind client request here.
                 * Individual access methods can optionally superimpose buffer page
@@ -2390,7 +2520,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
                 * that the buffer page is legitimately non-accessible here.  We
                 * cannot meddle with that.
                 */
-               result = true;
+               result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
        }
 
        ref->refcount++;
@@ -3458,7 +3588,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
         * someone else flushed the buffer before we could, so we need not do
         * anything.
         */
-       if (!StartBufferIO(buf, false))
+       if (!StartBufferIO(buf, false, false))
                return;
 
        /* Setup error traceback support for ereport() */
@@ -4845,6 +4975,46 @@ ConditionalLockBuffer(Buffer buffer)
                                                                        LW_EXCLUSIVE);
 }
 
+/*
+ * Zero a buffer, and lock it as RBM_ZERO_AND_LOCK or
+ * RBM_ZERO_AND_CLEANUP_LOCK would.  The buffer must be already pinned.  It
+ * does not have to be valid, but it is valid and locked on return.
+ */
+void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+{
+       BufferDesc *bufHdr;
+       uint32          buf_state;
+
+       Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+       if (BufferIsLocal(buffer))
+               bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+       else
+       {
+               bufHdr = GetBufferDescriptor(buffer - 1);
+               if (mode == RBM_ZERO_AND_LOCK)
+                       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+               else
+                       LockBufferForCleanup(buffer);
+       }
+
+       memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+       if (BufferIsLocal(buffer))
+       {
+               buf_state = pg_atomic_read_u32(&bufHdr->state);
+               buf_state |= BM_VALID;
+               pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+       }
+       else
+       {
+               buf_state = LockBufHdr(bufHdr);
+               buf_state |= BM_VALID;
+               UnlockBufHdr(bufHdr, buf_state);
+       }
+}
+
 /*
  * Verify that this backend is pinning the buffer exactly once.
  *
@@ -5197,9 +5367,15 @@ WaitIO(BufferDesc *buf)
  *
  * Returns true if we successfully marked the buffer as I/O busy,
  * false if someone else already did the work.
+ *
+ * If nowait is true, then we don't wait for an I/O to be finished by another
+ * backend.  In that case, false indicates either that the I/O was already
+ * finished, or is still in progress.  This is useful for callers that want to
+ * find out if they can perform the I/O as part of a larger operation, without
+ * waiting for the answer or distinguishing the reasons why not.
  */
 static bool
-StartBufferIO(BufferDesc *buf, bool forInput)
+StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 {
        uint32          buf_state;
 
@@ -5212,6 +5388,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
                if (!(buf_state & BM_IO_IN_PROGRESS))
                        break;
                UnlockBufHdr(buf, buf_state);
+               if (nowait)
+                       return false;
                WaitIO(buf);
        }
 
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 1be4f4f8da..717b8f58da 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -109,10 +109,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
  * LocalBufferAlloc -
  *       Find or create a local buffer for the given page of the given relation.
  *
- * API is similar to bufmgr.c's BufferAlloc, except that we do not need
- * to do any locking since this is all local.   Also, IO_IN_PROGRESS
- * does not get set.  Lastly, we support only default access strategy
- * (hence, usage_count is always advanced).
+ * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
+ * any locking since this is all local.  We support only default access
+ * strategy (hence, usage_count is always advanced).
  */
 BufferDesc *
 LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
@@ -288,7 +287,7 @@ GetLocalVictimBuffer(void)
 }
 
 /* see LimitAdditionalPins() */
-static void
+void
 LimitAdditionalLocalPins(uint32 *additional_pins)
 {
        uint32          max_pins;
@@ -298,9 +297,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins)
 
        /*
         * In contrast to LimitAdditionalPins() other backends don't play a role
-        * here. We can allow up to NLocBuffer pins in total.
+        * here. We can allow up to NLocBuffer pins in total, but it might not be
+        * initialized yet so read num_temp_buffers.
         */
-       max_pins = (NLocBuffer - NLocalPinnedBuffers);
+       max_pins = (num_temp_buffers - NLocalPinnedBuffers);
 
        if (*additional_pins >= max_pins)
                *additional_pins = max_pins;
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca2..739d13293f 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 563a0be5c7..0d7272e796 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -147,7 +147,9 @@ smgrshutdown(int code, Datum arg)
 /*
  * smgropen() -- Return an SMgrRelation object, creating it if need be.
  *
- * This does not attempt to actually open the underlying file.
+ * This does not attempt to actually open the underlying files.  The returned
+ * object remains valid at least until AtEOXact_SMgr() is called, or until
+ * smgrdestroy() is called in non-transaction backends.
  */
 SMgrRelation
 smgropen(RelFileLocator rlocator, BackendId backend)
@@ -259,10 +261,10 @@ smgrexists(SMgrRelation reln, ForkNumber forknum)
 }
 
 /*
- * smgrclose() -- Close and delete an SMgrRelation object.
+ * smgrdestroy() -- Delete an SMgrRelation object.
  */
 void
-smgrclose(SMgrRelation reln)
+smgrdestroy(SMgrRelation reln)
 {
        SMgrRelation *owner;
        ForkNumber      forknum;
@@ -289,12 +291,14 @@ smgrclose(SMgrRelation reln)
 }
 
 /*
- * smgrrelease() -- Release all resources used by this object.
+ * smgrclose() -- Release all resources used by this object.
  *
- * The object remains valid.
+ * The object remains valid, but is moved to the unknown list where it will
+ * be destroyed by AtEOXact_SMgr().  It may be re-owned if it is accessed by a
+ * relation before then.
  */
 void
-smgrrelease(SMgrRelation reln)
+smgrclose(SMgrRelation reln)
 {
        for (ForkNumber forknum = 0; forknum <= MAX_FORKNUM; forknum++)
        {
@@ -302,15 +306,20 @@ smgrrelease(SMgrRelation reln)
                reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
        }
        reln->smgr_targblock = InvalidBlockNumber;
+
+       if (reln->smgr_owner)
+       {
+               *reln->smgr_owner = NULL;
+               reln->smgr_owner = NULL;
+               dlist_push_tail(&unowned_relns, &reln->node);
+       }
 }
 
 /*
- * smgrreleaseall() -- Release resources used by all objects.
- *
- * This is called for PROCSIGNAL_BARRIER_SMGRRELEASE.
+ * smgrcloseall() -- Close all objects.
  */
 void
-smgrreleaseall(void)
+smgrcloseall(void)
 {
        HASH_SEQ_STATUS status;
        SMgrRelation reln;
@@ -322,14 +331,17 @@ smgrreleaseall(void)
        hash_seq_init(&status, SMgrRelationHash);
 
        while ((reln = (SMgrRelation) hash_seq_search(&status)) != NULL)
-               smgrrelease(reln);
+               smgrclose(reln);
 }
 
 /*
- * smgrcloseall() -- Close all existing SMgrRelation objects.
+ * smgrdestroyall() -- Destroy all SMgrRelation objects.
+ *
+ * It must be known that there are no pointers to SMgrRelations, other than
+ * those registered with smgrsetowner().
  */
 void
-smgrcloseall(void)
+smgrdestroyall(void)
 {
        HASH_SEQ_STATUS status;
        SMgrRelation reln;
@@ -341,7 +353,7 @@ smgrcloseall(void)
        hash_seq_init(&status, SMgrRelationHash);
 
        while ((reln = (SMgrRelation) hash_seq_search(&status)) != NULL)
-               smgrclose(reln);
+               smgrdestroy(reln);
 }
 
 /*
@@ -733,7 +745,8 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
  * AtEOXact_SMgr
  *
  * This routine is called during transaction commit or abort (it doesn't
- * particularly care which).  All transient SMgrRelation objects are closed.
+ * particularly care which).  All transient SMgrRelation objects are
+ * destroyed.
  *
  * We do this as a compromise between wanting transient SMgrRelations to
  * live awhile (to amortize the costs of blind writes of multiple blocks)
@@ -747,7 +760,7 @@ AtEOXact_SMgr(void)
        dlist_mutable_iter iter;
 
        /*
-        * Zap all unowned SMgrRelations.  We rely on smgrclose() to remove each
+        * Zap all unowned SMgrRelations.  We rely on smgrdestroy() to remove each
         * one from the list.
         */
        dlist_foreach_modify(iter, &unowned_relns)
@@ -757,7 +770,7 @@ AtEOXact_SMgr(void)
 
                Assert(rel->smgr_owner == NULL);
 
-               smgrclose(rel);
+               smgrdestroy(rel);
        }
 }
 
@@ -768,6 +781,6 @@ AtEOXact_SMgr(void)
 bool
 ProcessBarrierSmgrRelease(void)
 {
-       smgrreleaseall();
+       smgrcloseall();
        return true;
 }
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index d51d46d335..a38f1acb37 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,7 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "port/pg_iovec.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 #define BUFFER_LOCK_SHARE              1
 #define BUFFER_LOCK_EXCLUSIVE  2
 
+/*
+ * Maximum number of buffers for multi-buffer I/O functions.  This is set to
+ * allow 128kB transfers, unless BLCKSZ and IOV_MAX imply a smaller maximum.
+ */
+#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ)
 
 /*
  * prototypes for functions in bufmgr.c
@@ -177,6 +183,18 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
                                                                                ForkNumber forkNum, BlockNumber blockNum,
                                                                                ReadBufferMode mode, BufferAccessStrategy strategy,
                                                                                bool permanent);
+extern Buffer PrepareReadBuffer(BufferManagerRelation bmr,
+                                                               ForkNumber forkNum,
+                                                               BlockNumber blockNum,
+                                                               BufferAccessStrategy strategy,
+                                                               bool *foundPtr);
+extern void CompleteReadBuffers(BufferManagerRelation bmr,
+                                                               Buffer *buffers,
+                                                               ForkNumber forknum,
+                                                               BlockNumber blocknum,
+                                                               int nblocks,
+                                                               bool zero_on_error,
+                                                               BufferAccessStrategy strategy);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern bool BufferIsExclusiveLocked(Buffer buffer);
@@ -247,9 +265,13 @@ extern void LockBufferForCleanup(Buffer buffer);
 extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
+extern void ZeroBuffer(Buffer buffer, ReadBufferMode mode);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern void LimitAdditionalPins(uint32 *additional_pins);
+extern void LimitAdditionalLocalPins(uint32 *additional_pins);
+
 /* in buf_init.c */
 extern void InitBufferPool(void);
 extern Size BufferShmemSize(void);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 527cd2a056..d8ffe397fa 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -85,8 +85,8 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln);
 extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrcloserellocator(RelFileLocatorBackend rlocator);
-extern void smgrrelease(SMgrRelation reln);
-extern void smgrreleaseall(void);
+extern void smgrdestroy(SMgrRelation reln);
+extern void smgrdestroyall(void);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 0000000000..40c3408c54
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,45 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define PGSR_FLAG_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users.
+ */
+#define PGSR_FLAG_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected.  Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define PGSR_FLAG_SEQUENTIAL 0x02
+
+struct PgStreamingRead;
+typedef struct PgStreamingRead PgStreamingRead;
+
+/* Callback that returns the next block number to read. */
+typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr,
+                                                                                               void *pgsr_private,
+                                                                                               void *per_buffer_private);
+
+extern PgStreamingRead *pg_streaming_read_buffer_alloc(int flags,
+                                                                                                          void *pgsr_private,
+                                                                                                          size_t per_buffer_private_size,
+                                                                                                          BufferAccessStrategy strategy,
+                                                                                                          BufferManagerRelation bmr,
+                                                                                                          ForkNumber forknum,
+                                                                                                          PgStreamingReadBufferCB next_block_cb);
+
+extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
+extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
+extern void pg_streaming_read_free(PgStreamingRead *pgsr);
+
+#endif
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index a584b1ddff..6636cc82c0 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -561,12 +561,6 @@ typedef struct ViewOptions
  *
  * Very little code is authorized to touch rel->rd_smgr directly.  Instead
  * use this function to fetch its value.
- *
- * Note: since a relcache flush can cause the file handle to be closed again,
- * it's unwise to hold onto the pointer returned by this function for any
- * long period.  Recommended practice is to just re-execute RelationGetSmgr
- * each time you need to access the SMgrRelation.  It's quite cheap in
- * comparison to whatever an smgr function is going to do.
  */
 static inline SMgrRelation
 RelationGetSmgr(Relation rel)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 29fd1cae64..018ebbcbaa 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2089,6 +2089,8 @@ PgStat_TableCounts
 PgStat_TableStatus
 PgStat_TableXactStatus
 PgStat_WalStats
+PgStreamingRead
+PgStreamingReadRange
 PgXmlErrorContext
 PgXmlStrictness
 Pg_finfo_record
-- 
2.37.2
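
For readers skimming 0001: the MAX_BUFFERS_PER_TRANSFER definition in bufmgr.h caps a multi-buffer read at 128kB or at the iovec limit, whichever is smaller. Below is a minimal sketch of that arithmetic only. The helper name and the explicit parameters are hypothetical and are not part of the patch, which uses the compile-time constants PG_IOV_MAX and BLCKSZ.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper (not in the patch): the same arithmetic as
 * MAX_BUFFERS_PER_TRANSFER, with both limits passed in explicitly so
 * that different combinations can be tried out.
 */
static size_t
max_buffers_per_transfer(size_t iov_max, size_t blcksz)
{
	size_t		by_size;

	assert(blcksz > 0);

	/* Number of blocks that fit in a 128kB transfer. */
	by_size = (128 * 1024) / blcksz;

	/* The iovec limit may force a smaller maximum. */
	return iov_max < by_size ? iov_max : by_size;
}
```

With an 8kB block size the size limit gives 16 buffers, so an iovec limit would only matter if it were below 16. The numbers passed to this helper are illustrative inputs, not values taken from any particular build.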

From 339b39d7f2cc5fc4bd6aa3429a12e6f3a4f9d2db Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Fri, 19 Jan 2024 16:10:30 -0500
Subject: [PATCH v3 2/2] use streaming reads in index scan

ci-os-only:
---
 src/backend/access/heap/heapam_handler.c | 19 ++++++--
 src/backend/access/index/indexam.c       | 42 +++++++++++++++++
 src/backend/executor/nodeIndexonlyscan.c | 26 ++++++++++-
 src/backend/executor/nodeIndexscan.c     | 57 +++++++++++++++++++-----
 src/backend/storage/aio/streaming_read.c | 10 ++++-
 src/include/access/relscan.h             |  7 +++
 src/include/executor/nodeIndexscan.h     |  6 +++
 src/include/storage/streaming_read.h     |  2 +
 8 files changed, 151 insertions(+), 18 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d15a02b2be..e5e13e92d8 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -127,9 +127,22 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
                /* Switch to correct buffer if we don't have it already */
                Buffer          prev_buf = hscan->xs_cbuf;
 
-               hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
-                                                                                         hscan->xs_base.rel,
-                                                                                         ItemPointerGetBlockNumber(tid));
+               if (scan->pgsr && scan->do_pgsr)
+               {
+                       hscan->xs_cbuf = pg_streaming_read_buffer_get_next(scan->pgsr, (void **) &tid);
+                       if (!BufferIsValid(hscan->xs_cbuf))
+                               return false;
+               }
+               else
+               {
+                       ItemPointerSet(&scan->tid_queue, InvalidBlockNumber, InvalidOffsetNumber);
+                       if (!ItemPointerIsValid(tid))
+                               return false;
+                       hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
+                                                                                                       hscan->xs_base.rel,
+                                                                                                       ItemPointerGetBlockNumber(tid));
+               }
+
 
                /*
                 * Prune page, but only if we weren't already on this page
diff --git a/src/backend/access/index/indexam.c 
b/src/backend/access/index/indexam.c
index 63dff101e2..f247a1d2d3 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -211,6 +211,29 @@ index_insert_cleanup(Relation indexRelation,
                indexRelation->rd_indam->aminsertcleanup(indexInfo);
 }
 
+static ItemPointerData
+index_tid_dequeue(ItemPointer tid_queue)
+{
+       ItemPointerData result = *tid_queue;
+       ItemPointerSet(tid_queue, InvalidBlockNumber, InvalidOffsetNumber);
+       return result;
+}
+
+
+static BlockNumber
+index_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private, void *per_buffer_data)
+{
+       IndexFetchTableData *scan = (IndexFetchTableData *) pgsr_private;
+       ItemPointerData data = index_tid_dequeue(&scan->tid_queue);
+
+       ItemPointer dest = per_buffer_data;
+       *dest = data;
+
+       if (!ItemPointerIsValid(&data))
+               return InvalidBlockNumber;
+       return ItemPointerGetBlockNumber(&data);
+}
+
 /*
  * index_beginscan - start a scan of an index with amgettuple
  *
@@ -236,7 +259,22 @@ index_beginscan(Relation heapRelation,
        scan->xs_snapshot = snapshot;
 
        /* prepare to fetch index matches from table */
+       scan->index_done = false;
        scan->xs_heapfetch = table_index_fetch_begin(heapRelation);
+       ItemPointerSet(&scan->xs_heapfetch->tid_queue, InvalidBlockNumber,
+                       InvalidOffsetNumber);
+
+       // TODO: can't put this here bc not AM agnostic
+       scan->xs_heapfetch->pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+                                                                                               scan->xs_heapfetch,
+                                                                                               sizeof(ItemPointerData),
+                                                                                               NULL,
+                                                                                               BMR_REL(scan->heapRelation),
+                                                                                               MAIN_FORKNUM,
+                                                                                               index_pgsr_next_single);
+
+       pg_streaming_read_set_resumable(scan->xs_heapfetch->pgsr);
+       scan->xs_heapfetch->do_pgsr = false;
 
        return scan;
 }
@@ -525,6 +563,9 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
 
        /* prepare to fetch index matches from table */
        scan->xs_heapfetch = table_index_fetch_begin(heaprel);
+       ItemPointerSet(&scan->xs_heapfetch->tid_queue, InvalidBlockNumber,
+                       InvalidOffsetNumber);
+       scan->xs_heapfetch->do_pgsr = false;
 
        return scan;
 }
@@ -566,6 +607,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
                if (scan->xs_heapfetch)
                        table_index_fetch_reset(scan->xs_heapfetch);
 
+               ItemPointerSet(&scan->xs_heaptid, InvalidBlockNumber, InvalidOffsetNumber);
                return NULL;
        }
        Assert(ItemPointerIsValid(&scan->xs_heaptid));
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 2c2c9c10b5..7979ecf1e4 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -111,17 +111,36 @@ IndexOnlyNext(IndexOnlyScanState *node)
                                                 node->ioss_NumScanKeys,
                                                 node->ioss_OrderByKeys,
                                                 node->ioss_NumOrderByKeys);
+
+               scandesc->xs_heapfetch->do_pgsr = true;
        }
 
        /*
         * OK, now that we have what we need, fetch the next tuple.
         */
-       while ((tid = index_getnext_tid(scandesc, direction)) != NULL)
+       while (true)
        {
                bool            tuple_from_heap = false;
 
                CHECK_FOR_INTERRUPTS();
 
+               Assert(!TID_QUEUE_FULL(&scandesc->xs_heapfetch->tid_queue));
+               do
+               {
+                       tid = index_getnext_tid(scandesc, direction);
+
+                       if (!tid)
+                       {
+                               scandesc->index_done = true;
+                               break;
+                       }
+
+                       index_tid_enqueue(tid, &scandesc->xs_heapfetch->tid_queue);
+               } while (!TID_QUEUE_FULL(&scandesc->xs_heapfetch->tid_queue));
+
+               if (!tid && TID_QUEUE_EMPTY(&scandesc->xs_heapfetch->tid_queue))
+                       break;
+
                /*
                 * We can skip the heap fetch if the TID references a heap page on
                 * which all tuples are known visible to everybody.  In any case,
@@ -156,7 +175,7 @@ IndexOnlyNext(IndexOnlyScanState *node)
                 * It's worth going through this complexity to avoid needing to lock
                 * the VM buffer, which could cause significant contention.
                 */
-               if (!VM_ALL_VISIBLE(scandesc->heapRelation,
+               if (!tid || !VM_ALL_VISIBLE(scandesc->heapRelation,
                                                        ItemPointerGetBlockNumber(tid),
                                                        &node->ioss_VMBuffer))
                {
@@ -187,6 +206,9 @@ IndexOnlyNext(IndexOnlyScanState *node)
 
                        tuple_from_heap = true;
                }
+               else
+                       ItemPointerSet(&scandesc->xs_heapfetch->tid_queue,
+                                       InvalidBlockNumber, InvalidOffsetNumber);
 
                /*
                 * Fill the scan tuple slot with data from the index.  This might be
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 03142b4a94..be91854436 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -77,6 +77,16 @@ static HeapTuple reorderqueue_pop(IndexScanState *node);
  *             using the index specified in the IndexScanState information.
  * ----------------------------------------------------------------
  */
+
+void
+index_tid_enqueue(ItemPointer tid, ItemPointer tid_queue)
+{
+       Assert(!ItemPointerIsValid(tid_queue));
+
+       ItemPointerSet(tid_queue, ItemPointerGetBlockNumber(tid),
+                       ItemPointerGetOffsetNumber(tid));
+}
+
 static TupleTableSlot *
 IndexNext(IndexScanState *node)
 {
@@ -123,31 +133,54 @@ IndexNext(IndexScanState *node)
                        index_rescan(scandesc,
                                                 node->iss_ScanKeys, node->iss_NumScanKeys,
                                                 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
+
+               scandesc->xs_heapfetch->do_pgsr = true;
        }
 
        /*
         * ok, now that we have what we need, fetch the next tuple.
         */
-       while (index_getnext_slot(scandesc, direction, slot))
+
+       while (true)
        {
                CHECK_FOR_INTERRUPTS();
 
-               /*
-                * If the index was lossy, we have to recheck the index quals using
-                * the fetched tuple.
-                */
-               if (scandesc->xs_recheck)
+               Assert(!TID_QUEUE_FULL(&scandesc->xs_heapfetch->tid_queue));
+               do
                {
-                       econtext->ecxt_scantuple = slot;
-                       if (!ExecQualAndReset(node->indexqualorig, econtext))
+                       ItemPointer tid = index_getnext_tid(scandesc, direction);
+
+                       if (!tid)
                        {
-                               /* Fails recheck, so drop it and loop back for another */
-                               InstrCountFiltered2(node, 1);
-                               continue;
+                               scandesc->index_done = true;
+                               break;
                        }
+
+                       index_tid_enqueue(tid, &scandesc->xs_heapfetch->tid_queue);
+               } while (!TID_QUEUE_FULL(&scandesc->xs_heapfetch->tid_queue));
+
+               if (index_fetch_heap(scandesc, slot))
+               {
+                       /*
+                       * If the index was lossy, we have to recheck the index quals using
+                       * the fetched tuple.
+                       */
+                       if (scandesc->xs_recheck)
+                       {
+                               econtext->ecxt_scantuple = slot;
+                               if (!ExecQualAndReset(node->indexqualorig, econtext))
+                               {
+                                       /* Fails recheck, so drop it and loop back for another */
+                                       InstrCountFiltered2(node, 1);
+                                       continue;
+                               }
+                       }
+
+                       return slot;
                }
 
-               return slot;
+               if (scandesc->index_done)
+                       break;
        }
 
        /*
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 19605090fe..6465963f83 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -34,6 +34,7 @@ struct PgStreamingRead
        int                     pinned_buffers_trigger;
        int                     next_tail_buffer;
        bool            finished;
+       bool            resumable;
        void       *pgsr_private;
        PgStreamingReadBufferCB callback;
        BufferAccessStrategy strategy;
@@ -292,7 +293,8 @@ pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
                blocknum = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data);
                if (blocknum == InvalidBlockNumber)
                {
-                       pgsr->finished = true;
+                       if (!pgsr->resumable)
+                               pgsr->finished = true;
                        break;
                }
                bmr = pgsr->bmr;
@@ -433,3 +435,9 @@ pg_streaming_read_free(PgStreamingRead *pgsr)
                pfree(pgsr->per_buffer_data);
        pfree(pgsr);
 }
+
+void
+pg_streaming_read_set_resumable(PgStreamingRead *pgsr)
+{
+       pgsr->resumable = true;
+}
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 521043304a..ade7f59946 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -18,6 +18,7 @@
 #include "access/itup.h"
 #include "port/atomics.h"
 #include "storage/buf.h"
+#include "storage/streaming_read.h"
 #include "storage/spin.h"
 #include "utils/relcache.h"
 
@@ -104,6 +105,9 @@ typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker;
 typedef struct IndexFetchTableData
 {
        Relation        rel;
+       PgStreamingRead *pgsr;
+       ItemPointerData tid_queue;
+       bool do_pgsr;
 } IndexFetchTableData;
 
 /*
@@ -162,6 +166,9 @@ typedef struct IndexScanDescData
        bool       *xs_orderbynulls;
        bool            xs_recheckorderby;
 
+       bool    index_done;
+
+
        /* parallel index scan information, in shared memory */
        struct ParallelIndexScanDescData *parallel_scan;
 }                      IndexScanDescData;
diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h
index 3cddece67c..7dbff789e9 100644
--- a/src/include/executor/nodeIndexscan.h
+++ b/src/include/executor/nodeIndexscan.h
@@ -44,4 +44,10 @@ extern bool ExecIndexEvalArrayKeys(ExprContext *econtext,
                                                                   IndexArrayKeyInfo *arrayKeys, int numArrayKeys);
 extern bool ExecIndexAdvanceArrayKeys(IndexArrayKeyInfo *arrayKeys, int numArrayKeys);
 
+#define TID_QUEUE_FULL(tid_queue) (ItemPointerIsValid(tid_queue))
+/* If it were a real queue, empty and full wouldn't be opposites. */
+#define TID_QUEUE_EMPTY(tid_queue) (!ItemPointerIsValid(tid_queue))
+
+extern void index_tid_enqueue(ItemPointer tid, ItemPointer tid_queue);
+
 #endif                                                 /* NODEINDEXSCAN_H */
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index 40c3408c54..2288b7b5eb 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -42,4 +42,6 @@ extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
 extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
 extern void pg_streaming_read_free(PgStreamingRead *pgsr);
 
+extern void pg_streaming_read_set_resumable(PgStreamingRead *pgsr);
+
 #endif
-- 
2.37.2
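
The "queue" in 0002 is a single TID slot: the executor fills it from index_getnext_tid(), and the streaming read callback drains it, returning an invalid block number when the slot is empty so that a resumable stream pauses rather than finishes. Below is a minimal standalone sketch of that handoff. Every type and function name here is a hypothetical stand-in (Tid for ItemPointerData, INVALID_BLOCK for InvalidBlockNumber, next_block for index_pgsr_next_single); none of it is the patch's code, and validity is simplified to "offset is nonzero".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define INVALID_BLOCK UINT32_MAX	/* stand-in for InvalidBlockNumber */

/* Stand-in for ItemPointerData; offset 0 plays InvalidOffsetNumber. */
typedef struct
{
	uint32_t	block;
	uint16_t	offset;
} Tid;

static bool
tid_is_valid(const Tid *tid)
{
	return tid->offset != 0;
}

/* Mark the one-element queue as empty. */
static void
tid_clear(Tid *slot)
{
	slot->block = INVALID_BLOCK;
	slot->offset = 0;
}

/* Fill the slot; as in the patch, it must be empty beforehand. */
static void
tid_enqueue(Tid *slot, Tid tid)
{
	assert(!tid_is_valid(slot));
	*slot = tid;
}

/* Take whatever is in the slot (possibly invalid) and empty it. */
static Tid
tid_dequeue(Tid *slot)
{
	Tid			result = *slot;

	tid_clear(slot);
	return result;
}

/*
 * Callback-style consumer: copies the TID into the per-buffer area and
 * returns its block number, or INVALID_BLOCK if the slot was empty.
 */
static uint32_t
next_block(Tid *slot, Tid *per_buffer_out)
{
	Tid			tid = tid_dequeue(slot);

	*per_buffer_out = tid;
	return tid_is_valid(&tid) ? tid.block : INVALID_BLOCK;
}
```

Because the slot holds one entry, "full" and "empty" are exact opposites, which is why TID_QUEUE_FULL and TID_QUEUE_EMPTY are both defined in terms of ItemPointerIsValid() in the patch. A real multi-entry queue would need separate conditions.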
