From 7db3df60698da30bfec4b8cca76342dc6e779f81 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavuz81@gmail.com>
Date: Sat, 29 Mar 2025 20:17:42 +0300
Subject: [PATCH v8 1/3] Optimize autoprewarm with read streams

We've measured 10% performance improvement, and this arranges to benefit
automatically from future optimizations to the read_stream subsystem.
---
 contrib/pg_prewarm/autoprewarm.c | 201 +++++++++++++++++++++++--------
 1 file changed, 148 insertions(+), 53 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..7154303f0ba 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -421,6 +422,93 @@ apw_load_buffers(void)
 				(errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
 						apw_state->prewarmed_blocks, num_elements)));
 }
+struct apw_read_stream_private
+{
+	BlockInfoRecord *block_info;
+	BlockNumber nblocks;
+	BlockNumber max_pos;
+	BlockNumber pos;
+	Oid			cur_database;
+	ForkNumber	cur_forknum;
+	RelFileNumber cur_filenumber;
+};
+
+static BlockNumber
+awp_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	struct apw_read_stream_private *p = callback_private_data;
+	BlockInfoRecord cur_blk = p->block_info[p->pos];
+	BlockNumber blocknum = InvalidBlockNumber;
+
+	if (have_free_buffer() &&
+		p->pos < p->max_pos &&
+		cur_blk.blocknum < p->nblocks &&
+		cur_blk.database == p->cur_database &&
+		cur_blk.filenumber == p->cur_filenumber &&
+		cur_blk.forknum == p->cur_forknum)
+	{
+		blocknum = cur_blk.blocknum;
+	}
+
+	(p->pos)++;
+	return blocknum;
+}
+
+/*
+ * Helper function to prewarm buffers in a relation by using read streams.
+ */
+static unsigned int
+autoprewarm_prewarm_relation(Relation rel,
+							 BlockNumber pos,
+							 BlockNumber max_pos,
+							 BlockNumber nblocks_in_fork,
+							 BlockInfoRecord *block_info)
+{
+	struct apw_read_stream_private p;
+	ReadStream *stream;
+	unsigned int blocks_done = 0;
+	BlockInfoRecord first_block = block_info[pos];
+
+	p.pos = pos;
+	p.max_pos = max_pos;
+	p.block_info = block_info;
+	p.nblocks = nblocks_in_fork;
+	p.cur_database = first_block.database;
+	p.cur_forknum = first_block.forknum;
+	p.cur_filenumber = first_block.filenumber;
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										first_block.forknum,
+										awp_read_stream_next_block,
+										&p,
+										0);
+
+	while (true)
+	{
+		Buffer		buf;
+
+		CHECK_FOR_INTERRUPTS();
+
+		buf = read_stream_next_buffer(stream, NULL);
+		if (BufferIsValid(buf))
+		{
+			ReleaseBuffer(buf);
+			++blocks_done;
+		}
+		else
+		{
+			Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+			read_stream_end(stream);
+			break;
+		}
+	}
+
+	return blocks_done;
+}
 
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
@@ -432,9 +520,9 @@ autoprewarm_database_main(Datum main_arg)
 	int			pos;
 	BlockInfoRecord *block_info;
 	Relation	rel = NULL;
-	BlockNumber nblocks = 0;
-	BlockInfoRecord *old_blk = NULL;
 	dsm_segment *seg;
+	Oid			cur_database;
+	BlockNumber nblocks_in_fork;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -451,32 +539,34 @@ autoprewarm_database_main(Datum main_arg)
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
 	pos = apw_state->prewarm_start_idx;
 
+	cur_database = block_info[pos].database;
+
 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
 	 * buffers.
 	 */
 	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
-		Buffer		buf;
-
-		CHECK_FOR_INTERRUPTS();
+		BlockInfoRecord *blk = &block_info[pos];
 
 		/*
 		 * Quit if we've reached records for another database. If previous
 		 * blocks are of some global objects, then continue pre-warming.
 		 */
-		if (old_blk != NULL && old_blk->database != blk->database &&
-			old_blk->database != 0)
-			break;
+		if (cur_database != blk->database)
+		{
+			if (cur_database == 0)
+				cur_database = blk->database;
+			else
+				break;
+		}
 
 		/*
-		 * As soon as we encounter a block of a new relation, close the old
-		 * relation. Note that rel will be NULL if try_relation_open failed
-		 * previously; in that case, there is nothing to close.
+		 * Close the old relation. Note that rel will be NULL if
+		 * try_relation_open failed previously; in that case, there is nothing
+		 * to close.
 		 */
-		if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
-			rel != NULL)
+		if (rel)
 		{
 			relation_close(rel, AccessShareLock);
 			rel = NULL;
@@ -487,60 +577,65 @@ autoprewarm_database_main(Datum main_arg)
 		 * Try to open each new relation, but only once, when we first
 		 * encounter it. If it's been dropped, skip the associated blocks.
 		 */
-		if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
+		if (!rel)
 		{
 			Oid			reloid;
 
-			Assert(rel == NULL);
 			StartTransactionCommand();
 			reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
 			if (OidIsValid(reloid))
 				rel = try_relation_open(reloid, AccessShareLock);
 
 			if (!rel)
+			{
 				CommitTransactionCommand();
-		}
-		if (!rel)
-		{
-			old_blk = blk;
-			continue;
-		}
 
-		/* Once per fork, check for fork existence and size. */
-		if (old_blk == NULL ||
-			old_blk->filenumber != blk->filenumber ||
-			old_blk->forknum != blk->forknum)
-		{
-			/*
-			 * smgrexists is not safe for illegal forknum, hence check whether
-			 * the passed forknum is valid before using it in smgrexists.
-			 */
-			if (blk->forknum > InvalidForkNumber &&
-				blk->forknum <= MAX_FORKNUM &&
-				smgrexists(RelationGetSmgr(rel), blk->forknum))
-				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
-			else
-				nblocks = 0;
-		}
+				/* Move to next filenumber. */
+				while (true)
+				{
+					BlockInfoRecord cur_blk = block_info[pos++];
 
-		/* Check whether blocknum is valid and within fork file size. */
-		if (blk->blocknum >= nblocks)
-		{
-			/* Move to next forknum. */
-			old_blk = blk;
-			continue;
-		}
+					if (cur_blk.database != blk->database ||
+						cur_blk.filenumber != blk->filenumber)
+						break;
+				}
 
-		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
-		if (BufferIsValid(buf))
-		{
-			apw_state->prewarmed_blocks++;
-			ReleaseBuffer(buf);
+				continue;
+			}
+			else if (smgrexists(RelationGetSmgr(rel), blk->forknum))
+			{
+				unsigned int nblocks_processed;
+
+				nblocks_in_fork = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
+				nblocks_processed = autoprewarm_prewarm_relation(rel,
+																 pos,
+																 stop_idx,
+																 nblocks_in_fork,
+																 block_info);
+
+				apw_state->prewarmed_blocks += nblocks_processed;
+
+				/* Move pos forward by at least one. */
+				pos += Max(nblocks_processed, 1);
+
+				/* Move to next forknum. */
+				while (true)
+				{
+					BlockInfoRecord cur_blk = block_info[pos];
+
+					if (cur_blk.database == blk->database &&
+						cur_blk.filenumber == blk->filenumber &&
+						cur_blk.forknum == blk->forknum)
+						pos++;
+					else
+						break;
+				}
+
+				continue;
+			}
 		}
 
-		old_blk = blk;
+		pos++;
 	}
 
 	dsm_detach(seg);
-- 
2.43.0

