From 948caab48ac94703fd07ed3ea497b1d63f7e9476 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 27 Feb 2024 00:01:42 +1300
Subject: [PATCH v13 2/4] Provide API for streaming relation data.

Introduce an abstraction where relation data can be accessed as a
stream of buffers, with an implementation that is more efficient than
the equivalent sequence of ReadBuffer() calls.

Client code supplies a callback that can say which block number is
wanted next, and then consumes individual buffers one at a time from the
stream.  This division allows read_stream.c to build up large calls to
StartReadBuffers() up to io_combine_limit, and issue posix_fadvise()
advice ahead of time in a systematic way when random access is detected.

This API is based on an idea from Andres Freund to pave the way for
asynchronous I/O in future work as required to support direct I/O.  The
goal is to have an abstraction that insulates client code from future
changes to the I/O subsystem that would benefit from information about
future needs.

An extended API may be necessary in future for more complicated cases
(for example recovery, whose LsnReadQueue device in xlogprefetcher.c is
a distant cousin of this code and should eventually be replaced by
this), but this basic API is sufficient for many common usage patterns
involving predictable access to a single relation fork.

Author: Thomas Munro <thomas.munro@gmail.com>
Author: Heikki Linnakangas <hlinnaka@iki.fi> (contributions)
Author: Melanie Plageman <melanieplageman@gmail.com> (contributions)
Suggested-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com
---
 src/backend/storage/Makefile          |   2 +-
 src/backend/storage/aio/Makefile      |  14 +
 src/backend/storage/aio/meson.build   |   5 +
 src/backend/storage/aio/read_stream.c | 750 ++++++++++++++++++++++++++
 src/backend/storage/meson.build       |   1 +
 src/include/storage/read_stream.h     |  63 +++
 src/tools/pgindent/typedefs.list      |   2 +
 7 files changed, 836 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/read_stream.c
 create mode 100644 src/include/storage/read_stream.h

diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..eec03f6f2b 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 0000000000..2f29a9ec4d
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	read_stream.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 0000000000..10e1aa3b20
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'read_stream.c',
+)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
new file mode 100644
index 0000000000..6b4b97372c
--- /dev/null
+++ b/src/backend/storage/aio/read_stream.c
@@ -0,0 +1,750 @@
+/*-------------------------------------------------------------------------
+ *
+ * read_stream.c
+ *	  Mechanism for accessing buffered relation data with look-ahead
+ *
+ * Code that needs to access relation data typically pins blocks one at a
+ * time, often in a predictable order that might be sequential or data-driven.
+ * Calling the simple ReadBuffer() function for each block is inefficient,
+ * because blocks that are not yet in the buffer pool require I/O operations
+ * that are small and might stall waiting for storage.  This mechanism looks
+ * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
+ * neighboring blocks together and ahead of time, with an adaptive look-ahead
+ * distance.
+ *
+ * A user-provided callback generates a stream of block numbers that is used
+ * to form reads of up to io_combine_limit, by attempting to merge them with a
+ * pending read.  When that isn't possible, the existing pending read is sent
+ * to StartReadBuffers() so that a new one can begin to form.
+ *
+ * The algorithm for controlling the look-ahead distance tries to classify the
+ * stream into three ideal behaviors:
+ *
+ * A) No I/O is necessary, because the requested blocks are fully cached
+ * already.  There is no benefit to looking ahead more than one block, so
+ * distance is 1.  This is the default initial assumption.
+ *
+ * B) I/O is necessary, but fadvise is undesirable because the access is
+ * sequential, or impossible because direct I/O is enabled or the system
+ * doesn't support advice.  There is no benefit in looking ahead more than
+ * io_combine_limit, because in this case only goal is larger read system
+ * calls.  Looking further ahead would pin many buffers and perform
+ * speculative work looking ahead for no benefit.
+ *
+ * C) I/O is necesssary, it appears random, and this system supports fadvise.
+ * We'll look further ahead in order to reach the configured level of I/O
+ * concurrency.
+ *
+ * The distance increases rapidly and decays slowly, so that it moves towards
+ * those levels as different I/O patterns are discovered.  For example, a
+ * sequential scan of fully cached data doesn't bother looking ahead, but a
+ * sequential scan that hits a region of uncached blocks will start issuing
+ * increasingly wide read calls until it plateaus at io_combine_limit.
+ *
+ * The main data structure is a circular queue of buffers of size
+ * max_pinned_buffers plus some extra space for technical reasons, ready to be
+ * returned by read_stream_next_buffer().  Each buffer also has an optional
+ * variable sized object that is passed from the callback to the consumer of
+ * buffers.
+ *
+ * Parallel to the queue of buffers, there is a circular queue of in-progress
+ * I/Os that have been started with StartReadBuffers(), and for which
+ * WaitReadBuffers() must be called before returning the buffer.
+ *
+ * For example, if the callback return block numbers 10, 42, 43, 60 in
+ * successive calls, then these data structures might appear as follows:
+ *
+ *                          buffers buf/data       ios
+ *
+ *                          +----+  +-----+       +--------+
+ *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
+ *                          +----+  +-----+  |    +--------+
+ *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
+ *                          +----+  +-----+  | |  +--------+
+ *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
+ *                          +----+  +-----+    |  +--------+
+ *                          | 43 |  |  ?  |    |  |        |
+ *                          +----+  +-----+    |  +--------+
+ *                          | 44 |  |  ?  |    |  |        |
+ *                          +----+  +-----+    |  +--------+
+ *                          | 60 |  |  ?  |<---+
+ *                          +----+  +-----+
+ *     next_buffer_index -> |    |  |     |
+ *                          +----+  +-----+
+ *
+ * In the example, 5 buffers are pinned, and the next buffer to be streamed to
+ * the client is block 10.  Block 10 was a hit and has no associated I/O, but
+ * the range 42..44 requires an I/O wait before its buffers are returned, as
+ * does block 60.
+ *
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/aio/read_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_tablespace.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+#include "storage/read_stream.h"
+#include "utils/memdebug.h"
+#include "utils/rel.h"
+#include "utils/spccache.h"
+
+typedef struct InProgressIO
+{
+	int16		buffer_index;
+	ReadBuffersOperation op;
+} InProgressIO;
+
+/*
+ * State for managing a stream of reads.
+ */
+struct ReadStream
+{
+	int16		max_ios;
+	int16		ios_in_progress;
+	int16		queue_size;
+	int16		max_pinned_buffers;
+	int16		pinned_buffers;
+	int16		distance;
+	bool		advice_enabled;
+
+	/*
+	 * Sometimes we need to be able to 'unget' a block number to resolve a
+	 * flow control problem when I/Os are split.
+	 */
+	BlockNumber unget_blocknum;
+	bool		have_unget_blocknum;
+
+	/*
+	 * The callback that will tell us which block numbers to read, and an
+	 * opaque pointer that will be pass to it for its own purposes.
+	 */
+	ReadStreamBlockNumberCB callback;
+	void	   *callback_private_data;
+
+	/* Next expected block, for detecting sequential access. */
+	BlockNumber seq_blocknum;
+
+	/* The read operation we are currently preparing. */
+	BlockNumber pending_read_blocknum;
+	int16		pending_read_nblocks;
+
+	/* Space for buffers and optional per-buffer private data. */
+	size_t		per_buffer_data_size;
+	void	   *per_buffer_data;
+
+	/* Read operations that have been started but not waited for yet. */
+	InProgressIO *ios;
+	int16		oldest_io_index;
+	int16		next_io_index;
+
+	/* Circular queue of buffers. */
+	int16		oldest_buffer_index;	/* Next pinned buffer to return */
+	int16		next_buffer_index;	/* Index of next buffer to pin */
+	Buffer		buffers[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * Return a pointer to the per-buffer data by index.
+ */
+static inline void *
+get_per_buffer_data(ReadStream *stream, int16 buffer_index)
+{
+	return (char *) stream->per_buffer_data +
+		stream->per_buffer_data_size * buffer_index;
+}
+
+/*
+ * Ask the callback which block it would like us to read next, with a small
+ * buffer in front to allow read_stream_unget_block() to work.
+ */
+static inline BlockNumber
+read_stream_get_block(ReadStream *stream, void *per_buffer_data)
+{
+	if (!stream->have_unget_blocknum)
+		return stream->callback(stream,
+								stream->callback_private_data,
+								per_buffer_data);
+
+	/*
+	 * You can only unget one block, and next_buffer_index can't change across
+	 * a get, unget, get sequence, so the callback's per_buffer_data, if any,
+	 * is still present in the correct slot.  We just have to return the
+	 * previous block number.
+	 */
+	stream->have_unget_blocknum = false;
+	return stream->unget_blocknum;
+}
+
+/*
+ * In order to deal with short reads in StartReadBuffers(), we sometimes need
+ * to defer handling of a block until later.
+ */
+static inline void
+read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
+{
+	Assert(!stream->have_unget_blocknum);
+	stream->have_unget_blocknum = true;
+	stream->unget_blocknum = blocknum;
+}
+
+static void
+read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
+{
+	bool		need_wait;
+	int			nblocks;
+	int			flags;
+	int16		io_index;
+	int16		overflow;
+	int16		buffer_index;
+
+	/* This should only be called with a pending read. */
+	Assert(stream->pending_read_nblocks > 0);
+	Assert(stream->pending_read_nblocks <= io_combine_limit);
+
+	/* We had better not exceed the pin limit by starting this read. */
+	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
+		   stream->max_pinned_buffers);
+
+	/* We had better not be overwriting an existing pinned buffer. */
+	if (stream->pinned_buffers > 0)
+		Assert(stream->next_buffer_index != stream->oldest_buffer_index);
+	else
+		Assert(stream->next_buffer_index == stream->oldest_buffer_index);
+
+	/*
+	 * If advice hasn't been suppressed, this system supports it, and this
+	 * isn't a strictly sequential pattern, then we'll issue advice.
+	 */
+	if (!suppress_advice &&
+		stream->advice_enabled &&
+		stream->pending_read_blocknum != stream->seq_blocknum)
+		flags = READ_BUFFERS_ISSUE_ADVICE;
+	else
+		flags = 0;
+
+	/* We say how many blocks we want to read, but may be smaller on return. */
+	buffer_index = stream->next_buffer_index;
+	io_index = stream->next_io_index;
+	nblocks = stream->pending_read_nblocks;
+	need_wait = StartReadBuffers(&stream->ios[io_index].op,
+								 &stream->buffers[buffer_index],
+								 stream->pending_read_blocknum,
+								 &nblocks,
+								 flags);
+	stream->pinned_buffers += nblocks;
+
+	/* Remember whether we need to wait before returning this buffer. */
+	if (!need_wait)
+	{
+		/* Look-ahead distance decays, no I/O necessary (behavior A). */
+		if (stream->distance > 1)
+			stream->distance--;
+	}
+	else
+	{
+		/*
+		 * Remember to call WaitReadBuffers() before returning head buffer.
+		 * Look-ahead distance will be adjusted after waiting.
+		 */
+		stream->ios[io_index].buffer_index = buffer_index;
+		if (++stream->next_io_index == stream->max_ios)
+			stream->next_io_index = 0;
+		Assert(stream->ios_in_progress < stream->max_ios);
+		stream->ios_in_progress++;
+		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
+	}
+
+	/*
+	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
+	 * we want it to wrap around at queue_size.  Slide overflowing buffers to
+	 * the front of the array.
+	 */
+	overflow = (buffer_index + nblocks) - stream->queue_size;
+	if (overflow > 0)
+		memmove(&stream->buffers[0],
+				&stream->buffers[stream->queue_size],
+				sizeof(stream->buffers[0]) * overflow);
+
+	/* Compute location of start of next read, without using % operator. */
+	buffer_index += nblocks;
+	if (buffer_index >= stream->queue_size)
+		buffer_index -= stream->queue_size;
+	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+	stream->next_buffer_index = buffer_index;
+
+	/* Adjust the pending read to cover the remaining portion, if any. */
+	stream->pending_read_blocknum += nblocks;
+	stream->pending_read_nblocks -= nblocks;
+}
+
+static void
+read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
+{
+	while (stream->ios_in_progress < stream->max_ios &&
+		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+	{
+		BlockNumber blocknum;
+		int16		buffer_index;
+		void	   *per_buffer_data;
+
+		if (stream->pending_read_nblocks == io_combine_limit)
+		{
+			read_stream_start_pending_read(stream, suppress_advice);
+			suppress_advice = false;
+			continue;
+		}
+
+		/*
+		 * See which block the callback wants next in the stream.  We need to
+		 * compute the index of the Nth block of the pending read including
+		 * wrap-around, but we don't want to use the expensive % operator.
+		 */
+		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
+		if (buffer_index >= stream->queue_size)
+			buffer_index -= stream->queue_size;
+		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+		per_buffer_data = get_per_buffer_data(stream, buffer_index);
+		blocknum = read_stream_get_block(stream, per_buffer_data);
+		if (blocknum == InvalidBlockNumber)
+		{
+			/* End of stream. */
+			stream->distance = 0;
+			break;
+		}
+
+		/* Can we merge it with the pending read? */
+		if (stream->pending_read_nblocks > 0 &&
+			stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
+		{
+			stream->pending_read_nblocks++;
+			continue;
+		}
+
+		/* We have to start the pending read before we can build another. */
+		if (stream->pending_read_nblocks > 0)
+		{
+			read_stream_start_pending_read(stream, suppress_advice);
+			suppress_advice = false;
+			if (stream->ios_in_progress == stream->max_ios)
+			{
+				/* And we've hit the limit.  Rewind, and stop here. */
+				read_stream_unget_block(stream, blocknum);
+				return;
+			}
+		}
+
+		/* This is the start of a new pending read. */
+		stream->pending_read_blocknum = blocknum;
+		stream->pending_read_nblocks = 1;
+	}
+
+	/*
+	 * We don't start the pending read just because we've hit the distance
+	 * limit, preferring to give it another chance to grow to full
+	 * io_combine_limit size once more buffers have been consumed.  However,
+	 * if we've already reached io_combine_limit, or we've reached the
+	 * distance limit and there isn't anything pinned yet, or the callback has
+	 * signaled end-of-stream, we start the read immediately.
+	 */
+	if (stream->pending_read_nblocks > 0 &&
+		(stream->pending_read_nblocks == io_combine_limit ||
+		 (stream->pending_read_nblocks == stream->distance &&
+		  stream->pinned_buffers == 0) ||
+		 stream->distance == 0) &&
+		stream->ios_in_progress < stream->max_ios)
+		read_stream_start_pending_read(stream, suppress_advice);
+}
+
+/*
+ * Create a new read stream object that can be used to perform the equivalent
+ * of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead.  The callback should return block numbers or InvalidBlockNumber to
+ * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
+ * write extra data for each block into the space provided to it.  It will
+ * also receive callback_private_data for its own purposes.
+ */
+ReadStream *
+read_stream_begin_relation(int flags,
+						   BufferAccessStrategy strategy,
+						   BufferManagerRelation bmr,
+						   ForkNumber forknum,
+						   ReadStreamBlockNumberCB callback,
+						   void *callback_private_data,
+						   size_t per_buffer_data_size)
+{
+	ReadStream *stream;
+	size_t		size;
+	int16		queue_size;
+	int16		max_ios;
+	uint32		max_pinned_buffers;
+	Oid			tablespace_id;
+
+	/* Make sure our bmr's smgr and persistent are populated. */
+	if (bmr.smgr == NULL)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+
+	/*
+	 * Decide how many I/Os we will allow to run at the same time.  That
+	 * currently means advice to the kernel to tell it that we will soon read.
+	 * This number also affects how far we look ahead for opportunities to
+	 * start more I/Os.
+	 */
+	tablespace_id = bmr.smgr->smgr_rlocator.locator.spcOid;
+	if (!OidIsValid(MyDatabaseId) ||
+		(bmr.rel && IsCatalogRelation(bmr.rel)) ||
+		IsCatalogRelationOid(bmr.smgr->smgr_rlocator.locator.relNumber))
+	{
+		/*
+		 * Avoid circularity while trying to look up tablespace settings or
+		 * before spccache.c is ready.
+		 */
+		max_ios = effective_io_concurrency;
+	}
+	else if (flags & READ_STREAM_MAINTENANCE)
+		max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
+	else
+		max_ios = get_tablespace_io_concurrency(tablespace_id);
+	max_ios = Min(max_ios, PG_INT16_MAX);
+
+	/*
+	 * Choose the maximum number of buffers we're prepared to pin.  We try to
+	 * pin fewer if we can, though.  We clamp it to at least io_combine_limit
+	 * so that we can have a chance to build up a full io_combine_limit sized
+	 * read, even when max_ios is zero.  Be careful not to allow int16 to
+	 * overflow (even though that's not possible with the current GUC range
+	 * limits), allowing also for the spare entry and the overflow space.
+	 */
+	max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
+	max_pinned_buffers = Min(max_pinned_buffers,
+							 PG_INT16_MAX - io_combine_limit - 1);
+
+	/* Don't allow this backend to pin more than its share of buffers. */
+	if (SmgrIsTemp(bmr.smgr))
+		LimitAdditionalLocalPins(&max_pinned_buffers);
+	else
+		LimitAdditionalPins(&max_pinned_buffers);
+	Assert(max_pinned_buffers > 0);
+
+	/*
+	 * We need one extra entry for buffers and per-buffer data, because users
+	 * of per-buffer data have access to the object until the next call to
+	 * read_stream_next_buffer(), so we need a gap between the head and tail
+	 * of the queue so that we don't clobber it.
+	 */
+	queue_size = max_pinned_buffers + 1;
+
+	/*
+	 * Allocate the object, the buffers, the ios and per_data_data space in
+	 * one big chunk.  Though we have queue_size buffers, we want to be able
+	 * to assume that all the buffers for a single read are contiguous (i.e.
+	 * don't wrap around halfway through), so we allow temporary overflows of
+	 * up to the maximum possible read size by allocating an extra
+	 * io_combine_limit - 1 elements.
+	 */
+	size = offsetof(ReadStream, buffers);
+	size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
+	size += sizeof(InProgressIO) * Max(1, max_ios);
+	size += per_buffer_data_size * queue_size;
+	size += MAXIMUM_ALIGNOF * 2;
+	stream = (ReadStream *) palloc(size);
+	memset(stream, 0, offsetof(ReadStream, buffers));
+	stream->ios = (InProgressIO *)
+		MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
+	if (per_buffer_data_size > 0)
+		stream->per_buffer_data = (void *)
+			MAXALIGN(&stream->ios[Max(1, max_ios)]);
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * This system supports prefetching advice.  We can use it as long as
+	 * direct I/O isn't enabled, the caller hasn't promised sequential access
+	 * (overriding our detection heuristics), and max_ios hasn't been set to
+	 * zero.
+	 */
+	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+		(flags & READ_STREAM_SEQUENTIAL) == 0 &&
+		max_ios > 0)
+		stream->advice_enabled = true;
+#endif
+
+	/*
+	 * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
+	 * above.  If we had real asynchronous I/O we might need a slightly
+	 * different definition.
+	 */
+	if (max_ios == 0)
+		max_ios = 1;
+
+	stream->max_ios = max_ios;
+	stream->per_buffer_data_size = per_buffer_data_size;
+	stream->max_pinned_buffers = max_pinned_buffers;
+	stream->queue_size = queue_size;
+
+	if (!bmr.smgr)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+	stream->callback = callback;
+	stream->callback_private_data = callback_private_data;
+
+	/*
+	 * Skip the initial ramp-up phase if the caller says we're going to be
+	 * reading the whole relation.  This way we start out assuming we'll be
+	 * doing full io_combine_limit sized reads (behavior B).
+	 */
+	if (flags & READ_STREAM_FULL)
+		stream->distance = Min(max_pinned_buffers, io_combine_limit);
+	else
+		stream->distance = 1;
+
+	/*
+	 * Since we always currently always access the same relation, we can
+	 * initialize parts of the ReadBuffersOperation objects and leave them
+	 * that way, to avoid wasting CPU cycles writing to them for each read.
+	 */
+	for (int i = 0; i < max_ios; ++i)
+	{
+		stream->ios[i].op.bmr = bmr;
+		stream->ios[i].op.forknum = forknum;
+		stream->ios[i].op.strategy = strategy;
+	}
+
+	return stream;
+}
+
+/*
+ * Pull one pinned buffer out of a stream.  Each call returns successive
+ * blocks in the order specified by the callback.  If per_buffer_data_size was
+ * set to a non-zero size, *per_buffer_data receives a pointer to the extra
+ * per-buffer data that the callback had a chance to populate, which remains
+ * valid until the next call to read_stream_next_buffer().  When the stream
+ * runs out of data, InvalidBuffer is returned.  The caller may decide to end
+ * the stream early at any time by calling read_stream_end().
+ */
+Buffer
+read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
+{
+	Buffer		buffer;
+	int16		oldest_buffer_index;
+
+	/*
+	 * A fast path for all-cached scans (behavior A).  This is the same as the
+	 * usual algorithm, but it is specialized for no I/O and no per-buffer
+	 * data, so we can skip the queue management code, stay in the same buffer
+	 * slot and use singular StartReadBuffer().
+	 */
+	if (likely(per_buffer_data == NULL &&
+			   stream->ios_in_progress == 0 &&
+			   stream->pinned_buffers == 1 &&
+			   stream->distance == 1))
+	{
+		BlockNumber next_blocknum;
+
+		/*
+		 * We have a pinned buffer that we need to serve up, but we also want
+		 * to probe the next one before we return, just in case we need to
+		 * start an I/O.  We can re-use the same buffer slot, and an arbitrary
+		 * I/O slot since they're all free.
+		 */
+		oldest_buffer_index = stream->oldest_buffer_index;
+		Assert((oldest_buffer_index + 1) % stream->queue_size ==
+			   stream->next_buffer_index);
+		buffer = stream->buffers[oldest_buffer_index];
+		Assert(buffer != InvalidBuffer);
+		Assert(stream->pending_read_nblocks <= 1);
+		if (unlikely(stream->pending_read_nblocks == 1))
+		{
+			next_blocknum = stream->pending_read_blocknum;
+			stream->pending_read_nblocks = 0;
+		}
+		else
+			next_blocknum = read_stream_get_block(stream, NULL);
+		if (unlikely(next_blocknum == InvalidBlockNumber))
+		{
+			/* End of stream. */
+			stream->distance = 0;
+			stream->next_buffer_index = oldest_buffer_index;
+			/* Pin transferred to caller. */
+			stream->pinned_buffers = 0;
+			return buffer;
+		}
+		/* Call the special single block version, which is marginally faster. */
+		if (unlikely(StartReadBuffer(&stream->ios[0].op,
+									 &stream->buffers[oldest_buffer_index],
+									 next_blocknum,
+									 stream->advice_enabled ?
+									 READ_BUFFERS_ISSUE_ADVICE : 0)))
+		{
+			/* I/O needed.  We'll take the general path next time. */
+			stream->oldest_io_index = 0;
+			stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
+			stream->ios_in_progress = 1;
+			stream->ios[0].buffer_index = oldest_buffer_index;
+			stream->seq_blocknum = next_blocknum + 1;
+			/* Increase look ahead distance (move towards behavior B/C). */
+			stream->distance = Min(2, stream->max_pinned_buffers);
+		}
+		/* Pin transferred to caller, got another one, no net change. */
+		Assert(stream->pinned_buffers == 1);
+		return buffer;
+	}
+
+	if (stream->pinned_buffers == 0)
+	{
+		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
+
+		/* End of stream reached?  */
+		if (stream->distance == 0)
+			return InvalidBuffer;
+
+		/*
+		 * The usual order of operations is that we look ahead at the bottom
+		 * of this function after potentially finishing an I/O and making
+		 * space for more, but if we're just starting up we'll need to crank
+		 * the handle to get started.
+		 */
+		read_stream_look_ahead(stream, true);
+
+		/* End of stream reached? */
+		if (stream->pinned_buffers == 0)
+		{
+			Assert(stream->distance == 0);
+			return InvalidBuffer;
+		}
+	}
+
+	/* Grab the oldest pinned buffer and associated per-buffer data. */
+	Assert(stream->pinned_buffers > 0);
+	oldest_buffer_index = stream->oldest_buffer_index;
+	Assert(oldest_buffer_index >= 0 &&
+		   oldest_buffer_index < stream->queue_size);
+	buffer = stream->buffers[oldest_buffer_index];
+	if (per_buffer_data)
+		*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
+
+	Assert(BufferIsValid(buffer));
+
+	/* Do we have to wait for an associated I/O first? */
+	if (stream->ios_in_progress > 0 &&
+		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
+	{
+		int16		io_index = stream->oldest_io_index;
+		int16		distance;
+
+		/* Sanity check that we still agree on the buffers. */
+		Assert(stream->ios[io_index].op.buffers ==
+			   &stream->buffers[oldest_buffer_index]);
+
+		WaitReadBuffers(&stream->ios[io_index].op);
+
+		Assert(stream->ios_in_progress > 0);
+		stream->ios_in_progress--;
+		if (++stream->oldest_io_index == stream->max_ios)
+			stream->oldest_io_index = 0;
+
+		if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
+		{
+			/* Distance ramps up fast (behavior C). */
+			distance = stream->distance * 2;
+			distance = Min(distance, stream->max_pinned_buffers);
+			stream->distance = distance;
+		}
+		else
+		{
+			/* No advice; move towards io_combine_limit (behavior B). */
+			if (stream->distance > io_combine_limit)
+			{
+				stream->distance--;
+			}
+			else
+			{
+				distance = stream->distance * 2;
+				distance = Min(distance, io_combine_limit);
+				distance = Min(distance, stream->max_pinned_buffers);
+				stream->distance = distance;
+			}
+		}
+	}
+
+#ifdef CLOBBER_FREED_MEMORY
+	/* Clobber old buffer and per-buffer data for debugging purposes. */
+	stream->buffers[oldest_buffer_index] = InvalidBuffer;
+
+	/*
+	 * The caller will get access to the per-buffer data, until the next call.
+	 * We wipe the one before, which is never occupied because queue_size
+	 * allowed one extra element.  This will hopefully trip up client code
+	 * that is holding a dangling pointer to it.
+	 */
+	if (stream->per_buffer_data)
+		wipe_mem(get_per_buffer_data(stream,
+									 oldest_buffer_index == 0 ?
+									 stream->queue_size - 1 :
+									 oldest_buffer_index - 1),
+				 stream->per_buffer_data_size);
+#endif
+
+	/* Pin transferred to caller. */
+	Assert(stream->pinned_buffers > 0);
+	stream->pinned_buffers--;
+
+	/* Advance oldest buffer, with wrap-around. */
+	stream->oldest_buffer_index++;
+	if (stream->oldest_buffer_index == stream->queue_size)
+		stream->oldest_buffer_index = 0;
+
+	/* Prepare for the next call. */
+	read_stream_look_ahead(stream, false);
+
+	return buffer;
+}
+
+/*
+ * Reset a read stream by releasing any queued up buffers, allowing the stream
+ * to be used again for different blocks.  This can be used to clear an
+ * end-of-stream condition and start again, or to throw away blocks that were
+ * speculatively read and read some different blocks instead.
+ */
+void
+read_stream_reset(ReadStream *stream)
+{
+	Buffer		buffer;
+
+	/* Stop looking ahead. */
+	stream->distance = 0;
+
+	/* Unpin anything that wasn't consumed. */
+	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+		ReleaseBuffer(buffer);
+
+	Assert(stream->pinned_buffers == 0);
+	Assert(stream->ios_in_progress == 0);
+
+	/* Start off assuming data is cached. */
+	stream->distance = 1;
+}
+
+/*
+ * Release and free a read stream.
+ */
+void
+read_stream_end(ReadStream *stream)
+{
+	read_stream_reset(stream);
+	pfree(stream);
+}
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca2..739d13293f 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
new file mode 100644
index 0000000000..f5dbc087b0
--- /dev/null
+++ b/src/include/storage/read_stream.h
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * read_stream.h
+ *	  Mechanism for accessing buffered relation data with look-ahead
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/read_stream.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef READ_STREAM_H
+#define READ_STREAM_H
+
+#include "storage/bufmgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define READ_STREAM_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users, and thus should be governed by maintenance_io_concurrency
+ * instead of effective_io_concurrency.  For example, VACUUM or CREATE INDEX.
+ */
+#define READ_STREAM_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected.  Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define READ_STREAM_SEQUENTIAL 0x02
+
+/*
+ * We usually ramp up from smaller reads to larger ones, to support users who
+ * don't know if it's worth reading lots of buffers yet.  This flag disables
+ * that, declaring ahead of time that we'll be reading all available buffers.
+ */
+#define READ_STREAM_FULL 0x04
+
+struct ReadStream;
+typedef struct ReadStream ReadStream;
+
+/* Callback that returns the next block number to read. */
+typedef BlockNumber (*ReadStreamBlockNumberCB) (ReadStream *stream,
+												void *callback_private_data,
+												void *per_buffer_data);
+
+extern ReadStream *read_stream_begin_relation(int flags,
+											  BufferAccessStrategy strategy,
+											  BufferManagerRelation bmr,
+											  ForkNumber forknum,
+											  ReadStreamBlockNumberCB callback,
+											  void *callback_private_data,
+											  size_t per_buffer_data_size);
+extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_private);
+extern void read_stream_reset(ReadStream *stream);
+extern void read_stream_end(ReadStream *stream);
+
+#endif							/* READ_STREAM_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 97edd1388e..82fa6c1297 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1214,6 +1214,7 @@ InjectionPointCacheEntry
 InjectionPointEntry
 InjectionPointSharedState
 InlineCodeBlock
+InProgressIO
 InsertStmt
 Instrumentation
 Int128AggState
@@ -2292,6 +2293,7 @@ ReadExtraTocPtrType
 ReadFunc
 ReadLocalXLogPageNoWaitPrivate
 ReadReplicationSlotCmd
+ReadStream
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheArrayEntry
-- 
2.39.3 (Apple Git-146)

