On Tue, Apr 1, 2025 at 8:50 AM Nazir Bilal Yavuz <byavu...@gmail.com> wrote:
>
> I am attaching v8, which is an updated version of the v7. I tried to
> get rid of these local variables and refactored code to make logic
> more straightforward instead of going back and forth.
>
> 0001 and 0002 are v8. 0003 is another refactoring attempt to make code
> more straightforward. I did not squash 0003 to previous patches as you
> might not like it.

I looked at the code on your GitHub branch that has all three of these
squashed together.
I think our approaches are converging. I like that you are
fast-forwarding to the next filenumber or fork number explicitly when
there is a bad relation or fork. I've changed my version (see the newest
one attached) to do the fast-forwarding inline instead of in a
separate function like yours (the function didn't save many LOC and
actually may have added to cognitive overhead).
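Here is a minimal standalone sketch of the inline fast-forwarding. It is not code from either patch. `Rec` is a hypothetical, simplified stand-in for BlockInfoRecord, sorted by relation. `can_open()` is an invented stub standing in for the RelidByRelfilenumber()/try_relation_open() pair, and it pretends relation 200 was dropped. Because one inner loop skips every record of an unopenable relation, each relation gets a single open attempt rather than one attempt per record.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-in for BlockInfoRecord. */
typedef struct Rec
{
	unsigned	filenumber;
	unsigned	blocknum;
} Rec;

/* Hypothetical stub: pretend relation 200 has been dropped. */
static bool
can_open(unsigned filenumber)
{
	return filenumber != 200;
}

/*
 * Walk records [start, stop), "prewarming" blocks of relations we can open.
 * Returns the number of blocks prewarmed; *attempts counts open attempts.
 */
static int
prewarm_records(const Rec *recs, int start, int stop, int *attempts)
{
	int			prewarmed = 0;
	int			i = start;

	assert(start <= stop);
	*attempts = 0;
	while (i < stop)
	{
		unsigned	filenumber = recs[i].filenumber;

		(*attempts)++;
		if (!can_open(filenumber))
		{
			/* Fast-forward past all remaining records for this relation. */
			while (i < stop && recs[i].filenumber == filenumber)
				i++;
			continue;			/* try the next relation */
		}

		/* Opened: "prewarm" each of this relation's blocks. */
		for (; i < stop && recs[i].filenumber == filenumber; i++)
			prewarmed++;
	}
	return prewarmed;
}
```

With two blocks of relation 100, three of the dropped relation 200 and one of relation 300, this makes three open attempts and prewarms three blocks.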

Compared to my version, I think you avoided one level of loop nesting with your

        if (!rel)
            ...
        else if (smgrexists(RelationGetSmgr(rel), blk->forknum))
            ...
        else
            ...

but for starters, I don't think you can do this:

        else if (smgrexists(RelationGetSmgr(rel), blk->forknum))

because you didn't check that the forknum is legal first, and
smgrexists() is not safe for an illegal forknum.
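The order of the checks matters because of short-circuit evaluation. Below is a standalone sketch. The ForkNumber values mirror PostgreSQL's enum: InvalidForkNumber is -1, and MAX_FORKNUM is INIT_FORKNUM, i.e. 3. `fork_exists[]` and `fork_is_usable()` are hypothetical. As a simplification, the sketch assumes the existence check indexes per-fork state by fork number, so the range test has to run before it.

```c
#include <assert.h>
#include <stdbool.h>

/* Mirrors PostgreSQL's ForkNumber values; MAX_FORKNUM is INIT_FORKNUM. */
typedef enum ForkNumber
{
	InvalidForkNumber = -1,
	MAIN_FORKNUM = 0,
	FSM_FORKNUM,
	VISIBILITYMAP_FORKNUM,
	INIT_FORKNUM
} ForkNumber;

#define MAX_FORKNUM INIT_FORKNUM

/* Hypothetical per-fork existence flags for one relation. */
static const bool fork_exists[MAX_FORKNUM + 1] = {true, true, false, false};

/*
 * True only for a legal fork that exists. The range checks come first, so
 * fork_exists[] is never indexed with an out-of-range value (the analogue
 * of calling smgrexists() with an illegal forknum).
 */
static bool
fork_is_usable(int forknum)
{
	return forknum > InvalidForkNumber &&
		forknum <= MAX_FORKNUM &&
		fork_exists[forknum];
}
```

Swapping the operands so that `fork_exists[forknum]` is evaluated first would read outside the array for values such as -1 or 4.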

Also, I actually prefer the explicitly nested structure:

loop through all relations
      loop through all forks
            loop through all buffers

In the old structure, I liked your autoprewarm_prewarm_relation()
function, but I think it is nicer inlined, as in my version. Inlining
it also makes the loop through all buffers explicit.
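The nested shape can be modeled in isolation. This is a hypothetical standalone sketch, not the patch. `Rec` is a simplified stand-in for BlockInfoRecord, sorted by filenumber, forknum and blocknum. `fork_nblocks()` is an invented stub for RelationGetNumberOfBlocksInFork(), and "prewarming" only counts blocks. Each loop level owns one key, so the fork size is looked up once per fork rather than once per record.

```c
#include <assert.h>

/* Hypothetical, simplified stand-in for BlockInfoRecord. */
typedef struct Rec
{
	unsigned	filenumber;
	int			forknum;
	unsigned	blocknum;
} Rec;

/* Hypothetical stub: every fork of every relation is 10 blocks long. */
static unsigned
fork_nblocks(unsigned filenumber, int forknum)
{
	(void) filenumber;
	(void) forknum;
	return 10;
}

/* Returns the number of blocks "prewarmed" from recs[0..n). */
static int
prewarm_nested(const Rec *recs, int n)
{
	int			prewarmed = 0;
	int			i = 0;

	assert(n >= 0);

	/* Loop through all relations. */
	while (i < n)
	{
		unsigned	filenumber = recs[i].filenumber;

		/* Loop through all forks of this relation. */
		while (i < n && recs[i].filenumber == filenumber)
		{
			int			forknum = recs[i].forknum;
			unsigned	nblocks = fork_nblocks(filenumber, forknum);

			/* Loop through all buffers (blocks) of this fork. */
			for (; i < n && recs[i].filenumber == filenumber &&
				 recs[i].forknum == forknum; i++)
			{
				if (recs[i].blocknum < nblocks)
					prewarmed++;	/* stands in for ReadBufferExtended() */
			}
		}
	}
	return prewarmed;
}
```

Each innermost loop consumes at least the record it started on, so `i` always advances and the outer loops terminate.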

I know you mentioned off-list that you don't like the handling of
global objects in my version, but I prefer doing it this way (even
though we have to check for them in the loop condition) to having to
set the current database once we reach non-shared objects. That feels
too fiddly; this way seems less error-prone. Looking at this version,
what do you think? Could we do it better?
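Here is a minimal sketch of the loop-condition approach. It assumes that the worker's range holds records for its own database together with records for global objects (database OID 0, i.e. InvalidOid), as the patch comment describes. `Rec`, `in_scope()` and `scan_end()` are hypothetical names. The check is re-evaluated for every record, so there is no separate "current database" state to update when the scan moves from shared to non-shared records.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;

#define InvalidOid ((Oid) 0)	/* global objects are recorded with database 0 */

/* Hypothetical, simplified stand-in for BlockInfoRecord. */
typedef struct Rec
{
	Oid			database;
	unsigned	blocknum;
} Rec;

/* A record is ours if it belongs to our database or to a global object. */
static bool
in_scope(const Rec *rec, Oid database)
{
	return rec->database == database || rec->database == InvalidOid;
}

/* Returns the index of the first record this worker must not touch. */
static int
scan_end(const Rec *recs, int start, int stop, Oid database)
{
	int			i = start;

	assert(start <= stop);
	while (i < stop && in_scope(&recs[i], database))
		i++;
	return i;
}
```

For records laid out as global, global, db 5, db 5, db 7, a worker for database 5 stops at the first database 7 record.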

Let me know what you think of this version. I think it is the best of
both our approaches. I've separated it into two commits -- the first
does all the refactoring without using the read stream API and the
second one uses the read stream API.
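The read stream commit can be modeled without PostgreSQL. The stream repeatedly asks a callback for the next block number until it gets InvalidBlockNumber. The callback keeps its position in private state, which the caller reads back afterwards, as the patch does with `i = p.pos`. This is a hypothetical simulation: `StreamState`, `next_block()` and `drain_stream()` are invented names. `drain_stream()` is a toy stand-in for the read_stream_begin_relation()/read_stream_next_buffer() loop and does no I/O or look-ahead. In the sketch, the callback skips out-of-range blocks while still advancing `pos`, so the caller makes progress even when the first record is past the end of the fork.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t BlockNumber;

#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/* Hypothetical, simplified record: one fork's blocks, sorted. */
typedef struct Rec
{
	int			forknum;
	BlockNumber blocknum;
} Rec;

/* Hypothetical private state, loosely modeled on AutoPrewarmReadStreamData. */
typedef struct StreamState
{
	const Rec  *recs;
	int			pos;
	int			stop;
	int			forknum;
	BlockNumber nblocks;
} StreamState;

/* Returns the next block of the current fork, or InvalidBlockNumber. */
static BlockNumber
next_block(StreamState *p)
{
	assert(p->pos <= p->stop);
	while (p->pos < p->stop)
	{
		Rec			rec = p->recs[p->pos];

		if (rec.forknum != p->forknum)
			return InvalidBlockNumber;	/* next fork: end this stream */

		p->pos++;
		if (rec.blocknum < p->nblocks)
			return rec.blocknum;
		/* Out of range: skip it; pos has already advanced past it. */
	}
	return InvalidBlockNumber;
}

/* Toy consumer standing in for the read_stream_next_buffer() loop. */
static int
drain_stream(StreamState *p)
{
	int			blocks = 0;

	while (next_block(p) != InvalidBlockNumber)
		blocks++;
	return blocks;
}
```

If the callback instead returned InvalidBlockNumber without advancing `pos` on an out-of-range block, a caller that restarts from `pos` would see the same record again and never move forward.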

On another topic, what is the minimal set of places where we need to
call have_free_buffer() (in this version)? I haven't even started
looking at the last patch you've been sending, the one about checking
the freelist. I'll have to do that next.

- Melanie
From 2d14f37ea824a6ce3605f53b0a9c5622f08bf6e7 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Mon, 31 Mar 2025 22:02:25 -0400
Subject: [PATCH 1/2] Refactor autoprewarm_database_main() in preparation for
 read stream

TODO: write a commit message

Co-authored-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Co-authored-by: Melanie Plageman <melanieplage...@gmail.com>
Discussion: https://postgr.es/m/flat/CAN55FZ3n8Gd%2BhajbL%3D5UkGzu_aHGRqnn%2BxktXq2fuds%3D1AOR6Q%40mail.gmail.com
---
 contrib/pg_prewarm/autoprewarm.c | 166 +++++++++++++++++--------------
 1 file changed, 89 insertions(+), 77 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..84072587ea0 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -429,12 +429,11 @@ apw_load_buffers(void)
 void
 autoprewarm_database_main(Datum main_arg)
 {
-	int			pos;
 	BlockInfoRecord *block_info;
-	Relation	rel = NULL;
-	BlockNumber nblocks = 0;
-	BlockInfoRecord *old_blk = NULL;
+	int			i;
+	BlockInfoRecord *blk;
 	dsm_segment *seg;
+	Oid			database;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -449,108 +448,121 @@ autoprewarm_database_main(Datum main_arg)
 				 errmsg("could not map dynamic shared memory segment")));
 	BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
-	pos = apw_state->prewarm_start_idx;
+
+	i = apw_state->prewarm_start_idx;
+	database = apw_state->database;
+
+	blk = &block_info[i];
 
 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
-	 * buffers.
+	 * buffers. We'll quit if we've reached records for another database,
+	 * however we do want to prewarm any global objects.
 	 */
-	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
+	while (i < apw_state->prewarm_stop_idx &&
+		   (blk->database == database || blk->database == 0) &&
+		   have_free_buffer())
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
-		Buffer		buf;
+		RelFileNumber filenumber = blk->filenumber;
+		Oid			reloid;
+		Relation	rel;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/*
-		 * Quit if we've reached records for another database. If previous
-		 * blocks are of some global objects, then continue pre-warming.
-		 */
-		if (old_blk != NULL && old_blk->database != blk->database &&
-			old_blk->database != 0)
-			break;
+		StartTransactionCommand();
 
-		/*
-		 * As soon as we encounter a block of a new relation, close the old
-		 * relation. Note that rel will be NULL if try_relation_open failed
-		 * previously; in that case, there is nothing to close.
-		 */
-		if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
-			rel != NULL)
+		reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
+		if (!OidIsValid(reloid) ||
+			(rel = try_relation_open(reloid, AccessShareLock)) == NULL)
 		{
-			relation_close(rel, AccessShareLock);
-			rel = NULL;
+			/* We failed to open the relation, so there is nothing to close. */
 			CommitTransactionCommand();
-		}
-
-		/*
-		 * Try to open each new relation, but only once, when we first
-		 * encounter it. If it's been dropped, skip the associated blocks.
-		 */
-		if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
-		{
-			Oid			reloid;
 
-			Assert(rel == NULL);
-			StartTransactionCommand();
-			reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
-			if (OidIsValid(reloid))
-				rel = try_relation_open(reloid, AccessShareLock);
+			/*
+			 * Fast-forward to the next relation. We want to skip all of the
+			 * other records referencing this relation since we know we can't
+			 * open it. That way, we avoid repeatedly trying and failing to
+			 * open the same relation.
+			 */
+			for (; i < apw_state->prewarm_stop_idx; i++)
+			{
+				blk = &block_info[i];
+				if ((blk->database != database && blk->database != 0) ||
+					blk->filenumber != filenumber)
+					break;
+			}
 
-			if (!rel)
-				CommitTransactionCommand();
-		}
-		if (!rel)
-		{
-			old_blk = blk;
+			/* Go back and try to open the next relation. */
 			continue;
 		}
 
-		/* Once per fork, check for fork existence and size. */
-		if (old_blk == NULL ||
-			old_blk->filenumber != blk->filenumber ||
-			old_blk->forknum != blk->forknum)
+		/*
+		 * We have a relation; now let's loop until we find a valid fork of
+		 * the relation or we run out of free buffers. Once we've read from
+		 * all valid forks or run out of options, we'll close the relation and
+		 * move on.
+		 */
+		while (i < apw_state->prewarm_stop_idx &&
+			   (blk->database == database || blk->database == 0) &&
+			   blk->filenumber == filenumber &&
+			   have_free_buffer())
 		{
+			ForkNumber	forknum = blk->forknum;
+			BlockNumber nblocks;
+			Buffer		buf;
+
 			/*
 			 * smgrexists is not safe for illegal forknum, hence check whether
 			 * the passed forknum is valid before using it in smgrexists.
 			 */
-			if (blk->forknum > InvalidForkNumber &&
-				blk->forknum <= MAX_FORKNUM &&
-				smgrexists(RelationGetSmgr(rel), blk->forknum))
-				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
-			else
-				nblocks = 0;
-		}
+			if (blk->forknum <= InvalidForkNumber ||
+				blk->forknum > MAX_FORKNUM ||
+				!smgrexists(RelationGetSmgr(rel), blk->forknum))
+			{
+				/*
+				 * Fast-forward to the next fork. We want to skip all of the
+				 * other records referencing this fork since we already know
+				 * it's not valid.
+				 */
+				for (; i < apw_state->prewarm_stop_idx; i++)
+				{
+					blk = &block_info[i];
+					if ((blk->database != database && blk->database != 0) ||
+						blk->filenumber != filenumber ||
+						blk->forknum != forknum)
+						break;
+				}
 
-		/* Check whether blocknum is valid and within fork file size. */
-		if (blk->blocknum >= nblocks)
-		{
-			/* Move to next forknum. */
-			old_blk = blk;
-			continue;
-		}
+				continue;
+			}
 
-		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
-		if (BufferIsValid(buf))
-		{
-			apw_state->prewarmed_blocks++;
-			ReleaseBuffer(buf);
-		}
+			nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
 
-		old_blk = blk;
-	}
+			/* Check whether blocknum is valid and within fork file size. */
+			if (blk->blocknum >= nblocks)
+			{
+				blk = &block_info[++i];
+				continue;
+			}
 
-	dsm_detach(seg);
+			/* Prewarm buffer. */
+			buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
+									NULL);
+
+			if (BufferIsValid(buf))
+			{
+				apw_state->prewarmed_blocks++;
+				ReleaseBuffer(buf);
+			}
+
+			blk = &block_info[++i];
+		}
 
-	/* Release lock on previous relation. */
-	if (rel)
-	{
 		relation_close(rel, AccessShareLock);
 		CommitTransactionCommand();
 	}
+
+	dsm_detach(seg);
 }
 
 /*
-- 
2.34.1

From ff77f60d670f1c25699e969a48f7bbd5a577a405 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 1 Apr 2025 18:07:38 -0400
Subject: [PATCH 2/2] streaming read autoprewarm

TODO: write a commit message

Co-authored-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Co-authored-by: Melanie Plageman <melanieplage...@gmail.com>
Discussion: https://postgr.es/m/flat/CAN55FZ3n8Gd%2BhajbL%3D5UkGzu_aHGRqnn%2BxktXq2fuds%3D1AOR6Q%40mail.gmail.com
---
 contrib/pg_prewarm/autoprewarm.c | 95 +++++++++++++++++++++++++++-----
 src/tools/pgindent/typedefs.list |  1 +
 2 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 84072587ea0..667aa895b18 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -75,6 +76,21 @@ typedef struct AutoPrewarmSharedState
 	int			prewarmed_blocks;
 } AutoPrewarmSharedState;
 
+/*
+ * Private data passed through the read stream API for our use in the
+ * callaback.
+ */
+typedef struct AutoPrewarmReadStreamData
+{
+	BlockInfoRecord *block_info;
+	int			pos;
+	Oid			database;
+	RelFileNumber filenumber;
+	ForkNumber	forknum;
+	BlockNumber nblocks;
+} AutoPrewarmReadStreamData;
+
+
 PGDLLEXPORT void autoprewarm_main(Datum main_arg);
 PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);
 
@@ -422,6 +438,45 @@ apw_load_buffers(void)
 						apw_state->prewarmed_blocks, num_elements)));
 }
 
+/*
+ * Return the next block number of a specific relation and fork to read
+ * according to the array of BlockInfoRecord.
+ */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	AutoPrewarmReadStreamData *p = callback_private_data;
+	BlockInfoRecord blk;
+
+	CHECK_FOR_INTERRUPTS();
+
+	while (p->pos < apw_state->prewarm_stop_idx && have_free_buffer())
+	{
+		blk = p->block_info[p->pos];
+
+		if (blk.database != p->database && blk.database != 0)
+			return InvalidBlockNumber;
+		if (blk.filenumber != p->filenumber)
+			return InvalidBlockNumber;
+		if (blk.forknum != p->forknum)
+			return InvalidBlockNumber;
+
+		p->pos++;
+
+		/*
+		 * Check whether blocknum is valid and within fork file size. Skip
+		 * out-of-range blocks but keep advancing, so p->pos moves on.
+		 */
+		if (blk.blocknum < p->nblocks)
+			return blk.blocknum;
+	}
+	/* Out of records or out of free buffers: stop prewarming entirely. */
+	p->pos = apw_state->prewarm_stop_idx;
+	return InvalidBlockNumber;
+}
+
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
  * those got grouped with this database).
@@ -467,8 +522,6 @@ autoprewarm_database_main(Datum main_arg)
 		Oid			reloid;
 		Relation	rel;
 
-		CHECK_FOR_INTERRUPTS();
-
 		StartTransactionCommand();
 
 		reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
@@ -509,6 +562,8 @@ autoprewarm_database_main(Datum main_arg)
 		{
 			ForkNumber	forknum = blk->forknum;
 			BlockNumber nblocks;
+			struct AutoPrewarmReadStreamData p;
+			ReadStream *stream;
 			Buffer		buf;
 
 			/*
@@ -538,24 +593,36 @@ autoprewarm_database_main(Datum main_arg)
 
 			nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
 
-			/* Check whether blocknum is valid and within fork file size. */
-			if (blk->blocknum >= nblocks)
+			p = (struct AutoPrewarmReadStreamData)
 			{
-				blk = &block_info[++i];
-				continue;
-			}
-
-			/* Prewarm buffer. */
-			buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-									NULL);
-
-			if (BufferIsValid(buf))
+				.block_info = block_info,
+					.pos = i,
+					.database = database,
+					.filenumber = filenumber,
+					.forknum = forknum,
+					.nblocks = nblocks,
+			};
+
+			stream = read_stream_begin_relation(READ_STREAM_FULL,
+												NULL,
+												rel,
+												p.forknum,
+												apw_read_stream_next_block,
+												&p,
+												0);
+
+			/* Receive our prewarmed buffers */
+			while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 			{
 				apw_state->prewarmed_blocks++;
 				ReleaseBuffer(buf);
 			}
 
-			blk = &block_info[++i];
+			Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+			read_stream_end(stream);
+
+			i = p.pos;
+			blk = &block_info[i];
 		}
 
 		relation_close(rel, AccessShareLock);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 449bafc123c..a36e372526c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -175,6 +175,7 @@ AttributeOpts
 AuthRequest
 AuthToken
 AutoPrewarmSharedState
+AutoPrewarmReadStreamData
 AutoVacOpts
 AutoVacuumShmemStruct
 AutoVacuumWorkItem
-- 
2.34.1
