On Mon, Mar 31, 2025 at 3:45 PM Melanie Plageman
<melanieplage...@gmail.com> wrote:
>
> Whoops, this isn't right. It does work. I'm going to draft a version
> suggesting slightly different variable naming and a couple comments to
> make this more clear.

Okay, so after further study, I think there are still multiple issues
with the code. We could end up comparing a block number to an nblocks
value calculated for a different fork. To address this, you'll need to
keep track of the last fork number. At that point, you kind of have to
bring back old_blk -- because that is what we are recreating with
multiple local variables.

But overall, I think what we actually want is to be really explicit
about fast-forwarding in the failure cases (when we want to skip ahead
because a relation or a fork is invalid). We were trying to use the
main loop control and just add special cases to allow this
fast-forwarding. Instead, I think we want to jump to a function or loop
and fast-forward through those bad blocks until we reach the next run
of blocks from a different relation or fork.

I've sketched out an idea like this in the attached patch. I don't think
it is 100% correct. It does pass tests, but I think we might incorrectly
advance pos twice after skipping a run of blocks belonging to a bad
fork or relation -- and thus skip the first good block after some bad
blocks.

It also needs some more refactoring.

Maybe, instead of having the skip code like this:
        skip_forknumber:;
            while ((blk = next_record(block_info, &i)) != NULL &&
blk->database == database && blk->filenumber == filenumber &&
blk->forknum == forknum);

we could make it a function, to avoid the back-to-back while-loop
conditions (because of the outer do-while loop)?

But the explicit looping for skipping the bad blocks and the nested
loops for rel and fork -- I think these are less error-prone.

What do you think?

- Melanie
From 438f13072af060b30485b0dd871c1b26ee503513 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Mon, 31 Mar 2025 22:02:25 -0400
Subject: [PATCH] pgsr autoprewarm

---
 contrib/pg_prewarm/autoprewarm.c | 218 +++++++++++++++++++------------
 1 file changed, 131 insertions(+), 87 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..459d36a75a9 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -421,6 +422,88 @@ apw_load_buffers(void)
 				(errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
 						apw_state->prewarmed_blocks, num_elements)));
 }
+
+/*
+ * State shared between autoprewarm_database_main() and the read stream
+ * callback apw_read_stream_next_block().  The callback consumes records
+ * starting at pos for as long as they belong to the given database,
+ * relation and fork.
+ */
+struct apw_read_stream_private
+{
+	BlockInfoRecord *block_info;
+	int			pos;			/* index of the next record to examine */
+	Oid			database;
+	RelFileNumber filenumber;
+	ForkNumber	forknum;
+	BlockNumber nblocks;		/* size of the fork when the stream began */
+};
+
+/*
+ * Read stream callback: return the next block to prewarm for the fork
+ * described by callback_private_data, or InvalidBlockNumber once the run of
+ * records for that fork is exhausted or no free buffers remain.
+ *
+ * The first record of the next run is not consumed, so when the stream ends
+ * p->pos indexes it and the caller can resume from there.  If we run out of
+ * free buffers, p->pos is set to prewarm_stop_idx so the caller stops too.
+ */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	struct apw_read_stream_private *p = callback_private_data;
+
+	while (p->pos < apw_state->prewarm_stop_idx)
+	{
+		BlockInfoRecord *blk = &p->block_info[p->pos];
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (!have_free_buffer())
+		{
+			p->pos = apw_state->prewarm_stop_idx;
+			return InvalidBlockNumber;
+		}
+
+		/* Stop at, but do not consume, the first record of the next run. */
+		if (blk->database != p->database ||
+			blk->filenumber != p->filenumber ||
+			blk->forknum != p->forknum)
+			return InvalidBlockNumber;
+
+		p->pos++;
+
+		/* Skip blocks that are beyond the current end of the fork. */
+		if (blk->blocknum >= p->nblocks)
+			continue;
+
+		return blk->blocknum;
+	}
+
+	return InvalidBlockNumber;
+}
+
+/*
+ * Return the index of the first record at or after pos that does not belong
+ * to the same database and relation as block_info[pos] (and, if match_fork,
+ * the same fork), or prewarm_stop_idx if there is none.  Used to skip the
+ * records of a dropped relation or of an invalid or nonexistent fork.
+ */
+static int
+apw_skip_run(BlockInfoRecord *block_info, int pos, bool match_fork)
+{
+	BlockInfoRecord *first = &block_info[pos];
+
+	while (pos < apw_state->prewarm_stop_idx &&
+		   block_info[pos].database == first->database &&
+		   block_info[pos].filenumber == first->filenumber &&
+		   (!match_fork || block_info[pos].forknum == first->forknum))
+		pos++;
+
+	return pos;
+}

 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
@@ -429,12 +512,10 @@ apw_load_buffers(void)
 void
 autoprewarm_database_main(Datum main_arg)
 {
 	int			pos;
 	BlockInfoRecord *block_info;
-	Relation	rel = NULL;
-	BlockNumber nblocks = 0;
-	BlockInfoRecord *old_blk = NULL;
 	dsm_segment *seg;
+	Oid			prev_database = InvalidOid;

 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -449,108 +530,107 @@ autoprewarm_database_main(Datum main_arg)
 				 errmsg("could not map dynamic shared memory segment")));
 	BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
 	pos = apw_state->prewarm_start_idx;

 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
 	 * buffers.
+	 *
+	 * Each iteration handles the run of records belonging to one relation;
+	 * pos always indexes the first record that has not been handled yet.
 	 */
 	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
-		Buffer		buf;
+		BlockInfoRecord *blk = &block_info[pos];
+		Oid			reloid;
+		Relation	rel;

 		CHECK_FOR_INTERRUPTS();

 		/*
 		 * Quit if we've reached records for another database. If previous
 		 * blocks are of some global objects, then continue pre-warming.
 		 */
-		if (old_blk != NULL && old_blk->database != blk->database &&
-			old_blk->database != 0)
-			break;
+		if (prev_database != InvalidOid && prev_database != blk->database)
+			break;
+		prev_database = blk->database;

-		/*
-		 * As soon as we encounter a block of a new relation, close the old
-		 * relation. Note that rel will be NULL if try_relation_open failed
-		 * previously; in that case, there is nothing to close.
-		 */
-		if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
-			rel != NULL)
-		{
-			relation_close(rel, AccessShareLock);
-			rel = NULL;
-			CommitTransactionCommand();
-		}
+		/*
+		 * Try to open the relation. If it's been dropped, skip all of its
+		 * blocks.
+		 */
+		StartTransactionCommand();
+		reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
+		rel = OidIsValid(reloid) ?
+			try_relation_open(reloid, AccessShareLock) : NULL;
+		if (rel == NULL)
+		{
+			CommitTransactionCommand();
+			pos = apw_skip_run(block_info, pos, false);
+			continue;
+		}

-		/*
-		 * Try to open each new relation, but only once, when we first
-		 * encounter it. If it's been dropped, skip the associated blocks.
-		 */
-		if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
-		{
-			Oid			reloid;
-
-			Assert(rel == NULL);
-			StartTransactionCommand();
-			reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
-			if (OidIsValid(reloid))
-				rel = try_relation_open(reloid, AccessShareLock);
-
-			if (!rel)
-				CommitTransactionCommand();
-		}
-		if (!rel)
+		/* Prewarm each fork of the relation in turn. */
+		while (pos < apw_state->prewarm_stop_idx &&
+			   block_info[pos].database == blk->database &&
+			   block_info[pos].filenumber == blk->filenumber)
 		{
-			old_blk = blk;
-			continue;
-		}
+			ForkNumber	forknum = block_info[pos].forknum;
+			struct apw_read_stream_private p;
+			ReadStream *stream;
+			Buffer		buf;

-		/* Once per fork, check for fork existence and size. */
-		if (old_blk == NULL ||
-			old_blk->filenumber != blk->filenumber ||
-			old_blk->forknum != blk->forknum)
-		{
 			/*
 			 * smgrexists is not safe for illegal forknum, hence check whether
 			 * the passed forknum is valid before using it in smgrexists.
 			 */
-			if (blk->forknum > InvalidForkNumber &&
-				blk->forknum <= MAX_FORKNUM &&
-				smgrexists(RelationGetSmgr(rel), blk->forknum))
-				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
-			else
-				nblocks = 0;
-		}
+			if (forknum <= InvalidForkNumber || forknum > MAX_FORKNUM ||
+				!smgrexists(RelationGetSmgr(rel), forknum))
+			{
+				/* Skip all blocks of the invalid or nonexistent fork. */
+				pos = apw_skip_run(block_info, pos, true);
+				continue;
+			}

-		/* Check whether blocknum is valid and within fork file size. */
-		if (blk->blocknum >= nblocks)
-		{
-			/* Move to next forknum. */
-			old_blk = blk;
-			continue;
-		}
+			p = (struct apw_read_stream_private)
+			{
+				.block_info = block_info,
+				.pos = pos,
+				.database = blk->database,
+				.filenumber = blk->filenumber,
+				.forknum = forknum,
+				.nblocks = RelationGetNumberOfBlocksInFork(rel, forknum),
+			};

-		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
-		if (BufferIsValid(buf))
-		{
-			apw_state->prewarmed_blocks++;
-			ReleaseBuffer(buf);
-		}
+			stream = read_stream_begin_relation(READ_STREAM_FULL,
+												NULL,
+												rel,
+												forknum,
+												apw_read_stream_next_block,
+												&p,
+												0);

-		old_blk = blk;
-	}
+			while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+			{
+				apw_state->prewarmed_blocks++;
+				ReleaseBuffer(buf);
+			}
+			read_stream_end(stream);

-	dsm_detach(seg);
+			/*
+			 * The callback stops at, without consuming, the first record of
+			 * the next run, so resume from there.
+			 */
+			Assert(p.pos > pos);
+			pos = p.pos;
+		}

-	/* Release lock on previous relation. */
-	if (rel)
-	{
+		/* Release lock on the relation. */
 		relation_close(rel, AccessShareLock);
 		CommitTransactionCommand();
 	}
+
+	dsm_detach(seg);
 }

 /*
-- 
2.34.1

Reply via email to