On Mon, Mar 31, 2025 at 3:45 PM Melanie Plageman <melanieplage...@gmail.com> wrote:
>
> Whoops, this isn't right. It does work. I'm going to draft a version
> suggesting slightly different variable naming and a couple comments to
> make this more clear.
Okay, so after further study, I think there are still multiple issues with the code. We could end up comparing a block number to an nblocks value calculated from a different fork. To address this, you'll need to keep track of the last fork_number. At that point, you kind of have to bring back old_blk -- because that is what we are recreating with multiple local variables.

But I think, overall, what we actually want to do is be really explicit about fast-forwarding in the failure cases (when we want to skip ahead because a relation is invalid or a fork is invalid). We were trying to use the main loop control and just add special cases to allow us to do this fast-forwarding. Instead, I think we want to just go to a function or loop somewhere and fast-forward through those bad blocks until we get to the next run of blocks from a different relation or fork.

I've sketched out an idea like this in the attached. I don't think it is 100% correct. It does pass tests, but I think we might incorrectly advance pos twice after skipping a run of blocks belonging to a bad fork or relation -- and thus skip the first good block after some bad blocks.

It also needs some more refactoring. Maybe, instead of having the skip code like this:

skip_forknumber:;
    while ((blk = next_record(block_info, &i)) != NULL &&
           blk->database == database &&
           blk->filenumber == filenumber &&
           blk->forknum == forknum);

we make it a function, to avoid the back-to-back while loop conditions (because of the outer do-while loop)? But the explicit looping for skipping the bad blocks and the nested loops for rel and fork -- I think these are less error-prone.

What do you think?

- Melanie
From 438f13072af060b30485b0dd871c1b26ee503513 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <melanieplage...@gmail.com> Date: Mon, 31 Mar 2025 22:02:25 -0400 Subject: [PATCH] pgsr autoprewarm --- contrib/pg_prewarm/autoprewarm.c | 218 +++++++++++++++++++------------ 1 file changed, 131 insertions(+), 87 deletions(-) diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c index 73485a2323c..459d36a75a9 100644 --- a/contrib/pg_prewarm/autoprewarm.c +++ b/contrib/pg_prewarm/autoprewarm.c @@ -41,6 +41,7 @@ #include "storage/latch.h" #include "storage/lwlock.h" #include "storage/procsignal.h" +#include "storage/read_stream.h" #include "storage/smgr.h" #include "tcop/tcopprot.h" #include "utils/guc.h" @@ -421,6 +422,63 @@ apw_load_buffers(void) (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks", apw_state->prewarmed_blocks, num_elements))); } +struct apw_read_stream_private +{ + BlockInfoRecord *block_info; + int pos; + Oid database; + RelFileNumber filenumber; + ForkNumber forknum; + BlockNumber nblocks; +}; + +static BlockNumber +awp_read_stream_next_block(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + struct apw_read_stream_private *p = callback_private_data; + + for (int i; (i = p->pos++) < apw_state->prewarm_stop_idx;) + { + BlockInfoRecord cur_blk = p->block_info[i]; + + CHECK_FOR_INTERRUPTS(); + + if (!have_free_buffer()) + { + p->pos = apw_state->prewarm_stop_idx; + return InvalidBlockNumber; + } + + if (cur_blk.database != p->database) + return InvalidBlockNumber; + + if (cur_blk.filenumber != p->filenumber) + return InvalidBlockNumber; + + if (cur_blk.forknum != p->forknum) + return InvalidBlockNumber; + + if (cur_blk.blocknum >= p->nblocks) + continue; + + return cur_blk.blocknum; + } + + return InvalidBlockNumber; +} + +static inline BlockInfoRecord *next_record(BlockInfoRecord *block_info, int *pos) +{ + int oldpos; + if (*pos >= apw_state->prewarm_stop_idx) + return 
NULL; + + oldpos = *pos; + (*pos)++; + return &block_info[oldpos]; +} /* * Prewarm all blocks for one database (and possibly also global objects, if @@ -429,12 +487,11 @@ apw_load_buffers(void) void autoprewarm_database_main(Datum main_arg) { - int pos; BlockInfoRecord *block_info; - Relation rel = NULL; - BlockNumber nblocks = 0; - BlockInfoRecord *old_blk = NULL; + int i; + BlockInfoRecord *blk; dsm_segment *seg; + Oid database; /* Establish signal handlers; once that's done, unblock signals. */ pqsignal(SIGTERM, die); @@ -449,108 +506,95 @@ autoprewarm_database_main(Datum main_arg) errmsg("could not map dynamic shared memory segment"))); BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0); block_info = (BlockInfoRecord *) dsm_segment_address(seg); - pos = apw_state->prewarm_start_idx; - /* - * Loop until we run out of blocks to prewarm or until we run out of free - * buffers. - */ - while (pos < apw_state->prewarm_stop_idx && have_free_buffer()) - { - BlockInfoRecord *blk = &block_info[pos++]; - Buffer buf; + i = apw_state->prewarm_start_idx; + blk = &block_info[i]; + database = blk->database; - CHECK_FOR_INTERRUPTS(); - - /* - * Quit if we've reached records for another database. If previous - * blocks are of some global objects, then continue pre-warming. - */ - if (old_blk != NULL && old_blk->database != blk->database && - old_blk->database != 0) - break; + do + { + RelFileNumber filenumber = blk->filenumber; + Oid reloid; + Relation rel; - /* - * As soon as we encounter a block of a new relation, close the old - * relation. Note that rel will be NULL if try_relation_open failed - * previously; in that case, there is nothing to close. - */ - if (old_blk != NULL && old_blk->filenumber != blk->filenumber && - rel != NULL) - { - relation_close(rel, AccessShareLock); - rel = NULL; - CommitTransactionCommand(); - } + StartTransactionCommand(); - /* - * Try to open each new relation, but only once, when we first - * encounter it. 
If it's been dropped, skip the associated blocks. - */ - if (old_blk == NULL || old_blk->filenumber != blk->filenumber) - { - Oid reloid; + reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber); + if (!OidIsValid(reloid)) + goto skip_relation; - Assert(rel == NULL); - StartTransactionCommand(); - reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber); - if (OidIsValid(reloid)) - rel = try_relation_open(reloid, AccessShareLock); + if ((rel = try_relation_open(reloid, AccessShareLock)) == NULL) + goto skip_relation; - if (!rel) - CommitTransactionCommand(); - } - if (!rel) + do { - old_blk = blk; - continue; - } + ForkNumber forknum = blk->forknum; + BlockNumber nblocks; + struct apw_read_stream_private p; + ReadStream *stream; + Buffer buf; - /* Once per fork, check for fork existence and size. */ - if (old_blk == NULL || - old_blk->filenumber != blk->filenumber || - old_blk->forknum != blk->forknum) - { /* * smgrexists is not safe for illegal forknum, hence check whether * the passed forknum is valid before using it in smgrexists. */ - if (blk->forknum > InvalidForkNumber && - blk->forknum <= MAX_FORKNUM && - smgrexists(RelationGetSmgr(rel), blk->forknum)) - nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum); - else - nblocks = 0; - } + if (blk->forknum <= InvalidForkNumber) + goto skip_forknumber; - /* Check whether blocknum is valid and within fork file size. */ - if (blk->blocknum >= nblocks) - { - /* Move to next forknum. */ - old_blk = blk; - continue; - } + if (blk->forknum > MAX_FORKNUM) + goto skip_forknumber; - /* Prewarm buffer. 
*/ - buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL, - NULL); - if (BufferIsValid(buf)) - { - apw_state->prewarmed_blocks++; - ReleaseBuffer(buf); - } + if (!smgrexists(RelationGetSmgr(rel), blk->forknum)) + goto skip_forknumber; - old_blk = blk; - } + nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum); - dsm_detach(seg); + p = (struct apw_read_stream_private) + { + .block_info = block_info, + .pos = i, + .database = database, + .filenumber = filenumber, + .forknum = forknum, + .nblocks = nblocks, + }; + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + p.forknum, + awp_read_stream_next_block, + &p, + 0); + + while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) + { + apw_state->prewarmed_blocks++; + ReleaseBuffer(buf); + } - /* Release lock on previous relation. */ - if (rel) - { + i = p.pos; + + read_stream_end(stream); + + continue; + + skip_forknumber:; + while ((blk = next_record(block_info, &i)) != NULL && blk->database == database && blk->filenumber == filenumber && blk->forknum == forknum); + } while ((blk = next_record(block_info, &i)) && blk->database == database && blk->filenumber == filenumber); + + /* Release lock on previous relation. */ relation_close(rel, AccessShareLock); CommitTransactionCommand(); - } + + continue; + + skip_relation: + CommitTransactionCommand(); + while ((blk = next_record(block_info, &i)) != NULL && blk->database == database && blk->filenumber == filenumber); + } while ((blk = next_record(block_info, &i)) && blk->database == database); + + dsm_detach(seg); } /* -- 2.34.1