On Wed, Apr 2, 2025 at 1:20 PM Nazir Bilal Yavuz <byavu...@gmail.com> wrote:
>
> On Wed, 2 Apr 2025 at 18:54, Melanie Plageman <melanieplage...@gmail.com>
> wrote:
> >
> > On Wed, Apr 2, 2025 at 6:26 AM Nazir Bilal Yavuz <byavu...@gmail.com> wrote:
>
> I don't have an example code right now. But what I mean is we may call
> ReadBufferExtended() in a loop for the blocks in the same fork. We
> don't need to call smgrexists() and RelationGetNumberOfBlocksInFork()
> for each block, we will call these for each fork not for each block.
> However, like I said before, this is not important when the read
> stream code is applied.
Ah, you are so right. That was totally messed up in the last version.
I've fixed it in attached v10. I think having it correct in the 0002
patch makes it easier to understand how the read stream callback is
replacing it.

> > > We don't skip blocks whose blocknum is more than nblocks_in_fork. We
> > > can add that either to a stream callback like you did before or after
> > > the read_stream_end. I prefer stream callback because of the reason
> > > below [1].
> >
> > Yep, I also thought we had to have that logic, but because we sort by
> > db,rel,fork,blkno, I think blocks with blocknumber >= nblocks_in_fork
> > will be last and so we just want to move on to the next fork.
>
> I agree that they will be the last, but won't we end up creating a
> read stream object for each block?

Ah, yes, you are right. That would have been really broken. I think I
fixed it. See attached. Now we'll only do that for the first block if
it is invalid (which is probably okay IMO).

> > I was also wondering about the other patch in your earlier set that
> > set stop_idx from get_number_of_free_buffers(). Could you tell me more
> > about that? What does it do and why is it needed with the read stream
> > but wasn't needed before?
>
> In the read stream code, we use callbacks to create bigger I/Os. These
> I/Os aren't processed until the io_combine_limit or we hit not
> sequential blocknum. In other words, when the have_free_buffer()
> function returns false in the callback; there are still queued blocks
> in the stream, although there are no free buffers in the buffer pool.
> We can end up creating I/Os bigger than free buffers in the shared
> buffers.
>
> To solve that a bit, we try to get a number of free buffers in the
> shared buffers. So, we try to minimize the problem above by using the
> actual free buffer count. That optimization has problems like if other
> processes fill shared buffers at the same time while the read stream
> is running, then this optimization will not work well.
Hmm. Yea, I do find it confusing that it will get so easily out of
date. Let's circle back to this after getting the other patches to a
good place (but before committing all of this).

- Melanie
From 96f754d1c65d95cefaf3568d1f1ea0f8f0a0b69c Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Mon, 31 Mar 2025 22:02:25 -0400
Subject: [PATCH v10 2/3] Refactor autoprewarm_database_main() in preparation
 for read stream

Autoprewarm prewarms blocks from a dump file representing the contents
of shared buffers the last time it was dumped. It uses a sorted array of
BlockInfoRecords, each representing a block from one of the cluster's
databases and tables.

autoprewarm_database_main() prewarms all the blocks from a single
database. It is optimized to ensure we don't try to open the same
relation or fork over and over again if it has been dropped or is
invalid. The main loop handled this by carefully setting various local
variables to sentinel values when a run of blocks should be skipped.

This method won't work with the read stream API. A read stream can only
be created for a single relation and fork combination. The callback has
to be able to advance the position in the array to allow for reading
ahead additional blocks, however the callback cannot try to open another
relation or close the current relation. So, the main loop in
autoprewarm_database_main() must also advance the position in the array
of BlockInfoRecords.

To make it compatible with the read stream API, change
autoprewarm_database_main() to explicitly fast-forward in the array past
the blocks belonging to an invalid relation or fork.

This commit only implements the new control flow -- it does not use the
read stream API.

Co-authored-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Co-authored-by: Melanie Plageman <melanieplage...@gmail.com>
Discussion: https://postgr.es/m/flat/CAN55FZ3n8Gd%2BhajbL%3D5UkGzu_aHGRqnn%2BxktXq2fuds%3D1AOR6Q%40mail.gmail.com
---
 contrib/pg_prewarm/autoprewarm.c | 174 ++++++++++++++++++-------------
 1 file changed, 101 insertions(+), 73 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 172654fee25..0722d7bf457 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -426,12 +426,11 @@ apw_load_buffers(void)
 void
 autoprewarm_database_main(Datum main_arg)
 {
-	int			pos;
 	BlockInfoRecord *block_info;
-	Relation	rel = NULL;
-	BlockNumber nblocks = 0;
-	BlockInfoRecord *old_blk = NULL;
+	int			i;
+	BlockInfoRecord *blk;
 	dsm_segment *seg;
+	Oid			database;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -446,104 +445,133 @@ autoprewarm_database_main(Datum main_arg)
 				 errmsg("could not map dynamic shared memory segment")));
 	BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
-	pos = apw_state->prewarm_start_idx;
+
+	i = apw_state->prewarm_start_idx;
+	blk = &block_info[i];
+
+	/*
+	 * apw_state->database may differ from blk->database if we are prewarming
+	 * blocks from global objects.
+	 */
+	database = blk->database;
 
 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
-	 * buffers.
+	 * buffers. We'll quit if we've reached records for another database.
 	 */
-	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
+	while (i < apw_state->prewarm_stop_idx &&
+		   blk->database == database &&
+		   have_free_buffer())
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
-		Buffer		buf;
+		RelFileNumber filenumber = blk->filenumber;
+		Oid			reloid;
+		Relation	rel;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Quit if we've reached records for another database. */
-		if (old_blk != NULL && old_blk->database != blk->database)
-			break;
+		StartTransactionCommand();
 
-		/*
-		 * As soon as we encounter a block of a new relation, close the old
-		 * relation. Note that rel will be NULL if try_relation_open failed
-		 * previously; in that case, there is nothing to close.
-		 */
-		if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
-			rel != NULL)
+		reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
+		if (!OidIsValid(reloid) ||
+			(rel = try_relation_open(reloid, AccessShareLock)) == NULL)
 		{
-			relation_close(rel, AccessShareLock);
-			rel = NULL;
+			/* We failed to open the relation, so there is nothing to close. */
 			CommitTransactionCommand();
-		}
 
-		/*
-		 * Try to open each new relation, but only once, when we first
-		 * encounter it. If it's been dropped, skip the associated blocks.
-		 */
-		if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
-		{
-			Oid			reloid;
-
-			Assert(rel == NULL);
-			StartTransactionCommand();
-			reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
-			if (OidIsValid(reloid))
-				rel = try_relation_open(reloid, AccessShareLock);
+			/*
+			 * Fast-forward to the next relation. We want to skip all of the
+			 * other records referencing this relation since we know we can't
+			 * open it. That way, we avoid repeatedly trying and failing to
+			 * open the same relation.
+			 */
+			for (; i < apw_state->prewarm_stop_idx; i++)
+			{
+				blk = &block_info[i];
+				if (blk->database != database ||
+					blk->filenumber != filenumber)
+					break;
+			}
 
-			if (!rel)
-				CommitTransactionCommand();
-		}
-		if (!rel)
-		{
-			old_blk = blk;
+			/* Time to try and open our newfound relation */
 			continue;
 		}
 
-		/* Once per fork, check for fork existence and size. */
-		if (old_blk == NULL ||
-			old_blk->filenumber != blk->filenumber ||
-			old_blk->forknum != blk->forknum)
+		/*
+		 * We have a relation; now let's loop until we find a valid fork of
+		 * the relation or we run out of free buffers. Once we've read from
+		 * all valid forks or run out of options, we'll close the relation and
+		 * move on.
+		 */
+		while (i < apw_state->prewarm_stop_idx &&
+			   blk->database == database &&
+			   blk->filenumber == filenumber &&
+			   have_free_buffer())
 		{
+			ForkNumber	forknum = blk->forknum;
+			BlockNumber nblocks;
+			Buffer		buf;
+
 			/*
 			 * smgrexists is not safe for illegal forknum, hence check whether
 			 * the passed forknum is valid before using it in smgrexists.
 			 */
-			if (blk->forknum > InvalidForkNumber &&
-				blk->forknum <= MAX_FORKNUM &&
-				smgrexists(RelationGetSmgr(rel), blk->forknum))
-				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
-			else
-				nblocks = 0;
-		}
+			if (blk->forknum <= InvalidForkNumber ||
+				blk->forknum > MAX_FORKNUM ||
+				!smgrexists(RelationGetSmgr(rel), blk->forknum))
+			{
+				/*
+				 * Fast-forward to the next fork. We want to skip all of the
+				 * other records referencing this fork since we already know
+				 * it's not valid.
+				 */
+				for (; i < apw_state->prewarm_stop_idx; i++)
+				{
+					blk = &block_info[i];
+					if (blk->database != database ||
						blk->filenumber != filenumber ||
						blk->forknum != forknum)
						break;
				}
+
+				/* Time to check if this newfound fork is valid */
+				continue;
+			}
 
-		/* Check whether blocknum is valid and within fork file size. */
-		if (blk->blocknum >= nblocks)
-		{
-			/* Move to next forknum. */
-			old_blk = blk;
-			continue;
-		}
+			nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
 
-		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
-		if (BufferIsValid(buf))
-		{
-			apw_state->prewarmed_blocks++;
-			ReleaseBuffer(buf);
-		}
+			/* Check whether blocknum is valid and within fork file size. */
+			if (blk->blocknum >= nblocks)
+			{
+				blk = &block_info[++i];
+				continue;
+			}
 
-		old_blk = blk;
-	}
+			/* Prewarm buffers. */
+			while (i < apw_state->prewarm_stop_idx &&
+				   blk->database == database &&
+				   blk->filenumber == filenumber &&
+				   blk->forknum == forknum &&
+				   have_free_buffer())
+			{
+				CHECK_FOR_INTERRUPTS();
 
-	dsm_detach(seg);
+				buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
+										 NULL);
+
+				blk = &block_info[++i];
+				if (!BufferIsValid(buf))
+					break;
+
+				apw_state->prewarmed_blocks++;
+				ReleaseBuffer(buf);
+			}
+		}
 
-	/* Release lock on previous relation. */
-	if (rel)
-	{
 		relation_close(rel, AccessShareLock);
 		CommitTransactionCommand();
 	}
+
+	dsm_detach(seg);
 }
 
 /*
-- 
2.34.1
From a945fd408a6c3e073394df5332790a6a0fcf5efe Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Wed, 2 Apr 2025 09:52:54 -0400
Subject: [PATCH v10 1/3] Autoprewarm global objects separately

Autoprewarm previously prewarmed global objects while prewarming blocks
from objects from the first valid database it encountered. This was
because you can't read in buffers without being connected to a database.
Prewarming global objects and objects from a single database in the
autoprewarm_database_main() function required a special case. Once we
convert autoprewarm to use a read stream, this special case will have to
be duplicated in multiple places.

Instead, modify apw_load_buffers() to prewarm the shared objects in one
invocation of autoprewarm_database_main() while connected to the first
valid database. It is a bit fiddly but seems better than the
alternative.

Reviewed-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Discussion: https://postgr.es/m/CAN55FZ0TBBmrJ2vtMQ9rEk-NTL2BWQzavVp%3DiRLOUskm%2BzvNNw%40mail.gmail.com
---
 contrib/pg_prewarm/autoprewarm.c | 51 ++++++++++++++------------------
 1 file changed, 22 insertions(+), 29 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..172654fee25 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -347,44 +347,41 @@ apw_load_buffers(void)
 	apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0;
 	apw_state->prewarmed_blocks = 0;
 
-	/* Get the info position of the first block of the next database. */
+	/*
+	 * Loop through the records and launch a database worker to process
+	 * objects in each database. We'll stop at the boundary of each new
+	 * database and prewarm those blocks before moving to the next.
+	 */
 	while (apw_state->prewarm_start_idx < num_elements)
 	{
 		int			j = apw_state->prewarm_start_idx;
 		Oid			current_db = blkinfo[j].database;
 
 		/*
-		 * Advance the prewarm_stop_idx to the first BlockInfoRecord that does
-		 * not belong to this database.
+		 * Advance the position to the first BlockInfoRecord that does not
+		 * belong to the current database.
 		 */
-		j++;
-		while (j < num_elements)
+		for (; j < num_elements; j++)
 		{
-			if (current_db != blkinfo[j].database)
-			{
-				/*
-				 * Combine BlockInfoRecords for global objects with those of
-				 * the database.
-				 */
-				if (current_db != InvalidOid)
-					break;
-				current_db = blkinfo[j].database;
-			}
-
-			j++;
+			if (blkinfo[j].database != current_db)
+				break;
 		}
 
 		/*
-		 * If we reach this point with current_db == InvalidOid, then only
-		 * BlockInfoRecords belonging to global objects exist. We can't
-		 * prewarm without a database connection, so just bail out.
+		 * We can't prewarm without a database connection, so if all of the
+		 * records belong to global objects, we have to bail out.
 		 */
-		if (current_db == InvalidOid)
+		if (current_db == InvalidOid && blkinfo[j].database == InvalidOid)
 			break;
 
+		/* Connect to the first valid db to prewarm global objects. */
+		if (current_db == InvalidOid)
+			current_db = blkinfo[j].database;
+
 		/* Configure stop point and database for next per-database worker. */
 		apw_state->prewarm_stop_idx = j;
 		apw_state->database = current_db;
+		Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx);
 
 		/* If we've run out of free buffers, don't launch another worker. */
@@ -423,8 +420,8 @@ apw_load_buffers(void)
 }
 
 /*
- * Prewarm all blocks for one database (and possibly also global objects, if
- * those got grouped with this database).
+ * Prewarm all blocks for one database or global objects (while connected to a
+ * valid database).
  */
 void
 autoprewarm_database_main(Datum main_arg)
@@ -462,12 +459,8 @@ autoprewarm_database_main(Datum main_arg)
 
 		CHECK_FOR_INTERRUPTS();
 
-		/*
-		 * Quit if we've reached records for another database. If previous
-		 * blocks are of some global objects, then continue pre-warming.
-		 */
-		if (old_blk != NULL && old_blk->database != blk->database &&
-			old_blk->database != 0)
+		/* Quit if we've reached records for another database. */
+		if (old_blk != NULL && old_blk->database != blk->database)
 			break;
 
 		/*
-- 
2.34.1
From 3221de7b11f0b1df281b5c9076a639f0731d83bf Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 1 Apr 2025 18:07:38 -0400
Subject: [PATCH v10 3/3] Use streaming read I/O in autoprewarm

Make a read stream for each valid fork of each valid relation
represented in the autoprewarm dump file and prewarm those blocks
through the read stream API instead of by directly invoking
ReadBuffer().

Co-authored-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Co-authored-by: Melanie Plageman <melanieplage...@gmail.com>
Discussion: https://postgr.es/m/flat/CAN55FZ3n8Gd%2BhajbL%3D5UkGzu_aHGRqnn%2BxktXq2fuds%3D1AOR6Q%40mail.gmail.com
---
 contrib/pg_prewarm/autoprewarm.c | 125 +++++++++++++++++++++++++------
 src/tools/pgindent/typedefs.list |   1 +
 2 files changed, 104 insertions(+), 22 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 0722d7bf457..5d21314120f 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -75,6 +76,28 @@ typedef struct AutoPrewarmSharedState
 	int			prewarmed_blocks;
 } AutoPrewarmSharedState;
 
+/*
+ * Private data passed through the read stream API for our use in the
+ * callback.
+ */
+typedef struct AutoPrewarmReadStreamData
+{
+	/* The array of records containing the blocks we should prewarm. */
+	BlockInfoRecord *block_info;
+
+	/*
+	 * `pos` is the read stream callback's index into block_info. Because the
+	 * read stream may read ahead, pos is likely to be ahead of the index in
+	 * the main loop in autoprewarm_database_main().
+	 */
+	int			pos;
+	Oid			database;
+	RelFileNumber filenumber;
+	ForkNumber	forknum;
+	BlockNumber nblocks;
+} AutoPrewarmReadStreamData;
+
+
 PGDLLEXPORT void autoprewarm_main(Datum main_arg);
 PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);
 
@@ -419,6 +442,55 @@ apw_load_buffers(void)
 					apw_state->prewarmed_blocks, num_elements)));
 }
 
+/*
+ * Return the next block number of a specific relation and fork to read
+ * according to the array of BlockInfoRecord.
+ */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	AutoPrewarmReadStreamData *p = callback_private_data;
+
+	while (p->pos < apw_state->prewarm_stop_idx)
+	{
+		BlockInfoRecord blk = p->block_info[p->pos];
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (!have_free_buffer())
+			p->pos = apw_state->prewarm_stop_idx;
+
+		if (p->pos >= apw_state->prewarm_stop_idx)
+			return InvalidBlockNumber;
+
+		if (blk.database != p->database)
+			return InvalidBlockNumber;
+
+		if (blk.filenumber != p->filenumber)
+			return InvalidBlockNumber;
+
+		if (blk.forknum != p->forknum)
+			return InvalidBlockNumber;
+
+		p->pos++;
+
+		/*
+		 * Check whether blocknum is valid and within fork file size.
+		 * Fast-forward through any invalid blocks. We want `p->pos` to
+		 * reflect the location of the next relation or fork before ending the
+		 * stream.
+		 */
+		if (blk.blocknum >= p->nblocks)
+			continue;
+
+		return blk.blocknum;
+	}
+
+	return InvalidBlockNumber;
+}
+
 /*
  * Prewarm all blocks for one database or global objects (while connected to a
  * valid database).
@@ -467,8 +539,6 @@ autoprewarm_database_main(Datum main_arg)
 		Oid			reloid;
 		Relation	rel;
 
-		CHECK_FOR_INTERRUPTS();
-
 		StartTransactionCommand();
 
 		reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
@@ -509,6 +579,8 @@ autoprewarm_database_main(Datum main_arg)
 		{
 			ForkNumber	forknum = blk->forknum;
 			BlockNumber nblocks;
+			struct AutoPrewarmReadStreamData p;
+			ReadStream *stream;
 			Buffer		buf;
 
 			/*
@@ -539,32 +611,41 @@ autoprewarm_database_main(Datum main_arg)
 
 			nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
 
-			/* Check whether blocknum is valid and within fork file size. */
-			if (blk->blocknum >= nblocks)
+			p = (struct AutoPrewarmReadStreamData)
 			{
-				blk = &block_info[++i];
-				continue;
-			}
+				.block_info = block_info,
+				.pos = i,
+				.database = database,
+				.filenumber = filenumber,
+				.forknum = forknum,
+				.nblocks = nblocks,
+			};
+
+			stream = read_stream_begin_relation(READ_STREAM_FULL,
+												NULL,
+												rel,
+												p.forknum,
+												apw_read_stream_next_block,
+												&p,
+												0);
 
-			/* Prewarm buffers. */
-			while (i < apw_state->prewarm_stop_idx &&
-				   blk->database == database &&
-				   blk->filenumber == filenumber &&
-				   blk->forknum == forknum &&
-				   have_free_buffer())
+			/*
+			 * Loop until we've prewarmed all the blocks from this fork. The
+			 * read stream callback will check that we still have free buffers
+			 * before requesting each block from the read stream API.
+			 */
+			while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 			{
-				CHECK_FOR_INTERRUPTS();
-
-				buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-										 NULL);
-
-				blk = &block_info[++i];
-				if (!BufferIsValid(buf))
-					break;
-
 				apw_state->prewarmed_blocks++;
 				ReleaseBuffer(buf);
 			}
+
+			Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+			read_stream_end(stream);
+
+			/* Advance i past all the blocks just prewarmed. */
+			i = p.pos;
+			blk = &block_info[i];
 		}
 
 		relation_close(rel, AccessShareLock);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8f28d8ff28e..5ac290fae78 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -175,6 +175,7 @@ AttributeOpts
 AuthRequest
 AuthToken
 AutoPrewarmSharedState
+AutoPrewarmReadStreamData
 AutoVacOpts
 AutoVacuumShmemStruct
 AutoVacuumWorkItem
-- 
2.34.1