On Thu, Aug 7, 2025 at 8:41 AM Peter Geoghegan <p...@bowt.ie> wrote:
> On Tue, Aug 5, 2025 at 7:31 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> > There must be a similar opportunity for parallel index scans.  It has
> > that "seize the scan" concept where parallel workers do one-at-a-time
> > locked linked list leapfrog.
>
> True. More generally, flexibility to reorder work would be useful there.
>
> The structure of parallel B-tree scans is one where each worker
> performs its own "independent" index scan. The workers each only
> return tuples from those leaf pages that they themselves manage to
> read. That isn't particularly efficient, since we'll usually have to
> merge the "independent" index scan tuples together once again using a
> GatherMerge.

Yeah.  This all sails close to the stuff I wrote about in the
post-mortem of my failed attempt to teach parallel bitmap heapscan not
to throw I/O combining opportunities out the window for v18, after
Melanie streamified BHS.  Basically you have competing goals:

 * preserve natural ranges of blocks up to io_combine_limit
 * make workers run out of work at the same time
 * avoid I/O stalls using lookahead and concurrency

You can't have all three right now: I/O streams are elastic so
allocation decisions made at the producer end don't control work
*finishing* time, so we need something new.  I wrote about an idea
based on work stealing when data runs out.  Read streams would work
independently, but cooperate at end of stream, avoiding interlocking
almost all the time.  That was basically a refinement of an earlier
"shared" read stream that seems too locky.  (Obviously the
seize-the-scan block producer is a total lockfest navigating a
link-at-a-time data structure, but let's call that a use-case specific
problem.)
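
To make the work-stealing idea concrete, here's a single-threaded toy sketch (all names invented, and the locking that real parallel workers would need is omitted): each worker consumes its own contiguous block range, which preserves I/O combining, and only when it runs dry does it steal the upper half of the largest remaining range, so the victim keeps its sequential run and everyone finishes at roughly the same time.

```c
#include <assert.h>

/*
 * Hypothetical sketch, not PostgreSQL code: per-worker block ranges with
 * end-of-stream stealing.  A real implementation would need atomics or a
 * lock around the ranges, and would have to cooperate with read streams.
 */
typedef struct WorkerRange
{
	unsigned int next;			/* next block this worker will read */
	unsigned int end;			/* one past the last block it owns */
} WorkerRange;

/* Returns 1 and sets *block, or 0 when every range is exhausted. */
static int
worker_next_block(WorkerRange *workers, int nworkers, int self,
				  unsigned int *block)
{
	WorkerRange *w = &workers[self];
	int			victim = -1;
	unsigned int best = 0;
	unsigned int mid;

	if (w->next < w->end)
	{
		*block = w->next++;
		return 1;
	}

	/* Out of work: pick the victim with the most blocks left. */
	for (int i = 0; i < nworkers; i++)
	{
		unsigned int left = workers[i].end - workers[i].next;

		if (i != self && left > best)
		{
			best = left;
			victim = i;
		}
	}
	if (victim < 0)
		return 0;				/* everyone is done */

	/* Steal the upper half; the victim keeps its sequential run. */
	mid = workers[victim].next + (workers[victim].end - workers[victim].next) / 2;
	w->next = mid;
	w->end = workers[victim].end;
	workers[victim].end = mid;
	return worker_next_block(workers, nworkers, self, block);
}
```

The halving rule means a straggler's remainder keeps getting split, so the granularity shrinks naturally toward the end of the scan, loosely in the spirit of the "degrades ultimately to tuple granularity" extreme above.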

Other architectures are surely possible too, including admitting that
precise streams are not right for that problem, and using something
like the PredictBuffer() approach mentioned below for prefetching and
then sticking to block-at-a-time work distribution.  Or we could go
the other way and admit that block-at-a-time is also not ideal -- what
if some blocks are 10,000 times more expensive to process than others?
-- and do work stealing that degrades ultimately to tuple granularity,
a logical extreme position.

> > Basically, the stuff that we can't fix with "precise" I/O
> > streaming as I like to call it, where it might still be interesting to
> > think about opportunities to do fuzzier speculative lookahead.  I'll
> > start a new thread.
>
> That sounds interesting. I worry that we won't ever be able to get
> away without some fallback that behaves roughly like OS readahead.

Yeah.  I might write about some of these things properly but here is
an unfiltered brain dump of assorted theories of varying
crackpottedness and some illustrative patches that I'm *not*
proposing:

 * you can make a dumb speculative sequential readahead stream pretty
easily, but it's not entirely satisfying: here's one of the toy
patches I mentioned in Athens, that shows btree leaf scans (but not
parallel ones) doing that, producing nice I/O combining and
concurrency that ramps up in the usual way if it happens to be
sequential (well I just rebased this and didn't test it, but it should
still work); I will describe some other approaches to try to place
this in the space of possibilities I'm aware of...

 * you could make a stream that pulls leaf pages from higher level
internal pages on demand (a pull approach avoids the flow control
problems that come from trying to choose a batch size up front, before
you know you'll even need it), or just notice that it looks sequential
and install a block range producer, and if that doesn't match the next
page pointers by the time you get there then you destroy it and switch
strategies, or something

 * you could just pretend it's always sequential and reset the stream
every time you're wrong or some only slightly smarter scheme than
that, but it's still hard to know what's going on in cooperating
processes...

 * you could put sequential extent information in meta blocks or
somehow scatter hints around...

 * you could instead give up on explicit streams for fuzzy problems,
and teach the buffer pool to do the same tricks as the kernel, with a
scheme that lets go of the pins and reacquires them later (hopefully
cheaply with ReadRecentBuffer(), by leaving a trail of breadcrumbs in
SMgrRelation or shared memory, similar to what I already proposed for
btree root pages and another related patch that speeds up seq scans,
which I plan to repost soon): SMgrRelation could hold the state
necessary for the buffer pool to notice when you keep calling
ReadBuffer() for sequential blocks and begin to prefetch them
speculatively with growing distance heuristics so it doesn't overdo
it, but somehow not hold pins on your behalf (this was one of the
driving concerns that made me originally think that I needed an
explicit stream as an explicit opt-in and scoping for extra pins,
which an AM might not want at certain times, truncation or cleanup or
something, who knows)
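
A toy sketch of the kind of sequential-run detection the SMgrRelation could host (all names here are invented for illustration, none of this is an existing PostgreSQL API): count consecutive sequential requests, ignite after a few hits, grow the speculative distance geometrically, and collapse it on a random jump so one lucky hit doesn't trigger a flood of speculative I/O.

```c
#include <assert.h>

/* Hypothetical sketch of growing-distance readahead detection. */
typedef unsigned int BlockNum;

typedef struct SeqDetector
{
	BlockNum	last_block;		/* last block the caller asked for */
	int			run_length;		/* consecutive sequential requests */
	int			distance;		/* current speculative lookahead */
} SeqDetector;

#define SEQ_START_THRESHOLD 4	/* require a few hits before speculating */
#define SEQ_MAX_DISTANCE 64

static void
seq_detector_init(SeqDetector *d)
{
	d->last_block = (BlockNum) -1;
	d->run_length = 0;
	d->distance = 0;
}

/* Returns how many blocks past 'block' we would prefetch right now. */
static int
seq_detector_observe(SeqDetector *d, BlockNum block)
{
	if (block == d->last_block + 1)
	{
		d->run_length++;
		if (d->run_length >= SEQ_START_THRESHOLD)
		{
			/* ignite, then double the distance up to a cap */
			d->distance = d->distance == 0 ? 1 : d->distance * 2;
			if (d->distance > SEQ_MAX_DISTANCE)
				d->distance = SEQ_MAX_DISTANCE;
		}
	}
	else
	{
		/* random jump: forget everything */
		d->run_length = 0;
		d->distance = 0;
	}
	d->last_block = block;
	return d->distance;
}
```

The hard part flagged above isn't this heuristic but who holds the pins for the speculatively read buffers, which this sketch deliberately ignores.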

 * you could steal Linux's BM_READAHEAD concept, where speculatively
loaded pages carry a special marker so they can be recognized by later
ReadBuffer() calls to encourage more magic readahead, because it's
measurably fruitful; this will be seen also by other backends, eg
parallel workers working on the same problem, though there is a bit of
an interleaving edge problem (you probably want to know if adjacent
pages have the flag in some window, and I have an idea for that that
doesn't involve the buffer mapping table); in other words the state
tracked in SMgrRelation is only used to ignite readahead, but shared
flags in or parallel with the buffer pool apply fuel

 * from 30,000 feet, the question is what scope you do the detection
at; you can find examples of OSes that only look at one fd for
sequential detection and only consider strict-next-block (old Unixen,
I suspect maybe Windows but IDK), systems that have a tolerance
window (Linux), systems that search a small table of active streams
no matter which fds they're coming through (ZFS does this I think,
sort of like our own synchronized_scan detector), and systems that use
per-page accounting to measure and amplify success, and we have
analogies in our architecture as candidate scopes: explicit stream
objects, the per-backend SMgrRelation, the proposed system-wide
SharedSMgrRelation, the buffer pool itself, and perhaps a per-buffer
hint array with relaxed access (this last is something I've
experimented with, both as a way to store relaxed navigation
information for sequential scans skipping the buffer mapping table and
as a place to accumulate prefetch-driving statistics)

 * so far that's just talking about sequential heuristics, but we have
many clues the kernel doesn't, it's just that they're not always
reliable enough for "precise" read streams and we might not want to
use the approach to speculation I mentioned above where you have a
read stream but as soon as it gives you something you didn't expect
you have to give up completely or reset it and start feeding it again;
presumably you could code around that with a fuzzy speculation buffer
that tolerates a bit of disorder
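
Such a fuzzy speculation buffer could be as simple as a small unordered window: a speculated block counts as a hit wherever it turns up within the window, instead of the whole stream being abandoned on the first out-of-order request.  A hypothetical sketch (invented names, fixed-size window):

```c
#include <assert.h>

/* Hypothetical sketch: disorder-tolerant speculation matching. */
#define SPEC_WINDOW 8

typedef struct SpecBuffer
{
	unsigned int spec[SPEC_WINDOW];	/* outstanding speculated blocks */
	int			nspec;
} SpecBuffer;

static void
spec_add(SpecBuffer *sb, unsigned int block)
{
	if (sb->nspec < SPEC_WINDOW)
		sb->spec[sb->nspec++] = block;
}

/* Returns 1 and consumes the entry if 'block' was speculated recently. */
static int
spec_match(SpecBuffer *sb, unsigned int block)
{
	for (int i = 0; i < sb->nspec; i++)
	{
		if (sb->spec[i] == block)
		{
			sb->spec[i] = sb->spec[--sb->nspec];	/* unordered remove */
			return 1;
		}
	}
	return 0;
}
```

The hit/miss ratio over this window could then drive the same kind of distance ramp-up and cut-off that a strictly ordered stream uses.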

 * you could make a better version of PrefetchBuffer() for guided
prefetching, let's call it PredictBuffer(), that is initially lazy but
if the predictions turn out to be correct it starts looking further
ahead in the stream of predictions you made and eventually becomes
quite eager, like PrefetchBuffer(), but just lazy enough to perform
I/O combining; note that I'm now talking about "pushing" rather than
"pulling" predictions, another central question with implications, and
one of the nice things about read streams
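
The lazy-to-eager ramp could look something like this sketch (PredictBuffer()/PredictBlock() doesn't exist; every name here is invented to illustrate the push model): predictions are pushed into a queue and initially ignored, and "eagerness" (how far ahead we'd actually start I/O) doubles while real reads keep matching the prediction stream, resetting to lazy on a miss.

```c
#include <assert.h>

/* Hypothetical sketch of push-based prediction with an eagerness ramp. */
#define PREDICT_QUEUE 32
#define PREDICT_MAX_EAGERNESS 16

typedef struct Predictor
{
	unsigned int queue[PREDICT_QUEUE];	/* pending predicted blocks */
	int			head;
	int			tail;
	int			eagerness;		/* predictions we'd act on right now */
} Predictor;

static void
predictor_init(Predictor *p)
{
	p->head = p->tail = 0;
	p->eagerness = 0;
}

/* Consumer pushes a prediction; initially we do nothing with it (lazy). */
static void
predict_block(Predictor *p, unsigned int block)
{
	p->queue[p->tail % PREDICT_QUEUE] = block;
	p->tail++;
}

/*
 * Called on each actual read.  While reads keep matching the prediction
 * stream, grow eagerness; a miss drops us back to lazy.  Returns the
 * current eagerness, i.e. how many queued predictions a caller could
 * reasonably start combined I/O for.
 */
static int
predictor_observe_read(Predictor *p, unsigned int block)
{
	if (p->head != p->tail && p->queue[p->head % PREDICT_QUEUE] == block)
	{
		p->head++;
		p->eagerness = p->eagerness == 0 ? 1 : p->eagerness * 2;
		if (p->eagerness > PREDICT_MAX_EAGERNESS)
			p->eagerness = PREDICT_MAX_EAGERNESS;
	}
	else
		p->eagerness = 0;		/* predictions were wrong: back to lazy */
	return p->eagerness;
}
```

Staying just lazy enough matters because acting on each prediction immediately, PrefetchBuffer()-style, would forfeit I/O combining of adjacent predicted blocks.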

 * for a totally different line of attack that goes back to precise
pull-based streams, you could imagine a read stream that lets you
'peek' at data coming down the pipe as soon as it is ready (ie it's
already in cache, or it isn't but the IO finishes before the stream
consumer gets to it), so you can get a head start on a jump requiring
I/O in a self-referential data structure like a linked list (with some
obvious limitations); here is a toy patch that allows you to install
such a callback, which could feed next-block information to the main
block number callback, so now we have three moments of interest in the
I/O stream: block numbers are pulled into the producer end, valid
pages are pushed out to you as soon as possible somewhere in the
middle or probably often just a step ahead of the producer, and can feed
block numbers back to it, and pages are eventually pulled out of the
consumer end for processing; BUT NOTE: this idea is not entirely
compatible with the lazy I/O completion draining of io_method=io_uring
(or the posix_aio patch I dumped on the list the other day, and the
Windows equivalent could plausibly go either way), and works much
better with io_method=worker whose completions are advertised eagerly,
so this implementation of the idea is a dead end, if the goal
itself is even interesting, not sure

 * the same effect could be achieved with chained streams where the
consumer-facing stream is a simple elastic queue of buffers that is
fed by the real I/O stream, with the peeking in between; that would
suit those I/O methods much better; it might need a new
read_stream_next_buffer_conditional() that calls that
WaitReadBuffersMightStall() function, unless the consumer queue is
empty and it has to call read_stream_next_buffer() which might block;
the point being to periodically pump the peeking mechanism

 * the peek concept is pretty weak on its own because it's hard to
reach a state where you have enough lookahead window that it can
follow a navigational jump in time to save you from a stall but ...
maybe there are streams that contain a lot of either sequential or
well cached blocks with occasional jumps to random I/O; if you could
somehow combine the advanced vapourware of several of these magic
bullet points, then perhaps you can avoid some stalls

Please take all of that with an absolutely massive grain of salt, it's
just very raw ideas...
From 2eeec2f0488a74fa1f3cfdaeef36c5f9926d417c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 12 Oct 2024 09:57:01 +1300
Subject: [PATCH 1/2] Add AutoReadStream.

Provide auto_read_buffer() as a drop-in replacement for ReadBuffer().
As long as the caller requests sequential pages, they come from a
ReadStream that predicts that it'll keep doing that.  Otherwise it falls
back to regular ReadBuffer().

XXX experimental
---
 src/backend/storage/aio/Makefile           |  1 +
 src/backend/storage/aio/auto_read_stream.c | 92 ++++++++++++++++++++++
 src/backend/storage/aio/meson.build        |  1 +
 src/include/storage/auto_read_stream.h     | 16 ++++
 src/tools/pgindent/typedefs.list           |  1 +
 5 files changed, 111 insertions(+)
 create mode 100644 src/backend/storage/aio/auto_read_stream.c
 create mode 100644 src/include/storage/auto_read_stream.h

diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
index 3f2469cc399..7270f486109 100644
--- a/src/backend/storage/aio/Makefile
+++ b/src/backend/storage/aio/Makefile
@@ -15,6 +15,7 @@ OBJS = \
        aio_init.o \
        aio_io.o \
        aio_target.o \
+       auto_read_stream.o \
        method_io_uring.o \
        method_sync.o \
        method_worker.o \
diff --git a/src/backend/storage/aio/auto_read_stream.c b/src/backend/storage/aio/auto_read_stream.c
new file mode 100644
index 00000000000..a8dac066bd6
--- /dev/null
+++ b/src/backend/storage/aio/auto_read_stream.c
@@ -0,0 +1,92 @@
+#include "postgres.h"
+#include "storage/auto_read_stream.h"
+#include "storage/bufmgr.h"
+#include "storage/read_stream.h"
+
+struct AutoReadStream
+{
+       ReadStream *stream;
+       Relation        rel;
+       BlockNumber next;
+       BlockNumber last;
+};
+
+static BlockNumber
+auto_read_stream_cb(ReadStream *stream,
+                                       void *callback_private_data,
+                                       void *per_buffer_data)
+{
+       AutoReadStream *auto_stream = callback_private_data;
+       BlockNumber next = auto_stream->next;
+
+       if (next == InvalidBlockNumber)
+               return InvalidBlockNumber;
+       auto_stream->next = Min(auto_stream->next + 1, auto_stream->last);
+
+       return next;
+}
+
+AutoReadStream *
+auto_read_stream_begin(BufferAccessStrategy strategy,
+                                          Relation rel,
+                                          ForkNumber forknum)
+{
+       AutoReadStream *auto_stream;
+
+       auto_stream = palloc(sizeof(*auto_stream));
+       auto_stream->rel = rel;
+       auto_stream->next = InvalidBlockNumber;
+       auto_stream->last = InvalidBlockNumber;
+       auto_stream->stream = read_stream_begin_relation(0,
+                                                        strategy,
+                                                        rel,
+                                                        forknum,
+                                                        auto_read_stream_cb,
+                                                        auto_stream,
+                                                        0);
+
+       return auto_stream;
+}
+
+void
+auto_read_stream_end(AutoReadStream *auto_stream)
+{
+       read_stream_end(auto_stream->stream);
+       pfree(auto_stream);
+}
+
+Buffer
+auto_read_buffer(AutoReadStream *auto_stream, BlockNumber blocknum)
+{
+
+       if (auto_stream->last != InvalidBlockNumber)
+       {
+               Buffer          buffer = read_stream_next_buffer(auto_stream->stream, NULL);
+
+               /* Did the stream guess right? */
+               if (buffer != InvalidBuffer)
+               {
+                       if (BufferGetBlockNumber(buffer) == blocknum)
+                               return buffer;
+                       ReleaseBuffer(buffer);
+               }
+       }
+
+       /* Figure out the highest block number if we haven't already. */
+       if (auto_stream->last == InvalidBlockNumber)
+       {
+               BlockNumber nblocks;
+
+               nblocks = RelationGetNumberOfBlocks(auto_stream->rel);
+               if (nblocks > 0)
+                       auto_stream->last = nblocks - 1;
+       }
+
+       /* Take a guess at the next block. */
+       if (auto_stream->last != InvalidBlockNumber &&
+               auto_stream->last >= blocknum + 1)
+               auto_stream->next = blocknum + 1;
+       read_stream_reset(auto_stream->stream);
+
+       return ReadBuffer(auto_stream->rel, blocknum);
+}
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
index da6df2d3654..07c2fb2e42c 100644
--- a/src/backend/storage/aio/meson.build
+++ b/src/backend/storage/aio/meson.build
@@ -7,6 +7,7 @@ backend_sources += files(
   'aio_init.c',
   'aio_io.c',
   'aio_target.c',
+  'auto_read_stream.c',
   'method_io_uring.c',
   'method_sync.c',
   'method_worker.c',
diff --git a/src/include/storage/auto_read_stream.h b/src/include/storage/auto_read_stream.h
new file mode 100644
index 00000000000..96d4a9ac726
--- /dev/null
+++ b/src/include/storage/auto_read_stream.h
@@ -0,0 +1,16 @@
+#ifndef AUTO_READ_STREAM_H
+#define AUTO_READ_STREAM_H
+
+#include "storage/bufmgr.h"
+
+struct AutoReadStream;
+typedef struct AutoReadStream AutoReadStream;
+
+extern AutoReadStream *auto_read_stream_begin(BufferAccessStrategy strategy,
+                                             Relation rel,
+                                             ForkNumber forknum);
+extern void auto_read_stream_end(AutoReadStream *auto_stream);
+extern Buffer auto_read_buffer(AutoReadStream *auto_stream,
+                              BlockNumber blocknum);
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e6f2e93b2d6..43f206e30ed 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -173,6 +173,7 @@ AuthRequest
 AuthToken
 AutoPrewarmReadStreamData
 AutoPrewarmSharedState
+AutoReadStream
 AutoVacOpts
 AutoVacuumShmemStruct
 AutoVacuumWorkItem
-- 
2.39.5 (Apple Git-154)

From b3dee96bfaf31a4dff31b903ad05a7d5430d23e2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 12 Oct 2024 09:57:17 +1300
Subject: [PATCH 2/2] Use AutoReadStream for btree leaf page scans.

Leaf pages tend to be sequential on recently built/clustered indexes, so
there is an opportunity to do I/O combining and look-ahead.  It's hard
to do it 'precisely' with a ReadStream because they form a linked list.

XXX POC
---
 src/backend/access/nbtree/nbtpage.c   | 22 ++++++++++++++++++++++
 src/backend/access/nbtree/nbtree.c    |  5 +++++
 src/backend/access/nbtree/nbtsearch.c |  2 +-
 src/include/access/nbtree.h           |  5 +++++
 4 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index c79dd38ee18..e617bf108dd 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -856,6 +856,28 @@ _bt_getbuf(Relation rel, BlockNumber blkno, int access)
        return buf;
 }
 
+/*
+ * Like _bt_getbuf(), but use auto_stream instead of reading directly.  This
+ * allows I/O to be combined if the blocks happen to be sequential on disk.
+ */
+Buffer
+_bt_getbuf_auto(AutoReadStream *auto_stream,
+                               Relation rel,
+                               BlockNumber blkno,
+                               int access)
+{
+       Buffer          buf;
+
+       Assert(BlockNumberIsValid(blkno));
+
+       /* Read an existing block of the relation */
+       buf = auto_read_buffer(auto_stream, blkno);
+       _bt_lockbuf(rel, buf, access);
+       _bt_checkpage(rel, buf);
+
+       return buf;
+}
+
 /*
  *     _bt_allocbuf() -- Allocate a new block/page.
  *
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index fdff960c130..309147bea15 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -364,6 +364,9 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
        so->killedItems = NULL;         /* until needed */
        so->numKilled = 0;
 
+       /* XXX defer until it looks like it's worth it? */
+       so->auto_stream = auto_read_stream_begin(NULL, rel, MAIN_FORKNUM);
+
        /*
         * We don't know yet whether the scan will be index-only, so we do not
         * allocate the tuple workspace arrays until btrescan.  However, we set up
@@ -483,6 +486,8 @@ btendscan(IndexScanDesc scan)
        so->markItemIndex = -1;
        BTScanPosUnpinIfPinned(so->markPos);
 
+       auto_read_stream_end(so->auto_stream);
+
        /* No need to invalidate positions, the RAM is about to be freed. */
 
        /* Release storage */
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index d69798795b4..f15e05042b4 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -2408,7 +2408,7 @@ _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno,
                {
                        /* read blkno, but check for interrupts first */
                        CHECK_FOR_INTERRUPTS();
-                       so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
+                       so->currPos.buf = _bt_getbuf_auto(so->auto_stream, rel, blkno, BT_READ);
                }
                else
                {
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index e709d2e0afe..c4b82cd4466 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -22,6 +22,7 @@
 #include "catalog/pg_am_d.h"
 #include "catalog/pg_index.h"
 #include "lib/stringinfo.h"
+#include "storage/auto_read_stream.h"
 #include "storage/bufmgr.h"
 #include "storage/shm_toc.h"
 #include "utils/skipsupport.h"
@@ -1072,6 +1073,8 @@ typedef struct BTScanOpaqueData
        int                     numKilled;              /* number of currently stored items */
        bool            dropPin;                /* drop leaf pin before btgettuple returns? */
 
+       AutoReadStream *auto_stream;
+
        /*
         * If we are doing an index-only scan, these are the tuple storage
         * workspaces for the currPos and markPos respectively.  Each is of size
@@ -1273,6 +1276,8 @@ extern void _bt_metaversion(Relation rel, bool *heapkeyspace,
                                                        bool *allequalimage);
 extern void _bt_checkpage(Relation rel, Buffer buf);
 extern Buffer _bt_getbuf(Relation rel, BlockNumber blkno, int access);
+extern Buffer _bt_getbuf_auto(AutoReadStream *auto_stream,
+                             Relation rel, BlockNumber blkno, int access);
 extern Buffer _bt_allocbuf(Relation rel, Relation heaprel);
 extern Buffer _bt_relandgetbuf(Relation rel, Buffer obuf,
                               BlockNumber blkno, int access);
-- 
2.39.5 (Apple Git-154)

From ac2dd3de67deef67bc15207944e72263c5625905 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 6 Aug 2025 16:23:13 +1200
Subject: [PATCH 1/2] Experiment:  Peeking ahead in I/O streams.

Allow consumers of self-referential block streams to install a callback
that gets a chance to "peek" at each block as soon as possible.

Consumers of self-referential block streams can use this to feed
navigational information to the block number callback much earlier in
the pipeline while they are still consuming older blocks.

This can't help if there is an IO stall in the way of progress at every
jump, but some mixed hit/miss patterns may be able to generate some
extra concurrency this way.

XXX highly experimental idea exploration...
---
 src/backend/storage/aio/read_stream.c | 74 ++++++++++++++++++++++++---
 src/include/storage/bufmgr.h          | 13 ++++-
 src/include/storage/read_stream.h     |  8 +++
 3 files changed, 87 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 0e7f5557f5c..2c64ce9bbd1 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -114,9 +114,12 @@ struct ReadStream
 
        /*
         * The callback that will tell us which block numbers to read, and an
-        * opaque pointer that will be pass to it for its own purposes.
+        * opaque pointer that will be passed to it for its own purposes.  Optional
+        * peek_callback will have a chance to examine data in buffers some time
+        * before they are returned.
         */
        ReadStreamBlockNumberCB callback;
+       ReadStreamPeekCB peek_callback;
        void       *callback_private_data;
 
        /* Next expected block, for detecting sequential access. */
@@ -140,6 +143,7 @@ struct ReadStream
 
        /* Circular queue of buffers. */
        int16           oldest_buffer_index;    /* Next pinned buffer to return */
+       int16           peek_buffer_index;      /* Next pinned buffer to peek at */
        int16           next_buffer_index;      /* Index of next buffer to pin */
        Buffer          buffers[FLEXIBLE_ARRAY_MEMBER];
 };
@@ -216,6 +220,26 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
        stream->buffered_blocknum = blocknum;
 }
 
+/*
+ * Pass as many pinned and valid buffers as we can to peek_callback.
+ */
+static void
+read_stream_peek(ReadStream *stream)
+{
+       while (stream->peek_buffer_index != stream->next_buffer_index &&
+                  !(stream->ios_in_progress > 0 &&
+                        stream->peek_buffer_index ==
+                        stream->ios[stream->oldest_io_index].buffer_index))
+       {
+               stream->peek_callback(stream,
+                                     stream->callback_private_data,
+                                     get_per_buffer_data(stream, stream->peek_buffer_index),
+                                     stream->buffers[stream->peek_buffer_index]);
+               if (++stream->peek_buffer_index == stream->queue_size)
+                       stream->peek_buffer_index = 0;
+       }
+}
+
 /*
  * Start as much of the current pending read as we can.  If we have to split it
  * because of the per-backend buffer limit, or the buffer manager decides to
@@ -429,6 +453,14 @@ read_stream_look_ahead(ReadStream *stream)
                        continue;
                }
 
+               /*
+                * New navigation information might be available if we got a cache hit
+                * in a self-referential stream, so invoke peek_callback before every
+                * block number callback.
+                */
+               if (stream->peek_callback)
+                       read_stream_peek(stream);
+
                /*
                 * See which block the callback wants next in the stream.  We need to
                 * compute the index of the Nth block of the pending read including
@@ -791,6 +823,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                Assert(stream->distance == 1);
                Assert(stream->pending_read_nblocks == 0);
                Assert(stream->per_buffer_data_size == 0);
+               Assert(stream->peek_callback == NULL);
                Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
                /* We're going to return the buffer we pinned last time. */
@@ -888,15 +921,24 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
        Assert(BufferIsValid(buffer));
 
        /* Do we have to wait for an associated I/O first? */
-       if (stream->ios_in_progress > 0 &&
-               stream->ios[stream->oldest_io_index].buffer_index == 
oldest_buffer_index)
+       while (stream->ios_in_progress > 0)
        {
                int16           io_index = stream->oldest_io_index;
                int32           distance;       /* wider temporary value, clamped below */
 
-               /* Sanity check that we still agree on the buffers. */
-               Assert(stream->ios[io_index].op.buffers ==
-                          &stream->buffers[oldest_buffer_index]);
+               /*
+                * If the oldest IO covers the buffer we're returning, we have to wait
+                * for it.  Otherwise, process as many as we can opportunistically
+                * without stalling.
+                *
+                * XXX This works for io_method=worker because its IO progress is
+                * autonomous.  For io_method=io_uring it doesn't, because this backend
+                * has to drain the completion queue.  In theory you could check with
+                * a cheap load whether the cqe has data.
+                */
+               if (stream->ios[stream->oldest_io_index].buffer_index != oldest_buffer_index &&
+                       WaitReadBuffersMightStall(&stream->ios[io_index].op))
+                       break;
 
                WaitReadBuffers(&stream->ios[io_index].op);
 
@@ -920,6 +962,13 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                        stream->seq_until_processed = InvalidBlockNumber;
        }
 
+       /*
+        * Invoke peek_callback so it can see the oldest buffer, before we zap it.
+        * XXX Could reorder things and not need this
+        */
+       if (stream->peek_callback)
+               read_stream_peek(stream);
+
        /*
         * We must zap this queue entry, or else it would appear as a forwarded
         * buffer.  If it's potentially in the overflow zone (ie from a
@@ -977,7 +1026,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                stream->pinned_buffers == 1 &&
                stream->distance == 1 &&
                stream->pending_read_nblocks == 0 &&
-               stream->per_buffer_data_size == 0)
+               stream->per_buffer_data_size == 0 &&
+               stream->peek_callback == NULL)
        {
                stream->fast_path = true;
        }
@@ -1057,3 +1107,13 @@ read_stream_end(ReadStream *stream)
        read_stream_reset(stream);
        pfree(stream);
 }
+
+/*
+ * Install a callback that will have a chance to examine buffers as soon as
+ * possible.
+ */
+void
+read_stream_set_peek_callback(ReadStream *stream, ReadStreamPeekCB peek_callback)
+{
+       stream->peek_callback = peek_callback;
+}
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 41fdc1e7693..4ea522e0d4a 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -15,7 +15,7 @@
 #define BUFMGR_H
 
 #include "port/pg_iovec.h"
-#include "storage/aio_types.h"
+#include "storage/aio.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -228,6 +228,17 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation,
                                                         int flags);
 extern void WaitReadBuffers(ReadBuffersOperation *operation);
 
+/*
+ * Return true if a ReadBuffersOperation is not known to be physically
+ * completed already.
+ */
+static inline bool
+WaitReadBuffersMightStall(ReadBuffersOperation *operation)
+{
+       return !pgaio_wref_valid(&operation->io_wref) ||
+               !pgaio_wref_check_done(&operation->io_wref);
+}
+
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern bool BufferIsExclusiveLocked(Buffer buffer);
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..c735bb0c6ab 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -78,6 +78,12 @@ typedef BlockNumber (*ReadStreamBlockNumberCB) (ReadStream *stream,
                                                                void *callback_private_data,
                                                                void *per_buffer_data);
 
+/* Callback that has a chance to peek at buffers when they are ready. */
+typedef void (*ReadStreamPeekCB) (ReadStream *stream,
+                                  void *callback_private_data,
+                                  void *per_buffer_data,
+                                  Buffer buffer);
+
 extern BlockNumber block_range_read_stream_cb(ReadStream *stream,
                                               void *callback_private_data,
                                               void *per_buffer_data);
@@ -99,6 +105,8 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
                                                   ReadStreamBlockNumberCB callback,
                                                   void *callback_private_data,
                                                   size_t per_buffer_data_size);
+extern void read_stream_set_peek_callback(ReadStream *stream,
+                                         ReadStreamPeekCB peek_callback);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.39.5 (Apple Git-154)
