On Sun, Jul 20, 2025 at 1:07 AM Thomas Munro <thomas.mu...@gmail.com> wrote: > On Sat, Jul 19, 2025 at 11:23 PM Tomas Vondra <to...@vondra.me> wrote: > > Thanks for the link. It seems I came up with an almost the same patch, > > with three minor differences: > > > > 1) There's another place that sets "distance = 0" in > > read_stream_next_buffer, so maybe this should preserve the distance too? > > > > 2) I suspect we need to preserve the distance at the beginning of > > read_stream_reset, like > > > > stream->reset_distance = Max(stream->reset_distance, > > stream->distance); > > > > because what if you call _reset before reaching the end of the stream? > > > > 3) Shouldn't it reset the reset_distance to 0 after restoring it? > > Probably. Hmm... an earlier version of this code didn't use distance > == 0 to indicate end-of-stream, but instead had a separate internal > end_of_stream flag. If we brought that back and didn't clobber > distance, we wouldn't need this save-and-restore dance. It seemed > shorter and sweeter without it back then, before _reset() existed in > its present form, but I wonder if end_of_stream would be nicer than > having to add this kind of stuff, without measurable downsides.
... > Good question. Yeah, your flag idea seems like a good way to avoid > baking opinion into this level. I wonder if it should be a bitmask > rather than a boolean, in case we think of more things that need to be > included or not when resetting. Here's a sketch of the above two ideas for discussion (.txt to stay off cfbot's radar for this thread). Better than save/restore? Here also are some alternative experimental patches for preserving accumulated look-ahead distance better in cases like that. Needs more exploration... thoughts/ideas welcome...
From 2df6f67d199cfc0e0aa321830df2ee712057c61a Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 21 Jul 2025 13:58:47 +1200 Subject: [PATCH 1/3] aio: Refactor read_stream.c end-of-stream state. Previously, stream->distance was set to 0 to indicate that the end of the stream had been reached. In preparation for a later commit that preserves the distance across reset operations, introduce a separate end_of_stream flag instead, so it survives across restart operations. No externally visible effects. Discussion: https://postgr.es/m/CA%2BhUKG%2BWWr4-8TYemyU%3DucQsNe6bUBN_Sq3mCnBoBtxaJ9w3ug%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 0e7f5557f5c..5de6f83c253 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -137,6 +137,7 @@ struct ReadStream int16 next_io_index; bool fast_path; + bool end_of_stream; /* Circular queue of buffers. */ int16 oldest_buffer_index; /* Next pinned buffer to return */ @@ -429,6 +430,9 @@ read_stream_look_ahead(ReadStream *stream) continue; } + if (stream->end_of_stream) + break; + /* * See which block the callback wants next in the stream. We need to * compute the index of the Nth block of the pending read including @@ -443,7 +447,7 @@ read_stream_look_ahead(ReadStream *stream) if (blocknum == InvalidBlockNumber) { /* End of stream. 
*/ - stream->distance = 0; + stream->end_of_stream = true; break; } @@ -488,7 +492,7 @@ read_stream_look_ahead(ReadStream *stream) (stream->pending_read_nblocks == stream->io_combine_limit || (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || - stream->distance == 0) && + stream->end_of_stream) && stream->ios_in_progress < stream->max_ios) read_stream_start_pending_read(stream); @@ -498,7 +502,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->distance == 0); + Assert(stream->pinned_buffers > 0 || stream->end_of_stream); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -789,6 +793,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); + Assert(stream->end_of_stream == false); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); Assert(stream->initialized_buffers > stream->oldest_buffer_index); @@ -841,7 +846,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) else { /* No more blocks, end of stream. */ - stream->distance = 0; + stream->end_of_stream = true; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; stream->buffers[oldest_buffer_index] = InvalidBuffer; @@ -857,7 +862,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); /* End of stream reached? */ - if (stream->distance == 0) + if (stream->end_of_stream) return InvalidBuffer; /* @@ -871,7 +876,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? 
*/ if (stream->pinned_buffers == 0) { - Assert(stream->distance == 0); + Assert(stream->end_of_stream); return InvalidBuffer; } } @@ -976,6 +981,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && + stream->end_of_stream == false && stream->pending_read_nblocks == 0 && stream->per_buffer_data_size == 0) { @@ -1013,7 +1019,7 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ - stream->distance = 0; + stream->end_of_stream = true; /* Forget buffered block number and fast path state. */ stream->buffered_blocknum = InvalidBlockNumber; @@ -1046,6 +1052,7 @@ read_stream_reset(ReadStream *stream) /* Start off assuming data is cached. */ stream->distance = 1; + stream->end_of_stream = false; } /* -- 2.39.5 (Apple Git-154)
From 5c3cf2018845093041dcf23bd6e8ae90b5c5906a Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 21 Jul 2025 14:38:32 +1200 Subject: [PATCH 2/3] aio: Add READ_STREAM_RESET_CONTINUE flag. For users that sometimes run out of block numbers temporarily but then later want to continue, provide a way to reset the stream and continue without forgetting the internal look-ahead distance. This is done as a bitmask, just in case we think of more reasons for fine-grained control of what is reset or not in the future. XXX Name debatable; is it better to use a 'high level' concept like 'continuing an interrupted stream', or a lower level concept explicitly referring to what is reset or not? XXX Which existing callers should pass 0 and which should pass the new flag has not been thought about very hard Discussion: https://postgr.es/m/CA%2BhUKG%2BWWr4-8TYemyU%3DucQsNe6bUBN_Sq3mCnBoBtxaJ9w3ug%40mail.gmail.com --- src/backend/access/gist/gistvacuum.c | 2 +- src/backend/access/heap/heapam.c | 4 ++-- src/backend/access/nbtree/nbtree.c | 2 +- src/backend/access/spgist/spgvacuum.c | 2 +- src/backend/storage/aio/read_stream.c | 13 +++++++++---- src/include/storage/read_stream.h | 14 +++++++++++++- 6 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c index dca236b6e57..45044e3cc84 100644 --- a/src/backend/access/gist/gistvacuum.c +++ b/src/backend/access/gist/gistvacuum.c @@ -260,7 +260,7 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, * InvalidBuffer, the read stream API won't invoke our callback again * until the stream has been reset. 
*/ - read_stream_reset(stream); + read_stream_reset(stream, READ_STREAM_RESET_CONTINUE); } read_stream_end(stream); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 0dcd6ee817e..1ffc4a872db 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -667,7 +667,7 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) if (unlikely(scan->rs_dir != dir)) { scan->rs_prefetch_block = scan->rs_cblock; - read_stream_reset(scan->rs_read_stream); + read_stream_reset(scan->rs_read_stream, 0); } scan->rs_dir = dir; @@ -1284,7 +1284,7 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, * in initscan(). */ if (scan->rs_read_stream) - read_stream_reset(scan->rs_read_stream); + read_stream_reset(scan->rs_read_stream, 0); /* * reinitialize scan descriptor diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index fdff960c130..596df227c20 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -1318,7 +1318,7 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, * InvalidBuffer, the read stream API won't invoke our callback again * until the stream has been reset. */ - read_stream_reset(stream); + read_stream_reset(stream, READ_STREAM_RESET_CONTINUE); } read_stream_end(stream); diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index 2678f7ab782..c4e4a89680e 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -885,7 +885,7 @@ spgvacuumscan(spgBulkDeleteState *bds) * InvalidBuffer, the read stream API won't invoke our callback again * until the stream has been reset. 
*/ - read_stream_reset(stream); + read_stream_reset(stream, READ_STREAM_RESET_CONTINUE); } read_stream_end(stream); diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 5de6f83c253..f242b373b22 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -1011,9 +1011,13 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) * to be used again for different blocks. This can be used to clear an * end-of-stream condition and start again, or to throw away blocks that were * speculatively read and read some different blocks instead. + * + * READ_STREAM_RESET_CONTINUE can be passed in flags to indicate that a stream + * was temporarily interrupted but internal look-ahead distance heuristics + * should not be reset, because a similar access pattern is expected. */ void -read_stream_reset(ReadStream *stream) +read_stream_reset(ReadStream *stream, int flags) { int16 index; Buffer buffer; @@ -1050,8 +1054,9 @@ read_stream_reset(ReadStream *stream) Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0); - /* Start off assuming data is cached. */ - stream->distance = 1; + /* Start off like a newly initialized stream, unless asked not to. */ + if ((flags & READ_STREAM_RESET_CONTINUE) == 0) + stream->distance = 1; stream->end_of_stream = false; } @@ -1061,6 +1066,6 @@ read_stream_reset(ReadStream *stream) void read_stream_end(ReadStream *stream) { - read_stream_reset(stream); + read_stream_reset(stream, 0); pfree(stream); } diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index 9b0d65161d0..90ec278117f 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -17,6 +17,8 @@ #include "storage/bufmgr.h" #include "storage/smgr.h" +/* Flags controlling stream initialization. */ + /* Default tuning, reasonable for many users. 
*/ #define READ_STREAM_DEFAULT 0x00 @@ -42,6 +44,16 @@ */ #define READ_STREAM_FULL 0x04 +/* Flags controlling read_stream_reset(). */ + +/* + * If the callback reports end-of-stream or higher levels decide to abandon + * blocks that it generated speculatively, the stream can be reset and allowed + * to try to fetch blocks again without forgetting internal heuristics by + * passing this flag. + */ +#define READ_STREAM_RESET_CONTINUE 0x01 + /* --- * Opt-in to using AIO batchmode. * @@ -99,7 +111,7 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size); -extern void read_stream_reset(ReadStream *stream); +extern void read_stream_reset(ReadStream *stream, int flags); extern void read_stream_end(ReadStream *stream); #endif /* READ_STREAM_H */ -- 2.39.5 (Apple Git-154)
From 7e862c651a9b023bc9b3363f59d17b8bc6a22089 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 21 Jul 2025 16:34:59 +1200 Subject: [PATCH 3/3] aio: Improve read_stream.c look-ahead heuristics A. Previously we would reduce the look-ahead distance by one every time we got a cache hit, which sometimes performed poorly with mixed hit/miss patterns, especially if it was trapped at one. The new rule prevents distance reduction while any IO is still running, which allows for more concurrent I/O with some mixed hit/miss patterns that the old algorithm didn't handle well. XXX But you can always adjust your test patterns with more hits between the misses to make it give up I/O concurrency that you could have benefited from.... XXX But it also finds its way back to the fast path sooner... XXX Highly experimental! Suggested-by: AF --- src/backend/storage/aio/read_stream.c | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index f242b373b22..8961fad463f 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -343,22 +343,25 @@ read_stream_start_pending_read(ReadStream *stream) /* Remember whether we need to wait before returning this buffer. */ if (!need_wait) { - /* Look-ahead distance decays, no I/O necessary. */ - if (stream->distance > 1) + /* Look-ahead distance decays when there is no I/O activity. */ + if (stream->distance > 1 && stream->ios_in_progress == 0) stream->distance--; } else { - /* - * Remember to call WaitReadBuffers() before returning head buffer. - * Look-ahead distance will be adjusted after waiting. - */ + /* Remember to call WaitReadBuffers() before returning head buffer. 
*/ stream->ios[io_index].buffer_index = buffer_index; if (++stream->next_io_index == stream->max_ios) stream->next_io_index = 0; Assert(stream->ios_in_progress < stream->max_ios); stream->ios_in_progress++; stream->seq_blocknum = stream->pending_read_blocknum + nblocks; + + /* Look-ahead distance grows by twice this read size. */ + if (stream->max_pinned_buffers - stream->distance < nblocks * 2) + stream->distance = stream->max_pinned_buffers; + else + stream->distance += nblocks * 2; } /* @@ -897,7 +900,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { int16 io_index = stream->oldest_io_index; - int32 distance; /* wider temporary value, clamped below */ /* Sanity check that we still agree on the buffers. */ Assert(stream->ios[io_index].op.buffers == @@ -910,11 +912,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (++stream->oldest_io_index == stream->max_ios) stream->oldest_io_index = 0; - /* Look-ahead distance ramps up rapidly after we do I/O. */ - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; - /* * If we've reached the first block of a sequential region we're * issuing advice for, cancel that until the next jump. The kernel -- 2.39.5 (Apple Git-154)
From 60d5e1bb888b601ef04810d107376f190d069bf3 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 21 Jul 2025 16:34:59 +1200 Subject: [PATCH 3/3] aio: Improve read_stream.c look-ahead heuristics. Previously we would reduce the look-ahead distance by one every time we got a cache hit, which sometimes performed poorly with mixed hit/miss patterns, especially if it was trapped at one. Instead, add a crude estimation for whether it looks like the look-ahead window has any chance of spanning the gap between misses, if allowed to grow large enough. You still need misses to grow the distance by doubling, but you don't give it up until you've seen so many hits that there is no way we could ever see past them. XXX A somewhat extreme position... but on the other hand it doesn't contain any arbitrary unprincipled constants that would lose built-up look-ahead steam when you add just one more hit between misses to your test pattern, until you've reached the actual possible limit defined by the GUC settings. (?) XXX Perhaps the answer is somewhere in the middle, stream->distance * K rather than stream->max_pinned_buffers? XXX Highly experimental! --- src/backend/storage/aio/read_stream.c | 34 +++++++++++++++++---------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index f242b373b22..7995217662e 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -123,6 +123,9 @@ struct ReadStream BlockNumber seq_blocknum; BlockNumber seq_until_processed; + /* Counter used for distance heuristics. */ + uint64 blocks_since_last_io; + /* The read operation we are currently preparing. 
*/ BlockNumber pending_read_blocknum; int16 pending_read_nblocks; @@ -198,6 +201,7 @@ read_stream_get_block(ReadStream *stream, void *per_buffer_data) blocknum = stream->callback(stream, stream->callback_private_data, per_buffer_data); + stream->blocks_since_last_io++; } return blocknum; @@ -343,22 +347,31 @@ read_stream_start_pending_read(ReadStream *stream) /* Remember whether we need to wait before returning this buffer. */ if (!need_wait) { - /* Look-ahead distance decays, no I/O necessary. */ - if (stream->distance > 1) + /* + * Look-ahead distance decays if we haven't had any cache misses in a + * hypothetical window the size of the maximum possible look-ahead + * distance. + */ + if (stream->distance > 1 && + stream->blocks_since_last_io > stream->max_pinned_buffers) stream->distance--; } else { - /* - * Remember to call WaitReadBuffers() before returning head buffer. - * Look-ahead distance will be adjusted after waiting. - */ + /* Remember to call WaitReadBuffers() before returning head buffer. */ stream->ios[io_index].buffer_index = buffer_index; if (++stream->next_io_index == stream->max_ios) stream->next_io_index = 0; Assert(stream->ios_in_progress < stream->max_ios); stream->ios_in_progress++; stream->seq_blocknum = stream->pending_read_blocknum + nblocks; + + /* Look-ahead distance doubles. */ + if (stream->distance > stream->max_pinned_buffers - stream->distance) + stream->distance = stream->max_pinned_buffers; + else + stream->distance += stream->distance; + stream->blocks_since_last_io = 0; } /* @@ -897,7 +910,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { int16 io_index = stream->oldest_io_index; - int32 distance; /* wider temporary value, clamped below */ /* Sanity check that we still agree on the buffers. 
*/ Assert(stream->ios[io_index].op.buffers == @@ -910,11 +922,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (++stream->oldest_io_index == stream->max_ios) stream->oldest_io_index = 0; - /* Look-ahead distance ramps up rapidly after we do I/O. */ - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; - /* * If we've reached the first block of a sequential region we're * issuing advice for, cancel that until the next jump. The kernel @@ -1056,7 +1063,10 @@ read_stream_reset(ReadStream *stream, int flags) /* Start off like a newly initialized stream, unless asked not to. */ if ((flags & READ_STREAM_RESET_CONTINUE) == 0) + { + stream->blocks_since_last_io = 0; stream->distance = 1; + } stream->end_of_stream = false; } -- 2.39.5 (Apple Git-154)