On Wed, Apr 2, 2025 at 6:26 AM Nazir Bilal Yavuz <byavu...@gmail.com> wrote:
>
> On Wed, 2 Apr 2025 at 01:36, Melanie Plageman <melanieplage...@gmail.com> wrote:
> >
> > I know you mentioned off-list that you don't like the handling of
> > global objects in my version, but I prefer doing it this way (even
> > though we have to check for it in the loop condition) to having to set
> > the current database once we reach non-shared objects. It feels too
> > fiddly. This way seems less error prone. Looking at this version, what
> > do you think? Could we do it better?
>
> I think there might be a problem with that approach. Let's say that we
> are able to open relation when database oid = 0 and filenumber = 18.
> Then we are trying to find a valid fork now. We couldn't find a valid
> fork immediately, so we continued looping. Then database oid is
> changed from 0 to let's say 1 but filenumber remains the same. We are
> still in the valid fork loop, so relation remains from the database
> oid = 0. Isn't that wrong?

Yep, you are totally right. The code was wrong. We could fix it by
setting current_db to the valid database once we've prewarmed the
global objects, but we need that logic in three places, so that seems
quite undesirable.

In the attached v9, I've added a patch that changes apw_load_buffers()
to invoke autoprewarm_database_main() for the global objects alone,
while connected to the first valid database. It's not the best
solution, but I think it is better than having that fiddly logic about
database 0 everywhere.
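
To make the boundary handling concrete, here is a minimal standalone
sketch of the idea, not the patch itself. SketchRecord and
sketch_next_run() are hypothetical names, and Oid/InvalidOid are local
stand-ins for the PostgreSQL definitions. It assumes the array is
sorted by database, so global objects (database 0) form the first run.
The explicit bounds check before looking at the record after the run
is a choice made for this sketch.

```c
#include <assert.h>
#include <stddef.h>

/* Local stand-ins for the PostgreSQL types (assumptions for this sketch). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

/* Simplified stand-in for BlockInfoRecord; only the field used here. */
typedef struct SketchRecord
{
	Oid			database;
} SketchRecord;

/*
 * Find the end of the run of records that starts at 'start' and share one
 * database. Sets *stop to the first index past the run and returns the
 * database a worker would connect to. For a run of global objects, that is
 * the database of the record following the run. Returns InvalidOid when
 * the run is global and nothing follows it, i.e. there is no database to
 * connect to.
 */
static Oid
sketch_next_run(const SketchRecord *recs, int nrecs, int start, int *stop)
{
	Oid			run_db;
	int			j;

	assert(start >= 0 && start < nrecs);
	run_db = recs[start].database;

	/* Advance to the first record that is not part of this run. */
	for (j = start; j < nrecs; j++)
	{
		if (recs[j].database != run_db)
			break;
	}
	*stop = j;

	if (run_db != InvalidOid)
		return run_db;

	/* Global objects: borrow the next database, but only if one exists. */
	if (j >= nrecs)
		return InvalidOid;
	return recs[j].database;
}
```

With records for databases {0, 0, 5, 5, 7}, the first call covers
indexes 0-1 and returns 5, the second covers 2-3 and also returns 5,
and the third covers index 4 and returns 7, so each worker only ever
sees records with a single database value.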

This made me wonder whether we could connect to template0 or
template1 to prewarm the global objects. Then we could also prewarm
when only global objects are present (that doesn't seem very important,
but it would be a side effect). It might be clearer to connect to
template0/1 instead of the first valid database to prewarm global
objects. I don't know if there is some reason not to do this -- for
example, maybe background workers aren't allowed to connect to them?

> 0001:
>
> ReadBufferExtended() can be called in its own minimal loop, otherwise
> we end up doing unnecessary checks for each ReadBufferExtended() call.
> This is not a problem when the 0002 is applied.

Could you provide a snippet of example code? If we call
ReadBufferExtended() in a loop, the block won't be changing, so I
don't see how that will help.

> 0002:
>
> We don't skip blocks whose blocknum is more than nblocks_in_fork. We
> can add that either to a stream callback like you did before or after
> the read_stream_end. I prefer stream callback because of the reason
> below [1].

Yep, I also thought we had to have that logic, but because we sort by
db,rel,fork,blkno, I think blocks with blocknum >= nblocks_in_fork
will be last within each fork, so once we hit one we can just move on
to the next fork.
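
Here is a small standalone sketch of that ordering argument. The names
SketchBlock, sketch_block_cmp() and sketch_readable_in_fork() are
hypothetical, and the struct is a cut-down stand-in for
BlockInfoRecord (no tablespace). It assumes the comparator orders by
database, filenumber, fork and then block number, as described above.

```c
#include <assert.h>
#include <stdlib.h>

/* Cut-down stand-in for BlockInfoRecord (assumption for this sketch). */
typedef struct SketchBlock
{
	unsigned int database;
	unsigned int filenumber;
	int			forknum;
	unsigned int blocknum;
} SketchBlock;

/* Order by database, then filenumber, then fork, then block number. */
static int
sketch_block_cmp(const void *a, const void *b)
{
	const SketchBlock *x = a;
	const SketchBlock *y = b;

	if (x->database != y->database)
		return x->database < y->database ? -1 : 1;
	if (x->filenumber != y->filenumber)
		return x->filenumber < y->filenumber ? -1 : 1;
	if (x->forknum != y->forknum)
		return x->forknum < y->forknum ? -1 : 1;
	if (x->blocknum != y->blocknum)
		return x->blocknum < y->blocknum ? -1 : 1;
	return 0;
}

/*
 * Starting at 'pos', count the records of the same fork whose block number
 * is below 'nblocks'. Because the array is sorted, the first out-of-range
 * block number ends the readable run: every later record of that fork has
 * an equal or larger block number.
 */
static int
sketch_readable_in_fork(const SketchBlock *blks, int nblks, int pos,
						unsigned int nblocks)
{
	const SketchBlock *first;
	int			n = 0;

	assert(pos >= 0 && pos < nblks);
	first = &blks[pos];

	for (int i = pos; i < nblks; i++)
	{
		if (blks[i].database != first->database ||
			blks[i].filenumber != first->filenumber ||
			blks[i].forknum != first->forknum)
			break;				/* reached the next fork or relation */
		if (blks[i].blocknum >= nblocks)
			break;				/* the rest of this fork is out of range too */
		n++;
	}
	return n;
}
```

Note that the sketch only counts the readable prefix; the caller would
still have to skip the remaining out-of-range records of that fork
before moving on.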

> > On another topic, what are the minimal places we need to call
> > have_free_buffers() (in this version)? I haven't even started looking
> > at the last patch you've been sending that is about checking the
> > freelist. I'll have to do that next.
>
> I think its current places are good enough. We may add one after the
> read_stream_end if we want to handle blk->blocknum >= nblocks_in_fork
> after the read stream finishes. If we handle that in the stream
> callback then no need to add have_free_buffers() [1].

As long as we have it in the callback, I don't think that we need
have_free_buffer() after read_stream_end(), since it is in the while
loop condition, which we will evaluate immediately after
read_stream_end().
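
A toy model of that interaction, under loud assumptions: this is not
the read stream API. SketchStreamData, sketch_next_block() and
sketch_drain() are hypothetical, the free-buffer count is a plain
integer, and the callback is invoked synchronously, whereas a real
read stream may call it ahead of the consumer.

```c
#include <assert.h>

#define SKETCH_INVALID_BLOCK 0xFFFFFFFFu

/* Hypothetical private state handed to the callback. */
typedef struct SketchStreamData
{
	const unsigned int *blocks; /* block numbers to hand out */
	int			pos;			/* callback's position in 'blocks' */
	int			stop;			/* one past the last usable index */
	int		   *free_buffers;	/* toy substitute for have_free_buffer() */
} SketchStreamData;

/*
 * Return the next block, or SKETCH_INVALID_BLOCK to end the stream. When no
 * free buffers remain, jump pos to stop so the caller's loop condition
 * (pos < stop) also fails without a separate check.
 */
static unsigned int
sketch_next_block(void *private_data)
{
	SketchStreamData *p = private_data;

	if (*p->free_buffers <= 0)
		p->pos = p->stop;

	if (p->pos >= p->stop)
		return SKETCH_INVALID_BLOCK;

	return p->blocks[p->pos++];
}

/* Consume blocks until the callback ends the stream; return the count. */
static int
sketch_drain(SketchStreamData *p)
{
	int			n = 0;

	while (sketch_next_block(p) != SKETCH_INVALID_BLOCK)
	{
		(*p->free_buffers)--;	/* pretend each read uses up one buffer */
		n++;
	}

	assert(p->pos <= p->stop);
	return n;
}
```

With four candidate blocks and two free buffers, the drain reads two
blocks and leaves pos equal to stop, so an outer loop that tests
pos < stop exits right away.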

I was also wondering about the other patch in your earlier set that
set stop_idx from get_number_of_free_buffers(). Could you tell me more
about that? What does it do and why is it needed with the read stream
but wasn't needed before?

- Melanie
From b6e8454d632e22ed39db0b343e7be4e9b91900ea Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Wed, 2 Apr 2025 09:52:54 -0400
Subject: [PATCH v9 1/3] Autoprewarm global objects separately

Autoprewarm previously prewarmed global objects while prewarming blocks
from objects from the first valid database it encountered. This was
because you can't read in buffers without being connected to a database.

Prewarming global objects and objects from a single database in the
autoprewarm_database_main() function required a special case. Once we
convert autoprewarm to use a read stream, this special case will have to
be duplicated in multiple places.

Instead, modify apw_load_buffers() to prewarm the shared objects in one
invocation of autoprewarm_database_main() while connected to the first
valid database.

It is a bit fiddly but seems better than the alternative.
---
 contrib/pg_prewarm/autoprewarm.c | 51 ++++++++++++++------------------
 1 file changed, 22 insertions(+), 29 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..172654fee25 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -347,44 +347,41 @@ apw_load_buffers(void)
 	apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0;
 	apw_state->prewarmed_blocks = 0;
 
-	/* Get the info position of the first block of the next database. */
+	/*
+	 * Loop through the records and launch a database worker to process
+	 * objects in each database. We'll stop at the boundary of each new
+	 * database and prewarm those blocks before moving to the next.
+	 */
 	while (apw_state->prewarm_start_idx < num_elements)
 	{
 		int			j = apw_state->prewarm_start_idx;
 		Oid			current_db = blkinfo[j].database;
 
 		/*
-		 * Advance the prewarm_stop_idx to the first BlockInfoRecord that does
-		 * not belong to this database.
+		 * Advance the position to the first BlockInfoRecord that does not
+		 * belong to the current database.
 		 */
-		j++;
-		while (j < num_elements)
+		for (; j < num_elements; j++)
 		{
-			if (current_db != blkinfo[j].database)
-			{
-				/*
-				 * Combine BlockInfoRecords for global objects with those of
-				 * the database.
-				 */
-				if (current_db != InvalidOid)
-					break;
-				current_db = blkinfo[j].database;
-			}
-
-			j++;
+			if (blkinfo[j].database != current_db)
+				break;
 		}
 
 		/*
-		 * If we reach this point with current_db == InvalidOid, then only
-		 * BlockInfoRecords belonging to global objects exist.  We can't
-		 * prewarm without a database connection, so just bail out.
+		 * We can't prewarm without a database connection, so if all of the
+		 * records belong to global objects, we have to bail out.
 		 */
-		if (current_db == InvalidOid)
+		if (current_db == InvalidOid && j >= num_elements)
 			break;
 
+		/* Connect to the first valid db to prewarm global objects. */
+		if (current_db == InvalidOid)
+			current_db = blkinfo[j].database;
+
 		/* Configure stop point and database for next per-database worker. */
 		apw_state->prewarm_stop_idx = j;
 		apw_state->database = current_db;
+
 		Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx);
 
 		/* If we've run out of free buffers, don't launch another worker. */
@@ -423,8 +420,8 @@ apw_load_buffers(void)
 }
 
 /*
- * Prewarm all blocks for one database (and possibly also global objects, if
- * those got grouped with this database).
+ * Prewarm all blocks for one database or global objects (while connected to a
+ * valid database).
  */
 void
 autoprewarm_database_main(Datum main_arg)
@@ -462,12 +459,8 @@ autoprewarm_database_main(Datum main_arg)
 
 		CHECK_FOR_INTERRUPTS();
 
-		/*
-		 * Quit if we've reached records for another database. If previous
-		 * blocks are of some global objects, then continue pre-warming.
-		 */
-		if (old_blk != NULL && old_blk->database != blk->database &&
-			old_blk->database != 0)
+		/* Quit if we've reached records for another database. */
+		if (old_blk != NULL && old_blk->database != blk->database)
 			break;
 
 		/*
-- 
2.34.1

From e21c0bf3f16dfdd531d22e7da7167ae9a864c6cd Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 1 Apr 2025 18:07:38 -0400
Subject: [PATCH v9 3/3] Use streaming read I/O in autoprewarm

Make a read stream for each valid fork of each valid relation
represented in the autoprewarm dump file and prewarm those blocks
through the read stream API instead of by directly invoking
ReadBuffer().

Co-authored-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Co-authored-by: Melanie Plageman <melanieplage...@gmail.com>
Discussion: https://postgr.es/m/flat/CAN55FZ3n8Gd%2BhajbL%3D5UkGzu_aHGRqnn%2BxktXq2fuds%3D1AOR6Q%40mail.gmail.com
---
 contrib/pg_prewarm/autoprewarm.c | 105 +++++++++++++++++++++++++++----
 src/tools/pgindent/typedefs.list |   1 +
 2 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index fb132fbb533..e85931e9c1a 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -75,6 +76,28 @@ typedef struct AutoPrewarmSharedState
 	int			prewarmed_blocks;
 } AutoPrewarmSharedState;
 
+/*
+ * Private data passed through the read stream API for our use in the
+ * callback.
+ */
+typedef struct AutoPrewarmReadStreamData
+{
+	/* The array of records containing the blocks we should prewarm. */
+	BlockInfoRecord *block_info;
+
+	/*
+	 * `pos` is the read stream callback's index into block_info. Because the
+	 * read stream may read ahead, pos is likely to be ahead of the index in
+	 * the main loop in autoprewarm_database_main().
+	 */
+	int			pos;
+	Oid			database;
+	RelFileNumber filenumber;
+	ForkNumber	forknum;
+	BlockNumber nblocks;
+} AutoPrewarmReadStreamData;
+
+
 PGDLLEXPORT void autoprewarm_main(Datum main_arg);
 PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);
 
@@ -419,6 +442,45 @@ apw_load_buffers(void)
 						apw_state->prewarmed_blocks, num_elements)));
 }
 
+/*
+ * Return the next block number of a specific relation and fork to read
+ * according to the array of BlockInfoRecord.
+ */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	AutoPrewarmReadStreamData *p = callback_private_data;
+	BlockInfoRecord blk;
+
+	CHECK_FOR_INTERRUPTS();
+
+	if (!have_free_buffer())
+		p->pos = apw_state->prewarm_stop_idx;
+
+	if (p->pos >= apw_state->prewarm_stop_idx)
+		return InvalidBlockNumber;
+
+	blk = p->block_info[p->pos];
+
+	if (blk.database != p->database)
+		return InvalidBlockNumber;
+
+	if (blk.filenumber != p->filenumber)
+		return InvalidBlockNumber;
+
+	if (blk.forknum != p->forknum)
+		return InvalidBlockNumber;
+
+	/* Check whether blocknum is valid and within fork file size. */
+	if (blk.blocknum >= p->nblocks)
+		return InvalidBlockNumber;
+
+	p->pos++;
+	return blk.blocknum;
+}
+
 /*
  * Prewarm all blocks for one database or global objects (while connected to a
  * valid database).
@@ -467,8 +529,6 @@ autoprewarm_database_main(Datum main_arg)
 		Oid			reloid;
 		Relation	rel;
 
-		CHECK_FOR_INTERRUPTS();
-
 		StartTransactionCommand();
 
 		reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
@@ -509,6 +569,8 @@ autoprewarm_database_main(Datum main_arg)
 		{
 			ForkNumber	forknum = blk->forknum;
 			BlockNumber nblocks;
+			struct AutoPrewarmReadStreamData p;
+			ReadStream *stream;
 			Buffer		buf;
 
 			/*
@@ -538,24 +600,41 @@ autoprewarm_database_main(Datum main_arg)
 
 			nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
 
-			/* Check whether blocknum is valid and within fork file size. */
-			if (blk->blocknum >= nblocks)
+			p = (struct AutoPrewarmReadStreamData)
 			{
-				blk = &block_info[++i];
-				continue;
-			}
+				.block_info = block_info,
+					.pos = i,
+					.database = database,
+					.filenumber = filenumber,
+					.forknum = forknum,
+					.nblocks = nblocks,
+			};
+
+			stream = read_stream_begin_relation(READ_STREAM_FULL,
+												NULL,
+												rel,
+												p.forknum,
+												apw_read_stream_next_block,
+												&p,
+												0);
 
-			/* Prewarm buffer. */
-			buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-									 NULL);
-
-			if (BufferIsValid(buf))
+			/*
+			 * Loop until we've prewarmed all the blocks from this fork. The
+			 * read stream callback will check that we still have free buffers
+			 * before requesting each block from the read stream API.
+			 */
+			while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 			{
 				apw_state->prewarmed_blocks++;
 				ReleaseBuffer(buf);
 			}
 
-			blk = &block_info[++i];
+			Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+			read_stream_end(stream);
+
+			/* Advance i past all the blocks just prewarmed. */
+			i = p.pos;
+			blk = &block_info[i];
 		}
 
 		relation_close(rel, AccessShareLock);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 449bafc123c..a36e372526c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -175,6 +175,7 @@ AttributeOpts
 AuthRequest
 AuthToken
 AutoPrewarmSharedState
+AutoPrewarmReadStreamData
 AutoVacOpts
 AutoVacuumShmemStruct
 AutoVacuumWorkItem
-- 
2.34.1

From 8c539abb2a3d9d23fb97e28bd722b85453cf2897 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Mon, 31 Mar 2025 22:02:25 -0400
Subject: [PATCH v9 2/3] Refactor autoprewarm_database_main() in preparation
 for read stream

The read stream API requires the user to provide a callback that can be
invoked multiple times returning the next block to read.

Autoprewarm uses a sorted array of BlockInfoRecords representing each block we
want to try and prewarm from all databases and tables.

autoprewarm_database_main() prewarms all the blocks from a single
database. It is optimized to ensure we don't try to open the same
relation or fork over and over again if it has been dropped or is
invalid. The main loop handles this by carefully setting various local
variables to sentinel values when a run of blocks should be skipped,
allowing it to still only advance one spot in the array on each
iteration.

This method no longer works when using the read stream API. The callback
must be able to advance the position in the array multiple times. Even
duplicating the entirety of the logic in autoprewarm_database_main() in
the read stream callback was not enough to make this continue to work.

Instead, change autoprewarm_database_main() to explicitly fast-forward
in the array past the blocks belonging to an invalid relation or
fork. This gives autoprewarm_database_main() the following structure:

loop through each valid relation
   loop through each valid fork
      loop through each valid buffer

This commit only implements the new control flow -- it does not use the
read stream API.

Co-authored-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Co-authored-by: Melanie Plageman <melanieplage...@gmail.com>
Discussion: https://postgr.es/m/flat/CAN55FZ3n8Gd%2BhajbL%3D5UkGzu_aHGRqnn%2BxktXq2fuds%3D1AOR6Q%40mail.gmail.com
---
 contrib/pg_prewarm/autoprewarm.c | 165 +++++++++++++++++--------------
 1 file changed, 92 insertions(+), 73 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 172654fee25..fb132fbb533 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -426,12 +426,11 @@ apw_load_buffers(void)
 void
 autoprewarm_database_main(Datum main_arg)
 {
-	int			pos;
 	BlockInfoRecord *block_info;
-	Relation	rel = NULL;
-	BlockNumber nblocks = 0;
-	BlockInfoRecord *old_blk = NULL;
+	int			i;
+	BlockInfoRecord *blk;
 	dsm_segment *seg;
+	Oid			database;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -446,104 +445,124 @@ autoprewarm_database_main(Datum main_arg)
 				 errmsg("could not map dynamic shared memory segment")));
 	BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
-	pos = apw_state->prewarm_start_idx;
+
+	i = apw_state->prewarm_start_idx;
+	blk = &block_info[i];
+
+	/*
+	 * apw_state->database may differ from blk->database if we are prewarming
+	 * blocks from global objects.
+	 */
+	database = blk->database;
 
 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
-	 * buffers.
+	 * buffers. We'll quit if we've reached records for another database.
 	 */
-	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
+	while (i < apw_state->prewarm_stop_idx &&
+		   blk->database == database &&
+		   have_free_buffer())
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
-		Buffer		buf;
+		RelFileNumber filenumber = blk->filenumber;
+		Oid			reloid;
+		Relation	rel;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Quit if we've reached records for another database. */
-		if (old_blk != NULL && old_blk->database != blk->database)
-			break;
+		StartTransactionCommand();
 
-		/*
-		 * As soon as we encounter a block of a new relation, close the old
-		 * relation. Note that rel will be NULL if try_relation_open failed
-		 * previously; in that case, there is nothing to close.
-		 */
-		if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
-			rel != NULL)
+		reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
+		if (!OidIsValid(reloid) ||
+			(rel = try_relation_open(reloid, AccessShareLock)) == NULL)
 		{
-			relation_close(rel, AccessShareLock);
-			rel = NULL;
+			/* We failed to open the relation, so there is nothing to close. */
 			CommitTransactionCommand();
-		}
 
-		/*
-		 * Try to open each new relation, but only once, when we first
-		 * encounter it. If it's been dropped, skip the associated blocks.
-		 */
-		if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
-		{
-			Oid			reloid;
-
-			Assert(rel == NULL);
-			StartTransactionCommand();
-			reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
-			if (OidIsValid(reloid))
-				rel = try_relation_open(reloid, AccessShareLock);
+			/*
+			 * Fast-forward to the next relation. We want to skip all of the
+			 * other records referencing this relation since we know we can't
+			 * open it. That way, we avoid repeatedly trying and failing to
+			 * open the same relation.
+			 */
+			for (; i < apw_state->prewarm_stop_idx; i++)
+			{
+				blk = &block_info[i];
+				if (blk->database != database ||
+					blk->filenumber != filenumber)
+					break;
+			}
 
-			if (!rel)
-				CommitTransactionCommand();
-		}
-		if (!rel)
-		{
-			old_blk = blk;
+			/* Time to try and open our new found relation */
 			continue;
 		}
 
-		/* Once per fork, check for fork existence and size. */
-		if (old_blk == NULL ||
-			old_blk->filenumber != blk->filenumber ||
-			old_blk->forknum != blk->forknum)
+		/*
+		 * We have a relation; now let's loop until we find a valid fork of
+		 * the relation or we run out of free buffers. Once we've read from
+		 * all valid forks or run out of options, we'll close the relation and
+		 * move on.
+		 */
+		while (i < apw_state->prewarm_stop_idx &&
+			   blk->database == database &&
+			   blk->filenumber == filenumber &&
+			   have_free_buffer())
 		{
+			ForkNumber	forknum = blk->forknum;
+			BlockNumber nblocks;
+			Buffer		buf;
+
 			/*
 			 * smgrexists is not safe for illegal forknum, hence check whether
 			 * the passed forknum is valid before using it in smgrexists.
 			 */
-			if (blk->forknum > InvalidForkNumber &&
-				blk->forknum <= MAX_FORKNUM &&
-				smgrexists(RelationGetSmgr(rel), blk->forknum))
-				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
-			else
-				nblocks = 0;
-		}
+			if (blk->forknum <= InvalidForkNumber ||
+				blk->forknum > MAX_FORKNUM ||
+				!smgrexists(RelationGetSmgr(rel), blk->forknum))
+			{
+				/*
+				 * Fast-forward to the next fork. We want to skip all of the
+				 * other records referencing this fork since we already know
+				 * it's not valid.
+				 */
+				for (; i < apw_state->prewarm_stop_idx; i++)
+				{
+					blk = &block_info[i];
+					if (blk->database != database ||
+						blk->filenumber != filenumber ||
+						blk->forknum != forknum)
+						break;
+				}
 
-		/* Check whether blocknum is valid and within fork file size. */
-		if (blk->blocknum >= nblocks)
-		{
-			/* Move to next forknum. */
-			old_blk = blk;
-			continue;
-		}
+				continue;
+			}
 
-		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
-		if (BufferIsValid(buf))
-		{
-			apw_state->prewarmed_blocks++;
-			ReleaseBuffer(buf);
-		}
+			nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
 
-		old_blk = blk;
-	}
+			/* Check whether blocknum is valid and within fork file size. */
+			if (blk->blocknum >= nblocks)
+			{
+				blk = &block_info[++i];
+				continue;
+			}
 
-	dsm_detach(seg);
+			/* Prewarm buffer. */
+			buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
+									 NULL);
+
+			if (BufferIsValid(buf))
+			{
+				apw_state->prewarmed_blocks++;
+				ReleaseBuffer(buf);
+			}
+
+			blk = &block_info[++i];
+		}
 
-	/* Release lock on previous relation. */
-	if (rel)
-	{
 		relation_close(rel, AccessShareLock);
 		CommitTransactionCommand();
 	}
+
+	dsm_detach(seg);
 }
 
 /*
-- 
2.34.1
