On Mon, Mar 31, 2025 at 3:45 PM Melanie Plageman
<[email protected]> wrote:
>
> Whoops, this isn't right. It does work. I'm going to draft a version
> suggesting slightly different variable naming and a couple comments to
> make this more clear.
Okay, so after further study, I think there are multiple issues still
with the code. We could end up comparing a blocknumber to nblocks
calculated from a different fork. To address this, you'll need to keep
track of the last fork_number. At that point, you kind of have to
bring back old_blk -- because that is what we are recreating with
multiple local variables.
But I think, overall, what we actually want is to be
really explicit about fast-forwarding in the failure cases (when we
want to skip ahead because a relation is invalid or a fork is
invalid). We were trying to use the main loop control and just add
special cases to allow us to do this fast-forwarding. Instead, I think
we want to just go to a function or loop somewhere and fast-forward
through those bad blocks until we get to the next run of
blocks from a different relation or fork.
I've sketched out an idea like this in the attached. I don't think it
is 100% correct. It does pass tests, but I think we might incorrectly
advance pos twice after skipping a run of blocks belonging to a bad
fork or relation -- and thus skip the first good block after some bad
blocks.
It also needs some more refactoring.
Maybe instead of having the skip code like this:
skip_forknumber:;
while ((blk = next_record(block_info, &i)) != NULL &&
blk->database == database && blk->filenumber == filenumber &&
blk->forknum == forknum);
we could make it a function, to avoid the back-to-back while loop conditions
(because of the outer do-while loop)?
But the explicit looping for skipping the bad blocks and the nested
loops for rel and fork -- I think these are less error prone.
What do you think?
- Melanie
From 438f13072af060b30485b0dd871c1b26ee503513 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Mon, 31 Mar 2025 22:02:25 -0400
Subject: [PATCH] pgsr autoprewarm
---
contrib/pg_prewarm/autoprewarm.c | 218 +++++++++++++++++++------------
1 file changed, 131 insertions(+), 87 deletions(-)
diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..459d36a75a9 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/procsignal.h"
+#include "storage/read_stream.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/guc.h"
@@ -421,6 +422,63 @@ apw_load_buffers(void)
(errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
apw_state->prewarmed_blocks, num_elements)));
}
+struct apw_read_stream_private /* state passed to awp_read_stream_next_block via callback_private_data */
+{
+ BlockInfoRecord *block_info; /* array of block records the callback indexes into */
+ int pos; /* index of the next record the callback will examine; read back by the caller afterwards */
+ Oid database; /* callback stops at the first record whose database differs */
+ RelFileNumber filenumber; /* callback stops at the first record whose filenumber differs */
+ ForkNumber forknum; /* callback stops at the first record whose forknum differs */
+ BlockNumber nblocks; /* records with blocknum >= nblocks are skipped, not returned */
+};
+
+static BlockNumber /* returns the next block number to read, or InvalidBlockNumber when there is none */
+awp_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ struct apw_read_stream_private *p = callback_private_data;
+
+ for (int i; (i = p->pos++) < apw_state->prewarm_stop_idx;) /* pos is post-incremented before the record is examined, even when this test fails */
+ {
+ BlockInfoRecord cur_blk = p->block_info[i]; /* by-value copy of the record at index i */
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (!have_free_buffer())
+ {
+ p->pos = apw_state->prewarm_stop_idx; /* jump to the stop index so the caller's next_record() returns NULL */
+ return InvalidBlockNumber;
+ }
+
+ if (cur_blk.database != p->database) /* record belongs to a different database than this stream */
+ return InvalidBlockNumber; /* NOTE(review): pos already points past this record; the caller copies p.pos and calls next_record() again, so this record looks skipped -- confirm */
+
+ if (cur_blk.filenumber != p->filenumber) /* record belongs to a different relation file */
+ return InvalidBlockNumber;
+
+ if (cur_blk.forknum != p->forknum) /* record belongs to a different fork */
+ return InvalidBlockNumber;
+
+ if (cur_blk.blocknum >= p->nblocks) /* block lies beyond the fork size captured in nblocks */
+ continue; /* skip it and look at the next record */
+
+ return cur_blk.blocknum;
+ }
+
+ return InvalidBlockNumber; /* reached prewarm_stop_idx; pos is now one past it because of the post-increment */
+}
+
+static inline BlockInfoRecord *next_record(BlockInfoRecord *block_info, int *pos) /* returns &block_info[*pos] and advances *pos, or NULL at the stop index */
+{
+ int oldpos;
+ if (*pos >= apw_state->prewarm_stop_idx) /* nothing left in this worker's range; *pos is left unchanged */
+ return NULL;
+
+ oldpos = *pos; /* remember the index of the record being handed out */
+ (*pos)++; /* caller's cursor now points at the following record */
+ return &block_info[oldpos];
+}
/*
* Prewarm all blocks for one database (and possibly also global objects, if
@@ -429,12 +487,11 @@ apw_load_buffers(void)
void
autoprewarm_database_main(Datum main_arg)
{
- int pos;
BlockInfoRecord *block_info;
- Relation rel = NULL;
- BlockNumber nblocks = 0;
- BlockInfoRecord *old_blk = NULL;
+ int i;
+ BlockInfoRecord *blk;
dsm_segment *seg;
+ Oid database;
/* Establish signal handlers; once that's done, unblock signals. */
pqsignal(SIGTERM, die);
@@ -449,108 +506,95 @@ autoprewarm_database_main(Datum main_arg)
errmsg("could not map dynamic shared memory segment")));
BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
block_info = (BlockInfoRecord *) dsm_segment_address(seg);
- pos = apw_state->prewarm_start_idx;
- /*
- * Loop until we run out of blocks to prewarm or until we run out of free
- * buffers.
- */
- while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
- {
- BlockInfoRecord *blk = &block_info[pos++];
- Buffer buf;
+ i = apw_state->prewarm_start_idx;
+ blk = &block_info[i];
+ database = blk->database;
- CHECK_FOR_INTERRUPTS();
-
- /*
- * Quit if we've reached records for another database. If previous
- * blocks are of some global objects, then continue pre-warming.
- */
- if (old_blk != NULL && old_blk->database != blk->database &&
- old_blk->database != 0)
- break;
+ do
+ {
+ RelFileNumber filenumber = blk->filenumber;
+ Oid reloid;
+ Relation rel;
- /*
- * As soon as we encounter a block of a new relation, close the old
- * relation. Note that rel will be NULL if try_relation_open failed
- * previously; in that case, there is nothing to close.
- */
- if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
- rel != NULL)
- {
- relation_close(rel, AccessShareLock);
- rel = NULL;
- CommitTransactionCommand();
- }
+ StartTransactionCommand();
- /*
- * Try to open each new relation, but only once, when we first
- * encounter it. If it's been dropped, skip the associated blocks.
- */
- if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
- {
- Oid reloid;
+ reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
+ if (!OidIsValid(reloid))
+ goto skip_relation;
- Assert(rel == NULL);
- StartTransactionCommand();
- reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
- if (OidIsValid(reloid))
- rel = try_relation_open(reloid, AccessShareLock);
+ if ((rel = try_relation_open(reloid, AccessShareLock)) == NULL)
+ goto skip_relation;
- if (!rel)
- CommitTransactionCommand();
- }
- if (!rel)
+ do
{
- old_blk = blk;
- continue;
- }
+ ForkNumber forknum = blk->forknum;
+ BlockNumber nblocks;
+ struct apw_read_stream_private p;
+ ReadStream *stream;
+ Buffer buf;
- /* Once per fork, check for fork existence and size. */
- if (old_blk == NULL ||
- old_blk->filenumber != blk->filenumber ||
- old_blk->forknum != blk->forknum)
- {
/*
* smgrexists is not safe for illegal forknum, hence check whether
* the passed forknum is valid before using it in smgrexists.
*/
- if (blk->forknum > InvalidForkNumber &&
- blk->forknum <= MAX_FORKNUM &&
- smgrexists(RelationGetSmgr(rel), blk->forknum))
- nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
- else
- nblocks = 0;
- }
+ if (blk->forknum <= InvalidForkNumber)
+ goto skip_forknumber;
- /* Check whether blocknum is valid and within fork file size. */
- if (blk->blocknum >= nblocks)
- {
- /* Move to next forknum. */
- old_blk = blk;
- continue;
- }
+ if (blk->forknum > MAX_FORKNUM)
+ goto skip_forknumber;
- /* Prewarm buffer. */
- buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
- NULL);
- if (BufferIsValid(buf))
- {
- apw_state->prewarmed_blocks++;
- ReleaseBuffer(buf);
- }
+ if (!smgrexists(RelationGetSmgr(rel), blk->forknum))
+ goto skip_forknumber;
- old_blk = blk;
- }
+ nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
- dsm_detach(seg);
+ p = (struct apw_read_stream_private)
+ {
+ .block_info = block_info,
+ .pos = i,
+ .database = database,
+ .filenumber = filenumber,
+ .forknum = forknum,
+ .nblocks = nblocks,
+ };
+
+ stream = read_stream_begin_relation(READ_STREAM_FULL,
+ NULL,
+ rel,
+ p.forknum,
+ awp_read_stream_next_block,
+ &p,
+ 0);
+
+ while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ {
+ apw_state->prewarmed_blocks++;
+ ReleaseBuffer(buf);
+ }
- /* Release lock on previous relation. */
- if (rel)
- {
+ i = p.pos;
+
+ read_stream_end(stream);
+
+ continue;
+
+ skip_forknumber:;
+ while ((blk = next_record(block_info, &i)) != NULL && blk->database == database && blk->filenumber == filenumber && blk->forknum == forknum);
+ } while ((blk = next_record(block_info, &i)) && blk->database == database && blk->filenumber == filenumber);
+
+ /* Release lock on previous relation. */
relation_close(rel, AccessShareLock);
CommitTransactionCommand();
- }
+
+ continue;
+
+ skip_relation:
+ CommitTransactionCommand();
+ while ((blk = next_record(block_info, &i)) != NULL && blk->database == database && blk->filenumber == filenumber);
+ } while ((blk = next_record(block_info, &i)) && blk->database == database);
+
+ dsm_detach(seg);
}
/*
--
2.34.1