On Tue, Apr 1, 2025 at 7:21 AM Nazir Bilal Yavuz <byavu...@gmail.com> wrote:
>
> On Tue, 1 Apr 2025 at 05:14, Melanie Plageman <melanieplage...@gmail.com> 
> wrote:
> >
> +1 for using the functions. I think it is hard to follow / maintain
> this with the do-while loops and goto statements.

I'll take a look at your downthread proposal in a bit.

But the attached patch is a new version of what I proposed with the
functions. It's still not totally correct, but I wanted to see what
you thought.

> > But the explicit looping for skipping the bad blocks and the nested
> > loops for rel and fork -- I think these are less error prone.
>
> One question in my mind is, the outermost loop stops when the database
> changes, we do not check if it is changed from the database oid = 0.
> Handling this might require some structural changes.

I don't understand why each database has global objects at the
beginning. If there are global objects, they are global to all
databases, so surely the sort function would have put them all at the
beginning? One problem is we need a database connection to prewarm
these, but if the global objects are all at the beginning, then maybe
we can handle those with a special case and not force ourselves to
check for them when trying to load blocks from every database.

- Melanie
From f1c98580825acc63538372e9f67f6ed2bf68b540 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Mon, 31 Mar 2025 22:02:25 -0400
Subject: [PATCH] pgsr autoprewarm

---
 contrib/pg_prewarm/autoprewarm.c | 244 ++++++++++++++++++++-----------
 1 file changed, 160 insertions(+), 84 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..fd3fb228715 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -421,6 +422,123 @@ apw_load_buffers(void)
				(errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
						apw_state->prewarmed_blocks, num_elements)));
 }
+
+/*
+ * Private state for the read stream callback used to prewarm the blocks of a
+ * single relation fork.
+ *
+ * "pos" is the index in block_info of the next entry the callback will
+ * examine. When the stream ends, it points at the first entry that was not
+ * consumed: the first entry of the next fork, relation or database, or
+ * prewarm_stop_idx if everything has been processed (or we ran out of free
+ * buffers).
+ */
+struct apw_read_stream_private
+{
+	BlockInfoRecord *block_info;
+	int			pos;
+	Oid			database;
+	RelFileNumber filenumber;
+	ForkNumber	forknum;
+	BlockNumber nblocks;
+};
+
+/*
+ * Read stream callback: return the next block of the current fork to
+ * prewarm, or InvalidBlockNumber once the fork's entries are exhausted.
+ *
+ * Entries whose block number is not below the fork size recorded in
+ * p->nblocks are skipped. An entry belonging to a different database,
+ * relation or fork ends the stream and is left unconsumed (p->pos still
+ * points at it), so the caller can resume from it. If we run out of free
+ * buffers, p->pos is set to prewarm_stop_idx so the caller stops prewarming.
+ */
+static BlockNumber
+awp_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	struct apw_read_stream_private *p = callback_private_data;
+
+	for (; p->pos < apw_state->prewarm_stop_idx; p->pos++)
+	{
+		BlockInfoRecord *cur_blk = &p->block_info[p->pos];
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (!have_free_buffer())
+		{
+			p->pos = apw_state->prewarm_stop_idx;
+			return InvalidBlockNumber;
+		}
+
+		/*
+		 * Stop at the first entry of another database, relation or fork
+		 * without consuming it, so that the caller restarts from p->pos.
+		 */
+		if (cur_blk->database != p->database ||
+			cur_blk->filenumber != p->filenumber ||
+			cur_blk->forknum != p->forknum)
+			return InvalidBlockNumber;
+
+		/* Skip blocks beyond the end of the fork. */
+		if (cur_blk->blocknum >= p->nblocks)
+			continue;
+
+		/* Consume this entry before handing its block to the stream. */
+		p->pos++;
+		return cur_blk->blocknum;
+	}
+
+	return InvalidBlockNumber;
+}
+
+/*
+ * Return the index of the first entry in block_info[start, stop) that does
+ * not belong to the given database, relation and fork, or stop if there is
+ * none. Used to skip the remaining entries of a fork that cannot be read.
+ */
+static int
+next_fork_idx(BlockInfoRecord *block_info, int start, int stop,
+			  Oid database, RelFileNumber filenumber, ForkNumber forknum)
+{
+	Assert(block_info[start].forknum == forknum);
+
+	for (int i = start; i < stop; i++)
+	{
+		BlockInfoRecord *blk = &block_info[i];
+
+		if (blk->database != database ||
+			blk->filenumber != filenumber ||
+			blk->forknum != forknum)
+			return i;
+	}
+
+	return stop;
+}
+
+/*
+ * Return the index of the first entry in block_info[start, stop) that does
+ * not belong to the given database and relation, or stop if there is none.
+ * Used to skip the remaining entries of a relation that cannot be opened.
+ */
+static int
+next_rel_idx(BlockInfoRecord *block_info, int start, int stop,
+			 Oid database, RelFileNumber filenumber)
+{
+	Assert(block_info[start].filenumber == filenumber);
+
+	for (int i = start; i < stop; i++)
+	{
+		BlockInfoRecord *blk = &block_info[i];
+
+		if (blk->database != database ||
+			blk->filenumber != filenumber)
+			return i;
+	}
+
+	return stop;
+}
 
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
@@ -429,12 +520,11 @@ apw_load_buffers(void)
 void
 autoprewarm_database_main(Datum main_arg)
 {
-	int			pos;
 	BlockInfoRecord *block_info;
-	Relation	rel = NULL;
-	BlockNumber nblocks = 0;
-	BlockInfoRecord *old_blk = NULL;
+	int			i;
+	BlockInfoRecord *blk;
 	dsm_segment *seg;
+	Oid			database;

 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -449,108 +539,118 @@ autoprewarm_database_main(Datum main_arg)
				 errmsg("could not map dynamic shared memory segment")));
 	BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
-	pos = apw_state->prewarm_start_idx;

-	/*
-	 * Loop until we run out of blocks to prewarm or until we run out of free
-	 * buffers.
-	 */
-	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
-	{
-		BlockInfoRecord *blk = &block_info[pos++];
-		Buffer		buf;
+	/*
+	 * Walk the entries in block_info[prewarm_start_idx, prewarm_stop_idx),
+	 * one relation at a time and, within each relation, one fork at a time.
+	 */
+	i = apw_state->prewarm_start_idx;
+	blk = &block_info[i];
+	database = blk->database;

-		CHECK_FOR_INTERRUPTS();
+	while (i < apw_state->prewarm_stop_idx)
+	{
+		RelFileNumber filenumber = blk->filenumber;
+		Oid			reloid;
+		Relation	rel;

-		/*
-		 * Quit if we've reached records for another database. If previous
-		 * blocks are of some global objects, then continue pre-warming.
-		 */
-		if (old_blk != NULL && old_blk->database != blk->database &&
-			old_blk->database != 0)
-			break;
+		/*
+		 * Quit if we've reached records for another database. Records for
+		 * global objects (database InvalidOid) may come before those of this
+		 * database; in that case, switch to this database and keep going.
+		 */
+		if (blk->database != database)
+		{
+			if (database != InvalidOid)
+				break;
+			database = blk->database;
+		}
+
+		StartTransactionCommand();

-		/*
-		 * As soon as we encounter a block of a new relation, close the old
-		 * relation. Note that rel will be NULL if try_relation_open failed
-		 * previously; in that case, there is nothing to close.
-		 */
-		if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
-			rel != NULL)
+		/*
+		 * Try to open the relation. If it has been dropped, skip all of its
+		 * blocks.
+		 */
+		reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
+		if (!OidIsValid(reloid) ||
+			(rel = try_relation_open(reloid, AccessShareLock)) == NULL)
		{
-			relation_close(rel, AccessShareLock);
-			rel = NULL;
			CommitTransactionCommand();
-		}
-
-		/*
-		 * Try to open each new relation, but only once, when we first
-		 * encounter it. If it's been dropped, skip the associated blocks.
-		 */
-		if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
-		{
-			Oid			reloid;
-
-			Assert(rel == NULL);
-			StartTransactionCommand();
-			reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
-			if (OidIsValid(reloid))
-				rel = try_relation_open(reloid, AccessShareLock);
-
-			if (!rel)
-				CommitTransactionCommand();
-		}
-		if (!rel)
-		{
-			old_blk = blk;
+			i = next_rel_idx(block_info, i, apw_state->prewarm_stop_idx,
+							 blk->database, blk->filenumber);
+			blk = &block_info[i];
			continue;
		}

-		/* Once per fork, check for fork existence and size. */
-		if (old_blk == NULL ||
-			old_blk->filenumber != blk->filenumber ||
-			old_blk->forknum != blk->forknum)
+		/* Prewarm each fork of the relation in turn. */
+		while (i < apw_state->prewarm_stop_idx &&
+			   blk->database == database &&
+			   blk->filenumber == filenumber)
		{
+			ForkNumber	forknum = blk->forknum;
+			BlockNumber nblocks;
+			struct apw_read_stream_private p;
+			ReadStream *stream;
+			Buffer		buf;
+
			/*
			 * smgrexists is not safe for illegal forknum, hence check whether
			 * the passed forknum is valid before using it in smgrexists.
			 */
-			if (blk->forknum > InvalidForkNumber &&
-				blk->forknum <= MAX_FORKNUM &&
-				smgrexists(RelationGetSmgr(rel), blk->forknum))
-				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
-			else
-				nblocks = 0;
-		}
+			if (blk->forknum <= InvalidForkNumber ||
+				blk->forknum > MAX_FORKNUM ||
+				!smgrexists(RelationGetSmgr(rel), blk->forknum))
+			{
+				i = next_fork_idx(block_info, i, apw_state->prewarm_stop_idx,
+								  blk->database, blk->filenumber, blk->forknum);
+				blk = &block_info[i];
+				continue;
+			}

-		/* Check whether blocknum is valid and within fork file size. */
-		if (blk->blocknum >= nblocks)
-		{
-			/* Move to next forknum. */
-			old_blk = blk;
-			continue;
-		}
+			nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);

-		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
-		if (BufferIsValid(buf))
-		{
-			apw_state->prewarmed_blocks++;
-			ReleaseBuffer(buf);
-		}
+			/*
+			 * Read the fork's blocks through a read stream. The callback hands
+			 * out blocks starting at block_info[i], advancing p.pos as it
+			 * goes; once the stream is exhausted, resume from p.pos.
+			 */
+			p = (struct apw_read_stream_private)
+			{
+				.block_info = block_info,
+				.pos = i,
+				.database = database,
+				.filenumber = filenumber,
+				.forknum = forknum,
+				.nblocks = nblocks,
+			};
+
+			stream = read_stream_begin_relation(READ_STREAM_FULL,
+												NULL,
+												rel,
+												p.forknum,
+												awp_read_stream_next_block,
+												&p,
+												0);
+
+			while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+			{
+				apw_state->prewarmed_blocks++;
+				ReleaseBuffer(buf);
+			}

-		old_blk = blk;
-	}
+			read_stream_end(stream);

-	dsm_detach(seg);
+			i = p.pos;
+			blk = &block_info[i];
+		}

-	/* Release lock on previous relation. */
-	if (rel)
-	{
+		/* Done with this relation: release its lock and commit. */
		relation_close(rel, AccessShareLock);
		CommitTransactionCommand();
	}
+
+	dsm_detach(seg);
 }
 
 /*
-- 
2.34.1

Reply via email to