On Thu, Aug 7, 2025 at 8:41 AM Peter Geoghegan <p...@bowt.ie> wrote:
> On Tue, Aug 5, 2025 at 7:31 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> > There must be a similar opportunity for parallel index scans.  It has
> > that "seize the scan" concept where parallel workers do one-at-a-time
> > locked linked list leapfrog.
>
> True. More generally, flexibility to reorder work would be useful there.
>
> The structure of parallel B-tree scans is one where each worker
> performs its own "independent" index scan.  The workers each only
> return tuples from those leaf pages that they themselves manage to
> read.  That isn't particularly efficient, since we'll usually have to
> merge the "independent" index scan tuples together once again using a
> GatherMerge.
Yeah. This all sails close to the stuff I wrote about in the
post-mortem of my failed attempt to teach parallel bitmap heapscan not
to throw I/O combining opportunities out the window for v18, after
Melanie streamified BHS.  Basically you have competing goals:

* preserve natural ranges of blocks up to io_combine_limit
* make workers run out of work at the same time
* avoid I/O stalls using lookahead and concurrency

You can't have all three right now: I/O streams are elastic, so
allocation decisions made at the producer end don't control work
*finishing* time, so we need something new.  I wrote about an idea
based on work stealing when data runs out.  Read streams would work
independently, but cooperate at end of stream, avoiding interlocking
almost all the time.  That was basically a refinement of an earlier
"shared" read stream that seems too locky.  (Obviously the
seize-the-scan block producer is a total lockfest navigating a
link-at-a-time data structure, but let's call that a use-case-specific
problem.)  Other architectures are surely possible too, including
admitting that precise streams are not right for that problem, and
using something like the PredictBuffer() approach mentioned below for
prefetching and then sticking to block-at-a-time work distribution.
Or we could go the other way and admit that block-at-a-time is also
not ideal -- what if some blocks are 10,000 times more expensive to
process than others? -- and do work stealing that degrades ultimately
to tuple granularity, a logical extreme position.

> > Basically, the stuff that we can't fix with "precise" I/O
> > streaming as I like to call it, where it might still be interesting to
> > think about opportunities to do fuzzier speculative lookahead. I'll
> > start a new thread.
>
> That sounds interesting. I worry that we won't ever be able to get
> away without some fallback that behaves roughly like OS readahead.

Yeah.
I might write about some of these things properly but here is an
unfiltered brain dump of assorted theories of varying crackpottedness
and some illustrative patches that I'm *not* proposing:

* you can make a dumb speculative sequential readahead stream pretty
easily, but it's not entirely satisfying: here's one of the toy
patches I mentioned in Athens, that shows btree leaf scans (but not
parallel ones) doing that, producing nice I/O combining and
concurrency that ramps up in the usual way if it happens to be
sequential (well, I just rebased this and didn't test it, but it
should still work); I will describe some other approaches to try to
place this in the space of possibilities I'm aware of...

* you could make a stream that pulls leaf pages from higher level
internal pages on demand (if you want to avoid the flow control
problems that come from trying to choose a batch size up front, before
you know you'll even need it, by using a pull approach), or just
notice that it looks sequential and install a block range producer,
and if that doesn't match the next-page pointers by the time you get
there then you destroy it and switch strategies, or something

* you could just pretend it's always sequential and reset the stream
every time you're wrong, or some only slightly smarter scheme than
that, but it's still hard to know what's going on in cooperating
processes...

* you could put sequential extent information in meta blocks or
somehow scatter hints around...
* you could instead give up on explicit streams for fuzzy problems,
and teach the buffer pool to do the same tricks as the kernel, with a
scheme that lets go of the pins and reacquires them later (hopefully
cheaply with ReadRecentBuffer(), by leaving a trail of breadcrumbs in
SMgrRelation or shared memory, similar to what I already proposed for
btree root pages and another related patch that speeds up seq scans,
which I plan to repost soon): SMgrRelation could hold the state
necessary for the buffer pool to notice when you keep calling
ReadBuffer() for sequential blocks and begin to prefetch them
speculatively with growing distance heuristics so it doesn't overdo
it, but somehow not hold pins on your behalf (this was one of the
driving concerns that made me originally think that I needed an
explicit stream as an explicit opt-in and scoping for extra pins,
which an AM might not want at certain times -- truncation or cleanup
or something, who knows)

* you could steal Linux's BM_READAHEAD concept, where speculatively
loaded pages carry a special marker so they can be recognized by later
ReadBuffer() calls to encourage more magic readahead, because it's
measurably fruitful; this will be seen also by other backends, eg
parallel workers working on the same problem, though there is a bit of
an interleaving edge problem (you probably want to know if adjacent
pages have the flag in some window, and I have an idea for that that
doesn't involve the buffer mapping table); in other words the state
tracked in SMgrRelation is only used to ignite readahead, but shared
flags in or parallel with the buffer pool apply fuel

* from 30,000 feet, the question is what scope you do the detection
at; you can find examples of OSes that only look at one fd for
sequential detection and only consider the strict next block (old
Unixen, I suspect maybe Windows but IDK), systems that have a
tolerance window (Linux), systems that search a small table of active
streams no matter which fds they're coming through (ZFS does this I
think, sort of like our own synchronized_scan detector), and systems
that use per-page accounting to measure and amplify success; and we
have analogies in our architecture as candidate scopes: explicit
stream objects, the per-backend SMgrRelation, the proposed system-wide
SharedSMgrRelation, the buffer pool itself, and perhaps a per-buffer
hint array with relaxed access (this last is something I've
experimented with, both as a way to store relaxed navigation
information for sequential scans skipping the buffer mapping table and
as a place to accumulate prefetch-driving statistics)

* so far that's just talking about sequential heuristics, but we have
many clues the kernel doesn't; it's just that they're not always
reliable enough for a "precise" read stream, and we might not want to
use the approach to speculation I mentioned above where you have a
read stream but as soon as it gives you something you didn't expect
you have to give up completely or reset it and start feeding it again;
presumably you could code around that with a fuzzy speculation buffer
that tolerates a bit of disorder

* you could make a better version of PrefetchBuffer() for guided
prefetching, let's call it PredictBuffer(), that is initially lazy
but, if the predictions turn out to be correct, starts looking further
ahead in the stream of predictions you made and eventually becomes
quite eager, like PrefetchBuffer(), but just lazy enough to perform
I/O combining; note that I'm now talking about "pushing" rather than
"pulling" predictions, another central question with implications, and
one of the nice things about read streams

* for a totally different line of attack that goes back to precise
pull-based streams, you could imagine a read stream that lets you
'peek' at data coming down the pipe as soon as it is ready (ie it's
already in cache, or it isn't but the I/O finishes before the stream
consumer gets to it), so you can get a head start on a jump requiring
I/O in a
self-referential data structure like a linked list (with some obvious
limitations); here is a toy patch that allows you to install such a
callback, which could feed next-block information to the main block
number callback, so now we have three times of interest in the I/O
stream: block numbers are pulled into the producer end, valid pages
are pushed out to you as soon as possible somewhere in the middle
(probably often just a step ahead of the producer) and can feed block
numbers back to it, and pages are eventually pulled out of the
consumer end for processing; BUT NOTE: this idea is not entirely
compatible with the lazy I/O completion draining of
io_method=io_uring (or the posix_aio patch I dumped on the list the
other day; the Windows equivalent could plausibly go either way), and
works much better with io_method=worker, whose completions are
advertised eagerly, so this implementation of the idea is a dead end,
if the goal itself is even interesting -- not sure

* the same effect could be achieved with chained streams, where the
consumer-facing stream is a simple elastic queue of buffers fed by the
real I/O stream, with the peeking in between; that would suit those
I/O methods much better; it might need a new
read_stream_next_buffer_conditional() that calls that
WaitReadBuffersMightStall() function, unless the consumer queue is
empty and it has to call read_stream_next_buffer() which might block;
the point being to periodically pump the peeking mechanism

* the peek concept is pretty weak on its own, because it's hard to
reach a state where you have enough lookahead window that it can
follow a navigational jump in time to save you from a stall, but maybe
there are streams that contain a lot of either sequential or well
cached blocks with occasional jumps to random I/O; if you could
somehow combine the advanced vapourware of several of these magic
bullet points, then perhaps you can avoid some stalls

Please take all of that with an absolutely massive grain of salt, it's
just very raw ideas...
From 2eeec2f0488a74fa1f3cfdaeef36c5f9926d417c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 12 Oct 2024 09:57:01 +1300
Subject: [PATCH 1/2] Add AutoReadStream.

Provide auto_read_buffer() as a drop-in replacement for ReadBuffer().
As long as the caller requests sequential pages, they come from a
ReadStream that predicts that it'll keep doing that.  Otherwise it
falls back to regular ReadBuffer().

XXX experimental
---
 src/backend/storage/aio/Makefile           |  1 +
 src/backend/storage/aio/auto_read_stream.c | 92 ++++++++++++++++++++++
 src/backend/storage/aio/meson.build        |  1 +
 src/include/storage/auto_read_stream.h     | 16 ++++
 src/tools/pgindent/typedefs.list           |  1 +
 5 files changed, 111 insertions(+)
 create mode 100644 src/backend/storage/aio/auto_read_stream.c
 create mode 100644 src/include/storage/auto_read_stream.h

diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
index 3f2469cc399..7270f486109 100644
--- a/src/backend/storage/aio/Makefile
+++ b/src/backend/storage/aio/Makefile
@@ -15,6 +15,7 @@ OBJS = \
 	aio_init.o \
 	aio_io.o \
 	aio_target.o \
+	auto_read_stream.o \
 	method_io_uring.o \
 	method_sync.o \
 	method_worker.o \
diff --git a/src/backend/storage/aio/auto_read_stream.c b/src/backend/storage/aio/auto_read_stream.c
new file mode 100644
index 00000000000..a8dac066bd6
--- /dev/null
+++ b/src/backend/storage/aio/auto_read_stream.c
@@ -0,0 +1,92 @@
+#include "postgres.h"
+
+#include "storage/auto_read_stream.h"
+#include "storage/bufmgr.h"
+#include "storage/read_stream.h"
+
+struct AutoReadStream
+{
+	ReadStream *stream;
+	Relation	rel;
+	BlockNumber next;
+	BlockNumber last;
+};
+
+static BlockNumber
+auto_read_stream_cb(ReadStream *stream,
+					void *callback_private_data,
+					void *per_buffer_data)
+{
+	AutoReadStream *auto_stream = callback_private_data;
+	BlockNumber next = auto_stream->next;
+
+	if (next == InvalidBlockNumber)
+		return InvalidBlockNumber;
+
+	auto_stream->next = Min(auto_stream->next + 1,
+							auto_stream->last);
+
+	return next;
+}
+
+AutoReadStream *
+auto_read_stream_begin(BufferAccessStrategy strategy,
+					   Relation rel,
+					   ForkNumber forknum)
+{
+	AutoReadStream *auto_stream;
+
+	auto_stream = palloc(sizeof(*auto_stream));
+	auto_stream->rel = rel;
+	auto_stream->next = InvalidBlockNumber;
+	auto_stream->last = InvalidBlockNumber;
+	auto_stream->stream = read_stream_begin_relation(0,
+													 strategy,
+													 rel,
+													 forknum,
+													 auto_read_stream_cb,
+													 auto_stream,
+													 0);
+
+	return auto_stream;
+}
+
+void
+auto_read_stream_end(AutoReadStream *auto_stream)
+{
+	read_stream_end(auto_stream->stream);
+	pfree(auto_stream);
+}
+
+Buffer
+auto_read_buffer(AutoReadStream *auto_stream, BlockNumber blocknum)
+{
+	if (auto_stream->last != InvalidBlockNumber)
+	{
+		Buffer		buffer = read_stream_next_buffer(auto_stream->stream, NULL);
+
+		/* Did the stream guess right? */
+		if (buffer != InvalidBuffer)
+		{
+			if (BufferGetBlockNumber(buffer) == blocknum)
+				return buffer;
+			ReleaseBuffer(buffer);
+		}
+	}
+
+	/* Figure out the highest block number if we haven't already. */
+	if (auto_stream->last == InvalidBlockNumber)
+	{
+		BlockNumber nblocks;
+
+		nblocks = RelationGetNumberOfBlocks(auto_stream->rel);
+		if (nblocks > 0)
+			auto_stream->last = nblocks - 1;
+	}
+
+	/* Take a guess at the next block. */
+	if (auto_stream->last != InvalidBlockNumber &&
+		auto_stream->last >= blocknum + 1)
+		auto_stream->next = blocknum + 1;
+	read_stream_reset(auto_stream->stream);
+
+	return ReadBuffer(auto_stream->rel, blocknum);
+}
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
index da6df2d3654..07c2fb2e42c 100644
--- a/src/backend/storage/aio/meson.build
+++ b/src/backend/storage/aio/meson.build
@@ -7,6 +7,7 @@ backend_sources += files(
   'aio_init.c',
   'aio_io.c',
   'aio_target.c',
+  'auto_read_stream.c',
   'method_io_uring.c',
   'method_sync.c',
   'method_worker.c',
diff --git a/src/include/storage/auto_read_stream.h b/src/include/storage/auto_read_stream.h
new file mode 100644
index 00000000000..96d4a9ac726
--- /dev/null
+++ b/src/include/storage/auto_read_stream.h
@@ -0,0 +1,16 @@
+#ifndef AUTO_READ_STREAM_H
+#define AUTO_READ_STREAM_H
+
+#include "storage/bufmgr.h"
+
+struct AutoReadStream;
+typedef struct AutoReadStream AutoReadStream;
+
+extern AutoReadStream *auto_read_stream_begin(BufferAccessStrategy strategy,
+											  Relation rel,
+											  ForkNumber forknum);
+extern void auto_read_stream_end(AutoReadStream *auto_stream);
+extern Buffer auto_read_buffer(AutoReadStream *auto_stream,
+							   BlockNumber blocknum);
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e6f2e93b2d6..43f206e30ed 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -173,6 +173,7 @@ AuthRequest
 AuthToken
 AutoPrewarmReadStreamData
 AutoPrewarmSharedState
+AutoReadStream
 AutoVacOpts
 AutoVacuumShmemStruct
 AutoVacuumWorkItem
-- 
2.39.5 (Apple Git-154)
From b3dee96bfaf31a4dff31b903ad05a7d5430d23e2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 12 Oct 2024 09:57:17 +1300
Subject: [PATCH 2/2] Use AutoReadStream for btree leaf page scans.

Leaf pages tend to be sequential on recently built/clustered indexes,
so there is an opportunity to do I/O combining and look-ahead.  It's
hard to do it 'precisely' with a ReadStream because they form a linked
list.

XXX POC
---
 src/backend/access/nbtree/nbtpage.c   | 22 ++++++++++++++++++++++
 src/backend/access/nbtree/nbtree.c    |  5 +++++
 src/backend/access/nbtree/nbtsearch.c |  2 +-
 src/include/access/nbtree.h           |  5 +++++
 4 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index c79dd38ee18..e617bf108dd 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -856,6 +856,28 @@ _bt_getbuf(Relation rel, BlockNumber blkno, int access)
 	return buf;
 }
 
+/*
+ * Like _bt_getbuf(), but use auto_stream instead of reading directly.  This
+ * allows I/O to be combined if the blocks happen to be sequential on disk.
+ */
+Buffer
+_bt_getbuf_auto(AutoReadStream *auto_stream,
+				Relation rel,
+				BlockNumber blkno,
+				int access)
+{
+	Buffer		buf;
+
+	Assert(BlockNumberIsValid(blkno));
+
+	/* Read an existing block of the relation */
+	buf = auto_read_buffer(auto_stream, blkno);
+	_bt_lockbuf(rel, buf, access);
+	_bt_checkpage(rel, buf);
+
+	return buf;
+}
+
 /*
  * _bt_allocbuf() -- Allocate a new block/page.
  *
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index fdff960c130..309147bea15 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -364,6 +364,9 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
 	so->killedItems = NULL;		/* until needed */
 	so->numKilled = 0;
 
+	/* XXX defer until it looks like it's worth it? */
+	so->auto_stream = auto_read_stream_begin(NULL, rel, MAIN_FORKNUM);
+
 	/*
 	 * We don't know yet whether the scan will be index-only, so we do not
 	 * allocate the tuple workspace arrays until btrescan.  However, we set up
@@ -483,6 +486,8 @@ btendscan(IndexScanDesc scan)
 	so->markItemIndex = -1;
 	BTScanPosUnpinIfPinned(so->markPos);
 
+	auto_read_stream_end(so->auto_stream);
+
 	/* No need to invalidate positions, the RAM is about to be freed. */
 
 	/* Release storage */
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index d69798795b4..f15e05042b4 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -2408,7 +2408,7 @@ _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno,
 	{
 		/* read blkno, but check for interrupts first */
 		CHECK_FOR_INTERRUPTS();
-		so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
+		so->currPos.buf = _bt_getbuf_auto(so->auto_stream, rel, blkno, BT_READ);
 	}
 	else
 	{
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index e709d2e0afe..c4b82cd4466 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -22,6 +22,7 @@
 #include "catalog/pg_am_d.h"
 #include "catalog/pg_index.h"
 #include "lib/stringinfo.h"
+#include "storage/auto_read_stream.h"
 #include "storage/bufmgr.h"
 #include "storage/shm_toc.h"
 #include "utils/skipsupport.h"
@@ -1072,6 +1073,8 @@ typedef struct BTScanOpaqueData
 	int			numKilled;		/* number of currently stored items */
 	bool		dropPin;		/* drop leaf pin before btgettuple returns? */
 
+	AutoReadStream *auto_stream;
+
 	/*
 	 * If we are doing an index-only scan, these are the tuple storage
 	 * workspaces for the currPos and markPos respectively.  Each is of size
@@ -1273,6 +1276,8 @@ extern void _bt_metaversion(Relation rel, bool *heapkeyspace,
 							bool *allequalimage);
 extern void _bt_checkpage(Relation rel, Buffer buf);
 extern Buffer _bt_getbuf(Relation rel, BlockNumber blkno, int access);
+extern Buffer _bt_getbuf_auto(AutoReadStream *auto_stream,
+							  Relation rel, BlockNumber blkno, int access);
 extern Buffer _bt_allocbuf(Relation rel, Relation heaprel);
 extern Buffer _bt_relandgetbuf(Relation rel, Buffer obuf,
 							   BlockNumber blkno, int access);
-- 
2.39.5 (Apple Git-154)
From ac2dd3de67deef67bc15207944e72263c5625905 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 6 Aug 2025 16:23:13 +1200
Subject: [PATCH 1/2] Experiment: Peeking ahead in I/O streams.

Allow consumers of self-referential block streams to install a
callback that gets a chance to "peek" at each block as soon as
possible.  Consumers of self-referential block streams can use this to
feed navigational information to the block number callback much
earlier in the pipeline while they are still consuming older blocks.

This can't help if there is an IO stall in the way of progress at
every jump, but some mixed hit/miss patterns may be able to generate
some extra concurrency this way.

XXX highly experimental idea exploration...
---
 src/backend/storage/aio/read_stream.c | 74 ++++++++++++++++++++++++---
 src/include/storage/bufmgr.h          | 13 ++++-
 src/include/storage/read_stream.h     |  8 +++
 3 files changed, 87 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 0e7f5557f5c..2c64ce9bbd1 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -114,9 +114,12 @@ struct ReadStream
 
 	/*
 	 * The callback that will tell us which block numbers to read, and an
-	 * opaque pointer that will be pass to it for its own purposes.
+	 * opaque pointer that will be passed to it for its own purposes.
+	 * Optional peek_callback will have a chance to examine data in buffers
+	 * some time before they are returned.
 	 */
 	ReadStreamBlockNumberCB callback;
+	ReadStreamPeekCB peek_callback;
 	void	   *callback_private_data;
 
 	/* Next expected block, for detecting sequential access. */
@@ -140,6 +143,7 @@ struct ReadStream
 
 	/* Circular queue of buffers. */
 	int16		oldest_buffer_index;	/* Next pinned buffer to return */
+	int16		peek_buffer_index;	/* Next pinned buffer to peek at */
 	int16		next_buffer_index;	/* Index of next buffer to pin */
 	Buffer		buffers[FLEXIBLE_ARRAY_MEMBER];
 };
@@ -216,6 +220,26 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 	stream->buffered_blocknum = blocknum;
 }
 
+/*
+ * Pass as many pinned and valid buffers as we can to peek_callback.
+ */
+static void
+read_stream_peek(ReadStream *stream)
+{
+	while (stream->peek_buffer_index != stream->next_buffer_index &&
+		   !(stream->ios_in_progress > 0 &&
+			 stream->peek_buffer_index ==
+			 stream->ios[stream->oldest_io_index].buffer_index))
+	{
+		stream->peek_callback(stream,
+							  stream->callback_private_data,
+							  get_per_buffer_data(stream, stream->peek_buffer_index),
+							  stream->buffers[stream->peek_buffer_index]);
+		if (++stream->peek_buffer_index == stream->queue_size)
+			stream->peek_buffer_index = 0;
+	}
+}
+
 /*
  * Start as much of the current pending read as we can.  If we have to split it
  * because of the per-backend buffer limit, or the buffer manager decides to
@@ -429,6 +453,14 @@ read_stream_look_ahead(ReadStream *stream)
 			continue;
 		}
 
+		/*
+		 * New navigation information might be available if we got a cache
+		 * hit in a self-referential stream, so invoke peek_callback before
+		 * every block number callback.
+		 */
+		if (stream->peek_callback)
+			read_stream_peek(stream);
+
 		/*
 		 * See which block the callback wants next in the stream.  We need to
 		 * compute the index of the Nth block of the pending read including
@@ -791,6 +823,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
+		Assert(stream->peek_callback == NULL);
 		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
 		/* We're going to return the buffer we pinned last time. */
@@ -888,15 +921,24 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	Assert(BufferIsValid(buffer));
 
 	/* Do we have to wait for an associated I/O first? */
-	if (stream->ios_in_progress > 0 &&
-		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
+	while (stream->ios_in_progress > 0)
 	{
 		int16		io_index = stream->oldest_io_index;
 		int32		distance;	/* wider temporary value, clamped below */
 
-		/* Sanity check that we still agree on the buffers. */
-		Assert(stream->ios[io_index].op.buffers ==
-			   &stream->buffers[oldest_buffer_index]);
+		/*
+		 * If the oldest IO covers the buffer we're returning, we have to
+		 * wait for it.  Otherwise, process as many as we can
+		 * opportunistically without stalling.
+		 *
+		 * XXX This works for io_method=worker because its I/Os progress
+		 * autonomously.  For io_method=io_uring it doesn't, because this
+		 * backend has to drain the completion queue.  In theory you could
+		 * check with a cheap load if the cqe has data.
+		 */
+		if (stream->ios[stream->oldest_io_index].buffer_index != oldest_buffer_index &&
+			WaitReadBuffersMightStall(&stream->ios[io_index].op))
+			break;
 
 		WaitReadBuffers(&stream->ios[io_index].op);
 
@@ -920,6 +962,13 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->seq_until_processed = InvalidBlockNumber;
 	}
 
+	/*
+	 * Invoke peek_callback so it can see the oldest buffer, before we zap
+	 * it.  XXX Could reorder things and not need this
+	 */
+	if (stream->peek_callback)
+		read_stream_peek(stream);
+
 	/*
 	 * We must zap this queue entry, or else it would appear as a forwarded
	 * buffer.  If it's potentially in the overflow zone (ie from a
@@ -977,7 +1026,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->pinned_buffers == 1 &&
 		stream->distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
-		stream->per_buffer_data_size == 0)
+		stream->per_buffer_data_size == 0 &&
+		stream->peek_callback == NULL)
 	{
 		stream->fast_path = true;
 	}
@@ -1057,3 +1107,13 @@ read_stream_end(ReadStream *stream)
 	read_stream_reset(stream);
 	pfree(stream);
 }
+
+/*
+ * Install a callback that will have a chance to examine buffers as soon as
+ * possible.
+ */
+void
+read_stream_set_peek_callback(ReadStream *stream, ReadStreamPeekCB peek_callback)
+{
+	stream->peek_callback = peek_callback;
+}
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 41fdc1e7693..4ea522e0d4a 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -15,7 +15,7 @@
 #define BUFMGR_H
 
 #include "port/pg_iovec.h"
-#include "storage/aio_types.h"
+#include "storage/aio.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -228,6 +228,17 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation,
 							 int flags);
 extern void WaitReadBuffers(ReadBuffersOperation *operation);
 
+/*
+ * Return true if a ReadBuffersOperation is not known to be physically
+ * completed already.
+ */
+static inline bool
+WaitReadBuffersMightStall(ReadBuffersOperation *operation)
+{
+	return !pgaio_wref_valid(&operation->io_wref) ||
+		!pgaio_wref_check_done(&operation->io_wref);
+}
+
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern bool BufferIsExclusiveLocked(Buffer buffer);
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..c735bb0c6ab 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -78,6 +78,12 @@ typedef BlockNumber (*ReadStreamBlockNumberCB) (ReadStream *stream,
 												void *callback_private_data,
 												void *per_buffer_data);
 
+/* Callback that has a chance to peek at buffers when they are ready. */
+typedef void (*ReadStreamPeekCB) (ReadStream *stream,
+								  void *callback_private_data,
+								  void *per_buffer_data,
+								  Buffer buffer);
+
 extern BlockNumber block_range_read_stream_cb(ReadStream *stream,
 											  void *callback_private_data,
 											  void *per_buffer_data);
@@ -99,6 +105,8 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 												   ReadStreamBlockNumberCB callback,
 												   void *callback_private_data,
 												   size_t per_buffer_data_size);
+extern void read_stream_set_peek_callback(ReadStream *stream,
+										  ReadStreamPeekCB peek_callback);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.39.5 (Apple Git-154)