On 2017-03-13 18:45:00 +0530, Mithun Cy wrote:
> I have implemented a similar logic now. The prewarm bgworker will
> launch a sub-worker per database in the dump file. And, each
> sub-worker will load its database block info. The sub-workers will be
> launched only after previous one is finished. All of this will only
> start if the database has reached a consistent state.

Hm. For replay performance it'd possibly be good to start earlier,
before reaching consistency.  Is there an issue starting earlier?


> diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
> new file mode 100644
> index 0000000..f4b34ca
> --- /dev/null
> +++ b/contrib/pg_prewarm/autoprewarm.c
> @@ -0,0 +1,1137 @@
> +/*-------------------------------------------------------------------------
> + *
> + * autoprewarm.c
> + *
> + * -- Automatically prewarm the shared buffer pool when server restarts.

Don't think we usually use -- here.


> + *   Copyright (c) 2013-2017, PostgreSQL Global Development Group

Hm, that's a bit of a weird date range.


> + *   IDENTIFICATION
> + *           contrib/pg_prewarm.c/autoprewarm.c
> + *-------------------------------------------------------------------------
> + */

The pg_prewarm.c in there looks like some search & replace gone awry.



> +#include "postgres.h"
> +#include <unistd.h>
> +
> +/* These are always necessary for a bgworker. */
> +#include "miscadmin.h"
> +#include "postmaster/bgworker.h"
> +#include "storage/ipc.h"
> +#include "storage/latch.h"
> +#include "storage/lwlock.h"
> +#include "storage/proc.h"
> +#include "storage/shmem.h"
> +
> +/* These are necessary for prewarm utilities. */
> +#include "pgstat.h"
> +#include "storage/buf_internals.h"
> +#include "storage/smgr.h"
> +#include "utils/memutils.h"
> +#include "utils/resowner.h"
> +#include "utils/guc.h"
> +#include "catalog/pg_class.h"
> +#include "catalog/pg_type.h"
> +#include "executor/spi.h"
> +#include "access/xact.h"
> +#include "utils/rel.h"
> +#include "port/atomics.h"

I'd rather just sort these alphabetically.




I think this should rather be in the initial header.

> +/*
> + * autoprewarm :
> + *
> + * What is it?
> + * ===========
> + * A bgworker which automatically records information about blocks which were
> + * present in buffer pool before server shutdown and then prewarm the buffer
> + * pool upon server restart with those blocks.
> + *
> + * How does it work?
> + * =================
> + * When the shared library "pg_prewarm" is preloaded, a
> + * bgworker "autoprewarm" is launched immediately after the server has reached
> + * consistent state. The bgworker will start loading blocks recorded in the
> + * format BlockInfoRecord
> + * <<DatabaseId,TableSpaceId,RelationId,Forknum,BlockNum>> in
> + * $PGDATA/AUTOPREWARM_FILE, until there is a free buffer left in the buffer
> + * pool. This way we do not replace any new blocks which were loaded either by
> + * the recovery process or the querying clients.

s/until there is a/until there is no/?


> +/*
> + * ============================================================================
> + * ===========================        SIGNAL HANDLERS        ===========================
> + * ============================================================================
> + */

Hm...

> +static void sigtermHandler(SIGNAL_ARGS);
> +static void sighupHandler(SIGNAL_ARGS);

I don't think that's a casing we commonly use.  We mostly use CamelCase
or underscore_case.


> +/*
> + *   Signal handler for SIGUSR1.
> + */
> +static void
> +sigusr1Handler(SIGNAL_ARGS)
> +{
> +     int                     save_errno = errno;
> +
> +     if (MyProc)
> +             SetLatch(&MyProc->procLatch);
> +
> +     errno = save_errno;
> +}

Hm, what's this one for?


> +/*
> + * Shared state information about the running autoprewarm bgworker.
> + */
> +typedef struct AutoPrewarmSharedState
> +{
> +     pg_atomic_uint32 current_task;          /* current tasks performed by
> +                                              * autoprewarm workers. */
> +} AutoPrewarmSharedState;

Hm.  Why do we need atomics here?  I thought there's no concurrency?


> +/*
> + * sort_cmp_func - compare function used for qsort().
> + */
> +static int
> +sort_cmp_func(const void *p, const void *q)
> +{

rename to blockinfo_cmp?
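
To make the suggestion concrete: the comparator only needs a stable total order over the five tag fields, with database first so each database's records end up contiguous for the per-database sub-workers. A standalone sketch (field types assumed for illustration, not taken from the patch):

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct BlockInfoRecord
{
	uint32_t	database;	/* database OID */
	uint32_t	spcNode;	/* tablespace OID */
	uint32_t	filenode;	/* relfilenode */
	uint32_t	forknum;	/* fork number */
	uint32_t	blocknum;	/* block number */
} BlockInfoRecord;

/*
 * qsort() comparator: order by database first so each database's records
 * are contiguous in the dump, then by the remaining tag fields so blocks
 * of one fork are read back in ascending order.
 */
static int
blockinfo_cmp(const void *p, const void *q)
{
	const BlockInfoRecord *a = p;
	const BlockInfoRecord *b = q;

	if (a->database != b->database)
		return a->database < b->database ? -1 : 1;
	if (a->spcNode != b->spcNode)
		return a->spcNode < b->spcNode ? -1 : 1;
	if (a->filenode != b->filenode)
		return a->filenode < b->filenode ? -1 : 1;
	if (a->forknum != b->forknum)
		return a->forknum < b->forknum ? -1 : 1;
	if (a->blocknum != b->blocknum)
		return a->blocknum < b->blocknum ? -1 : 1;
	return 0;
}
```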



> +static AutoPrewarmTask
> +get_autoprewarm_task(AutoPrewarmTask todo_task)
> +{
> +     bool            found;
> +
> +     state = NULL;
> +
> +     LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
> +     state = ShmemInitStruct("autoprewarm",
> +                                                     sizeof(AutoPrewarmSharedState),
> +                                                     &found);
> +     if (!found)
> +             pg_atomic_write_u32(&(state->current_task), todo_task);

Superfluous parens (repeated a lot).


> +     LWLockRelease(AddinShmemInitLock);
> +
> +     /* If found check if we can go ahead. */
> +     if (found)
> +     {
> +             if (pg_atomic_read_u32(&(state->current_task)) ==
> +                     TASK_PREWARM_BUFFERPOOL)

You repeat the read in every branch - why don't you store it in a
variable instead?

That aside, the use of an atomic doesn't seem to actually gain us
anything here.  If we need control over concurrency it seems a lot
better to instead use a lwlock or spinlock.  There's no contention here,
using lock-free stuff just increases complexity without a corresponding
benefit.
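
As a sketch of what I mean (standalone; a pthread mutex stands in for an LWLock or spinlock in shared memory, and the transition rules are simplified, not the patch's exact ones):

```c
#include <assert.h>
#include <pthread.h>

typedef enum
{
	TASK_PREWARM_BUFFERPOOL,
	TASK_DUMP_BUFFERPOOL_INFO,
	TASK_DUMP_IMMEDIATE_ONCE,
	TASK_END
} AutoPrewarmTask;

/*
 * Shared state: a plain field guarded by a lock.  No atomics needed, this
 * is not a hot path, and a lock makes the read-check-update sequence
 * trivially race-free instead of needing compare-exchange loops.
 */
static pthread_mutex_t state_lock = PTHREAD_MUTEX_INITIALIZER;
static AutoPrewarmTask current_task = TASK_PREWARM_BUFFERPOOL;

static AutoPrewarmTask
negotiate_task(AutoPrewarmTask todo_task)
{
	AutoPrewarmTask granted;

	pthread_mutex_lock(&state_lock);
	if (current_task == TASK_PREWARM_BUFFERPOOL &&
		todo_task == TASK_PREWARM_BUFFERPOOL)
	{
		/* a second prewarm request while prewarming: switch to dumping */
		current_task = TASK_DUMP_BUFFERPOOL_INFO;
		granted = TASK_DUMP_BUFFERPOOL_INFO;
	}
	else if (current_task == todo_task)
		granted = todo_task;	/* requested task matches owner, proceed */
	else
		granted = TASK_END;		/* someone else owns the state, back off */
	pthread_mutex_unlock(&state_lock);

	return granted;
}
```

The whole check-then-update happens under one lock acquisition, so there's no window for two workers to both "win".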


> +             {
> +                     if (todo_task == TASK_PREWARM_BUFFERPOOL)
> +                     {
> +                             /*
> +                              * we were prewarming and we are back to do same, time to
> +                              * abort prewarming and move to dumping.
> +                              */

I'm not sure what "back to do same" should mean here - changing to a
different type of task surely is not the same.


> +                             pg_atomic_write_u32(&(state->current_task),
> +                                                                     TASK_DUMP_BUFFERPOOL_INFO);
> +                             return TASK_DUMP_BUFFERPOOL_INFO;
> +                     }
> +                     else
> +                             return TASK_END;        /* rest all cannot proceed further. */

What does that comment mean?


> +             }
> +             else if (pg_atomic_read_u32(&(state->current_task)) ==
> +                              TASK_DUMP_IMMEDIATE_ONCE)
> +             {
> +                     uint32          current_state = TASK_DUMP_IMMEDIATE_ONCE;
> +
> +                     /* We cannot do a TASK_PREWARM_BUFFERPOOL but rest can go ahead */
> +                     if (todo_task == TASK_DUMP_IMMEDIATE_ONCE)
> +                             return TASK_DUMP_IMMEDIATE_ONCE;
> +
> +                     if (todo_task == TASK_PREWARM_BUFFERPOOL)
> +                             todo_task = TASK_DUMP_BUFFERPOOL_INFO;  /* skip to do dump only */
> +
> +                     /*
> +                      * first guy who can atomically set the current_task get the
> +                      * opportunity to proceed further
> +                      */
> +                     if (pg_atomic_compare_exchange_u32(&(state->current_task),
> +                                                                                &current_state,
> +                                                                                TASK_DUMP_BUFFERPOOL_INFO))
> +                     {
> +                             /* Wow! We won the race proceed with the task. */
> +                             return TASK_DUMP_BUFFERPOOL_INFO;
> +                     }
> +                     else
> +                             return TASK_END;

Note that it's not generally guaranteed that any
pg_atomic_compare_exchange_u32 actually wins, it could temporarily fail
for all.


> +/*
> + * getnextblockinfo -- given a BlkType get its next BlockInfoRecord from the
> + *                                      dump file.
> + */
> +static BlkType
> +getnextblockinfo(FILE *file, BlockInfoRecord *currblkinfo, BlkType reqblock,
> +                              BlockInfoRecord *newblkinfo)
> +{
> +     BlkType         nextblk;
> +
> +     while (true)
> +     {
> +             /* get next block. */
> +             if (5 != fscanf(file, "%u,%u,%u,%u,%u\n", &(newblkinfo->database),
> +                                             &(newblkinfo->spcNode), &(newblkinfo->filenode),
> +                                             (uint32 *) &(newblkinfo->forknum),
> +                                             &(newblkinfo->blocknum)))
> +                     return BLKTYPE_END; /* No more valid entry hence stop processing. */

Hm.  Is it actually helpful to store the file as text?  That's commonly
going to increase the size of the file quite considerably, no?
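
For a typical record the text form is already noticeably bigger than five raw uint32s, before even counting the fscanf parse overhead. A standalone illustration (struct layout assumed, not taken from the patch):

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct
{
	uint32_t	database;
	uint32_t	spcNode;
	uint32_t	filenode;
	uint32_t	forknum;
	uint32_t	blocknum;
} BlockInfoRecord;

/*
 * Size in bytes of one record in the patch's text format.  A binary dump
 * of the same record would be a fixed 5 * sizeof(uint32_t) = 20 bytes.
 */
static size_t
text_record_size(const BlockInfoRecord *r)
{
	char		buf[128];

	/* snprintf returns the formatted length, excluding the NUL */
	return (size_t) snprintf(buf, sizeof(buf), "%u,%u,%u,%u,%u\n",
							 r->database, r->spcNode, r->filenode,
							 r->forknum, r->blocknum);
}
```

With realistic OIDs and block numbers the text line easily exceeds the 20-byte binary form, and the gap grows with the values.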

> +/*
> + * GetRelOid -- given a filenode get its relation oid.
> + */
> +static Oid
> +get_reloid(Oid filenode)
> +{

Function and comment don't agree on naming.


But what is this actually used for?  I thought Robert, in
http://archives.postgresql.org/message-id/CA%2BTgmoa%3DUqCL2mR%2B9WTq05tB3Up-z4Sv2wkzkDxDwBP7Mj_2_w%40mail.gmail.com
suggested storing the filenode in the dump, and then to use
RelidByRelfilenode to get the corresponding relation?

It seems a lot better to use relfilenodes, because otherwise table
rewrites will lead to reloading wrong things.


> +     int                     ret;
> +     Oid                     relationid;
> +     bool            isnull;
> +     Datum           value[1] = {ObjectIdGetDatum(filenode)};
> +     StringInfoData buf;
> +     Oid                     ptype[1] = {OIDOID};
> +
> +     initStringInfo(&buf);
> +     appendStringInfo(&buf,
> +                     "select oid from pg_class where pg_relation_filenode(oid) = $1");
> +
> +     ret = SPI_execute_with_args(buf.data, 1, (Oid *) &ptype, (Datum *) &value,
> +                                                             NULL, true, 1);
> +
> +     if (ret != SPI_OK_SELECT)
> +             ereport(FATAL, (errmsg("SPI_execute failed: error code %d", ret)));
> +
> +     if (SPI_processed < 1)
> +             return InvalidOid;
> +
> +     relationid = DatumGetObjectId(SPI_getbinval(SPI_tuptable->vals[0],
> +                                                                                             SPI_tuptable->tupdesc,
> +                                                                                             1, &isnull));
> +     if (isnull)
> +             return InvalidOid;
> +
> +     return relationid;
> +}

Doing this via SPI doesn't strike me as a good idea - that's really
quite expensive.  Why not call the underlying function directly?


> +/*
> + * load_one_database -- start of prewarm sub-worker, this will try to load
> + * blocks of one database starting from block info position passed by main
> + * prewarm worker.
> + */
> +void
> +load_one_database(Datum main_arg)
> +{

> +     /* check if file exists and open file in read mode. */
> +     snprintf(dump_file_path, sizeof(dump_file_path), "%s", AUTOPREWARM_FILE);
> +     file = fopen(dump_file_path, PG_BINARY_R);
> +     if (!file)
> +             return;                                 /* No file to load. */

Shouldn't this be an error case?  In which case is it ok for the file to
be gone after we launched the worker?


> +     /*
> +      * It should be a block info belonging to a new database. Or else dump
> +      * file is corrupted better to end the loading of bocks now.
> +      */
> +     if (loadblocktype != BLKTYPE_NEW_DATABASE)
> +             goto end_load;                  /* should we raise a voice here? */

Yes, this should raise an error.



> +                     case BLKTYPE_NEW_RELATION:
> +
> +                             /*
> +                              * release lock on previous relation.
> +                              */
> +                             if (rel)
> +                             {
> +                                     relation_close(rel, AccessShareLock);
> +                                     rel = NULL;
> +                             }
> +
> +                             loadblocktype = BLKTYPE_NEW_RELATION;
> +
> +                             /*
> +                              * lock new relation.
> +                              */
> +                             reloid = get_reloid(toload_block.filenode);
> +
> +                             if (!OidIsValid(reloid))
> +                                     break;
> +
> +                             rel = try_relation_open(reloid, AccessShareLock);
> +                             if (!rel)
> +                                     break;
> +                             RelationOpenSmgr(rel);

Now I'm confused.  Your get_reloid used pg_relation_filenode() to map
from relation oid to filenode - and then you're using it to lock the
relation?  Something's wrong.


> +                     case BLKTYPE_NEW_FORK:
> +
> +                             /*
> +                              * check if fork exists and if block is within the range
> +                              */
> +                             loadblocktype = BLKTYPE_NEW_FORK;
> +                             if (                    /* toload_block.forknum > InvalidForkNumber &&
> +                                                              * toload_block.forknum <= MAX_FORKNUM && */
> +                                     !smgrexists(rel->rd_smgr, toload_block.forknum))
> +                                     break;

Huh? What's with that commented out section of code?


> +                     case BLKTYPE_NEW_BLOCK:
> +
> +                             /* check if blocknum is valid and with in fork file size. */
> +                             if (toload_block.blocknum >= nblocks)
> +                             {
> +                                     /* move to next forknum. */
> +                                     loadblocktype = BLKTYPE_NEW_FORK;
> +                                     break;
> +                             }

Hm. Why does the size of the underlying file allow us to skip to the
next fork? Don't we have to read all the pending dump records?


> +                             buf = ReadBufferExtended(rel, toload_block.forknum,
> +                                                                              toload_block.blocknum, RBM_NORMAL,
> +                                                                              NULL);
> +                             if (BufferIsValid(buf))
> +                             {
> +                                     ReleaseBuffer(buf);
> +                             }
> +
> +                             loadblocktype = BLKTYPE_NEW_BLOCK;
> +                             break;

Hm. RBM_NORMAL will error out in a bunch of cases, is that ok?

> +     if (have_dbconnection)
> +     {
> +             SPI_finish();
> +             PopActiveSnapshot();
> +             CommitTransactionCommand();
> +     }
> +     return;
> +}

Are we really ok keeping open a transaction through all of this? That
could potentially be quite long, no?  How about doing that on a per-file
basis, or even moving to session locks altogether?



> +/* This sub-module is for periodically dumping buffer pool's block info into
> + * a dump file AUTOPREWARM_FILE.
> + * Each entry of block info looks like this:
> + * <DatabaseId,TableSpaceId,RelationId,Forknum,BlockNum> and we shall call it
> + * as BlockInfoRecord.
> + *
> + * Contents of AUTOPREWARM_FILE has been formated such a way that
> + * blockInfoRecord of each database can be given to different prewarm workers.
> + *
> + *   format of AUTOPREWAM_FILE
> + *   =======================================
> + *   [offset position of database map table]
> + *   [sorted BlockInfoRecords..............]
> + *   [database map table]
> + *   =======================================

This doesn't mention storing things as ascii, instead of binary...


> + *   The [database map table] is sequence of offset in file which will point to
> + *   first BlockInfoRecords of each database in the dump. The prewarm worker
> + *   will read this offset one by one in sequence and ask its subworker to seek
> + *   to this position and then start loading the BlockInfoRecords one by one
> + *   until it see a BlockInfoRecords of a different database than it is actually
> + *   connected to.
> + *   NOTE : We store off_t inside file so the dump file will not be portable to
> + *   be used across systems where sizeof off_t is different from each other.
> + */

Why are we using off_t? Shouldn't this just be BlockNumber?
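
And if raw file offsets really do have to be stored, the portability problem is avoidable by serializing them as fixed-width big-endian 64-bit values rather than writing a native off_t. A minimal sketch:

```c
#include <assert.h>
#include <stdint.h>

/*
 * Encode/decode a file offset as exactly 8 bytes, big-endian, independent
 * of the platform's sizeof(off_t) and byte order.
 */
static void
encode_offset_be64(uint64_t off, unsigned char out[8])
{
	for (int i = 7; i >= 0; i--)
	{
		out[i] = (unsigned char) (off & 0xFF);
		off >>= 8;
	}
}

static uint64_t
decode_offset_be64(const unsigned char in[8])
{
	uint64_t	off = 0;

	for (int i = 0; i < 8; i++)
		off = (off << 8) | in[i];
	return off;
}
```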


> +static uint32
> +dump_now(void)
> +{
> +     static char dump_file_path[MAXPGPATH],

> +
> +     for (num_blocks = 0, i = 0; i < NBuffers; i++)
> +     {
> +             uint32          buf_state;
> +
> +             bufHdr = GetBufferDescriptor(i);
> +
> +             /* lock each buffer header before inspecting. */
> +             buf_state = LockBufHdr(bufHdr);
> +
> +             if (buf_state & BM_TAG_VALID)
> +             {
> +                     block_info_array[num_blocks].database = bufHdr->tag.rnode.dbNode;
> +                     block_info_array[num_blocks].spcNode = bufHdr->tag.rnode.spcNode;
> +                     block_info_array[num_blocks].filenode = bufHdr->tag.rnode.relNode;
> +                     block_info_array[num_blocks].forknum = bufHdr->tag.forkNum;
> +                     block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum;
> +                     ++num_blocks;
> +             }
> +
> +             UnlockBufHdr(bufHdr, buf_state);

> +     }
> +
> +     /* sorting now only to avoid sorting while loading. */

"sorting while loading"? You mean random accesses?

> +     pg_qsort(block_info_array, num_blocks, sizeof(BlockInfoRecord),
> +                      sort_cmp_func);



> +     snprintf(transient_dump_file_path, sizeof(dump_file_path),
> +                      "%s.%d", AUTOPREWARM_FILE, MyProcPid);
> +     file = fopen(transient_dump_file_path, "w");
> +     if (file == NULL)
> +             ereport(ERROR,
> +                             (errcode_for_file_access(),
> +                              errmsg("autoprewarm: could not open \"%s\": %m",
> +                                             dump_file_path)));

What if that file already exists? You're not truncating it.  Also, what
if we error out in the middle of this? We'll leak an fd.  I think this
needs to use OpenTransientFile etc.
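
I.e. the usual write-to-temp-then-rename discipline, with O_TRUNC on open so a stale transient file from a crashed dump can't contribute leftover content. Plain POSIX calls below as a stand-in for OpenTransientFile (which would additionally release the fd automatically on elog/ereport):

```c
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/*
 * Write payload to tmp_path, then atomically rename it over final_path.
 * Truncating on open means a pre-existing transient file never leaks
 * stale bytes; renaming only after a full successful write means readers
 * never see a partial dump.  Returns 0 on success, -1 on error.
 */
static int
dump_atomically(const char *final_path, const char *tmp_path,
				const char *payload)
{
	int			fd = open(tmp_path, O_WRONLY | O_CREAT | O_TRUNC, 0600);

	if (fd < 0)
		return -1;

	if (write(fd, payload, strlen(payload)) != (ssize_t) strlen(payload))
	{
		close(fd);
		unlink(tmp_path);		/* don't leave a half-written file behind */
		return -1;
	}
	close(fd);

	/* rename() over an existing file is atomic on POSIX filesystems */
	return rename(tmp_path, final_path);
}
```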

> +     snprintf(dump_file_path, sizeof(dump_file_path),
> +                      "%s", AUTOPREWARM_FILE);
> +     ret = fprintf(file, "%020jd\n", (intmax_t) 0);
> +     if (ret < 0)
> +     {
> +             fclose(file);
> +             ereport(ERROR,
> +                             (errcode_for_file_access(),
> +                              errmsg("autoprewarm: error writing to \"%s\" : %m",
> +                                             dump_file_path)));
> +     }
> +
> +     database_map_table[num_db++] = ftello(file);
> +
> +     for (i = 0; i < num_blocks; i++)
> +     {
> +             if (i > 0 && block_info_array[i].database != prev_database)
> +             {
> +                     if (num_db == database_map_table_size)
> +                     {
> +                             database_map_table_size *= 2;   /* double and repalloc. */
> +                             database_map_table =
> +                                     (off_t *) repalloc(database_map_table,
> +                                                                     sizeof(off_t) * database_map_table_size);
> +                     }
> +                     fflush(file);
> +                     database_map_table[num_db++] = ftello(file);
> +             }
> +
> +             ret = fprintf(file, "%u,%u,%u,%u,%u\n",
> +                                       block_info_array[i].database,
> +                                       block_info_array[i].spcNode,
> +                                       block_info_array[i].filenode,
> +                                       (uint32) block_info_array[i].forknum,
> +                                       block_info_array[i].blocknum);
> +             if (ret < 0)
> +             {
> +                     fclose(file);
> +                     ereport(ERROR,
> +                                     (errcode_for_file_access(),
> +                                      errmsg("autoprewarm: error writing to \"%s\" : %m",
> +                                                     dump_file_path)));
> +             }
> +
> +             prev_database = block_info_array[i].database;
> +     }

I think we should check for interrupts somewhere in that (and the
preceding) loop.

> +/*
> + * dump_block_info_periodically - at regular intervals, which is defined by GUC
> + * dump_interval, dump the info of blocks which are present in buffer pool.
> + */
> +void
> +dump_block_info_periodically()
> +{

Suggest adding void to the parameter list.


> +     pg_time_t       last_dump_time = (pg_time_t) time(NULL);
> +
> +     while (!got_sigterm)
> +     {
> +             int                     rc;
> +             pg_time_t       now;
> +             int                     elapsed_secs = 0,
> +                                     timeout = AT_PWARM_DEFAULT_DUMP_INTERVAL;
> +
> +             if (dump_interval > AT_PWARM_DUMP_AT_SHUTDOWN_ONLY)
> +             {
> +                     now = (pg_time_t) time(NULL);
> +                     elapsed_secs = now - last_dump_time;
> +
> +                     if (elapsed_secs > dump_interval)
> +                     {
> +                             dump_now();
> +                             if (got_sigterm)
> +                                     return;         /* got shutdown signal just after a dump. And,
> +                                                      * I think better to return now. */
> +                             last_dump_time = (pg_time_t) time(NULL);
> +                             elapsed_secs = 0;
> +                     }
> +
> +                     timeout = dump_interval - elapsed_secs;
> +             }

I suggest using GetCurrentTimestamp() and TimestampDifferenceExceeds()
instead.
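
For reference, TimestampDifferenceExceeds() boils down to a microsecond comparison; a standalone model of (roughly) its semantics, which sidesteps the manual second-counting above (the real function lives in src/backend/utils/adt/timestamp.c):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t TimestampTz;	/* microseconds since epoch, as in the backend */

/*
 * True if stop_time is at least msec milliseconds after start_time.
 * Sketch of TimestampDifferenceExceeds()'s behavior, not a copy of it.
 */
static bool
timestamp_difference_exceeds(TimestampTz start_time,
							 TimestampTz stop_time,
							 int msec)
{
	int64_t		diff = stop_time - start_time;

	return diff >= (int64_t) msec * 1000;
}
```

The loop would then just compare GetCurrentTimestamp() against last_dump_time rather than maintaining elapsed_secs by hand.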


> +             /* Has been set not to dump. Nothing more to do. */
> +             if (dump_interval == AT_PWARM_OFF)
> +                     return;
> +
> +             ResetLatch(&MyProc->procLatch);
> +             rc = WaitLatch(&MyProc->procLatch,
> +                                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> +                                        timeout * 1000, PG_WAIT_EXTENSION);
> +
> +             if (rc & WL_POSTMASTER_DEATH)
> +                     proc_exit(1);
> +
> +             /*
> +              * In case of a SIGHUP, just reload the configuration.
> +              */
> +             if (got_sighup)
> +             {
> +                     got_sighup = false;
> +                     ProcessConfigFile(PGC_SIGHUP);
> +             }
> +     }
> +
> +     /* One last block meta info dump while postmaster shutdown. */
> +     if (dump_interval != AT_PWARM_OFF)
> +             dump_now();

Uh, afaics we'll also do this if somebody SIGTERMed the process
interactively?


> +/* Extension's entry point. */
> +void
> +_PG_init(void)
> +{
> +     BackgroundWorker autoprewarm;
> +
> +     /* Define custom GUC variables. */
> +     DefineCustomIntVariable("pg_prewarm.dump_interval",
> +                                        "Sets the maximum time between two buffer pool dumps",
> +                                                     "If set to Zero, timer based dumping is disabled."
> +                                                     " If set to -1, stops the running autoprewarm.",
> +                                                     &dump_interval,
> +                                                     AT_PWARM_DEFAULT_DUMP_INTERVAL,
> +                                                     AT_PWARM_OFF, INT_MAX / 1000,
> +                                                     PGC_SIGHUP,
> +                                                     GUC_UNIT_S,
> +                                                     NULL,
> +                                                     NULL,
> +                                                     NULL);
> +
> +     /* if not run as a preloaded library, nothing more to do here! */
> +     if (!process_shared_preload_libraries_in_progress)
> +             return;
> +
> +     DefineCustomStringVariable("pg_prewarm.default_database",
> +                             "default database to connect if dump has not recorded same.",
> +                                                        NULL,
> +                                                        &default_database,
> +                                                        "postgres",
> +                                                        PGC_POSTMASTER,
> +                                                        0,
> +                                                        NULL,
> +                                                        NULL,
> +                                                        NULL);

I don't think it's a good idea to make guc registration depending on
process_shared_preload_libraries_in_progress.


You should also use EmitWarningsOnPlaceholders() somewhere here.




I also wonder whether we don't need to use prefetch to actually make
this fast enough.


I think it's pretty clear that this needs a bit more work and thus won't
be ready for v10.  Moved to the next CF.


- Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
