Hi, This is the patch to put multiple workers into autovacuum. This patch applies after the recheck patch I just posted.
The main change is to have an array of WorkerInfo structs in shared memory; each worker checks the current table of all other workers, and skips a table that's being vacuumed by any of them. Each worker also rechecks the table before vacuuming, which removes the problem of redundant vacuuming. The patch also introduces SIGUSR1 signalling between the workers and the launcher. The launcher keeps a database list in memory and schedules workers to vacuum databases according to that list. The database actually selected may differ from what was in the schedule; in that case, the list is rebuilt. There are two main FIXMEs in this code: 1. Make the list reconstruction and scheduling smarter, so that databases are not ganged together in the schedule. The only difficulty is preserving the sort order that the databases had. 2. Find a way to clean up after failed workers, which otherwise fill up the worker array and starve other databases of vacuuming. I don't really know a way to do this that works in all cases. The only idea I have so far is to consider a worker as having failed to start if it was started more than autovacuum_naptime seconds ago. Neither of these is really minor, but I think they are solvable. -- Alvaro Herrera http://www.CommandPrompt.com/ PostgreSQL Replication, Consulting, Custom Development, 24x7 support
*** 14recheck/src/backend/postmaster/autovacuum.c 2007-03-27 16:43:31.000000000 -0400 --- 12vacuum/src/backend/postmaster/autovacuum.c 2007-03-27 17:40:19.000000000 -0400 *************** *** 52,57 **** --- 52,58 ---- #include "utils/syscache.h" + static volatile sig_atomic_t got_SIGUSR1 = false; static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t avlauncher_shutdown_request = false; *************** *** 59,64 **** --- 60,66 ---- * GUC parameters */ bool autovacuum_start_daemon = false; + int autovacuum_max_workers; int autovacuum_naptime; int autovacuum_vac_thresh; double autovacuum_vac_scale; *************** *** 69,75 **** int autovacuum_vac_cost_delay; int autovacuum_vac_cost_limit; ! /* Flag to tell if we are in the autovacuum daemon process */ static bool am_autovacuum_launcher = false; static bool am_autovacuum_worker = false; --- 71,77 ---- int autovacuum_vac_cost_delay; int autovacuum_vac_cost_limit; ! /* Flags to tell if we are in an autovacuum process */ static bool am_autovacuum_launcher = false; static bool am_autovacuum_worker = false; *************** *** 86,91 **** --- 88,94 ---- typedef struct autovac_dbase { Oid ad_datid; + TimestampTz ad_next_worker; char *ad_name; TransactionId ad_frozenxid; PgStat_StatDBEntry *ad_entry; *************** *** 110,123 **** int at_vacuum_cost_limit; } autovac_table; typedef struct { ! Oid process_db; /* OID of database to process */ ! int worker_pid; /* PID of the worker process, if any */ } AutoVacuumShmemStruct; static AutoVacuumShmemStruct *AutoVacuumShmem; #ifdef EXEC_BACKEND static pid_t avlauncher_forkexec(void); static pid_t avworker_forkexec(void); --- 113,158 ---- int at_vacuum_cost_limit; } autovac_table; + /*------------- + * This struct holds information about a single worker's whereabouts. We keep + * an array of these in shared memory, sized according to + * autovacuum_max_workers. 
+ * + * wi_dboid OID of the database this worker is supposed to work on + * wi_tableoid OID of the table currently being vacuumed + * wi_workerpid PID of the running worker, 0 if not yet started + * wi_finished True when the worker is done and about to exit + *------------- + */ + typedef struct + { + Oid wi_dboid; + Oid wi_tableoid; + int wi_workerpid; + bool wi_finished; + } WorkerInfo; + typedef struct { ! pid_t av_launcherpid; ! WorkerInfo av_workers[1]; ! /* VARIABLE LENGTH STRUCT */ } AutoVacuumShmemStruct; + /* Macro to iterate over all workers. Beware multiple evaluation of args! */ + #define foreach_worker(_i, _worker) \ + _worker = (WorkerInfo *) (AutoVacuumShmem + \ + offsetof(AutoVacuumShmemStruct, av_workers)); \ + for (_i = 0; _i < autovacuum_max_workers; _i++, _worker += sizeof(WorkerInfo)) + static AutoVacuumShmemStruct *AutoVacuumShmem; + /* number of currently free worker slots; only valid in the launcher */ + static int free_workers; + /* the database list in the launcher, and the context that contains it */ + static Dllist *DatabaseList = NULL; + static MemoryContext DatabaseListCxt = NULL; + #ifdef EXEC_BACKEND static pid_t avlauncher_forkexec(void); static pid_t avworker_forkexec(void); *************** *** 125,133 **** NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]); NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]); ! static void do_start_worker(void); ! static void do_autovacuum(Oid dbid); ! static List *autovac_get_database_list(void); static void relation_check_autovac(Oid relid, Form_pg_class classForm, Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry, --- 160,174 ---- NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]); NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]); ! static Oid do_start_worker(void); ! static int launcher_determine_sleep(void); ! static void launch_worker(TimestampTz now); ! static List *get_database_list(void); ! 
static void rebuild_database_list(Oid newdb); ! static int avdb_comparator(const void *a, const void *b); ! ! static void do_autovacuum(WorkerInfo *worker, Oid dbid); ! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid); static void relation_check_autovac(Oid relid, Form_pg_class classForm, Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry, *************** *** 138,144 **** PgStat_StatDBEntry *dbentry); static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum, bool doanalyze, int freeze_min_age); - static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid); static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared, PgStat_StatDBEntry *shared, PgStat_StatDBEntry *dbentry); --- 179,184 ---- *************** *** 149,154 **** --- 189,195 ---- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid); static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid); static void avl_sighup_handler(SIGNAL_ARGS); + static void avl_sigusr1_handler(SIGNAL_ARGS); static void avlauncher_shutdown(SIGNAL_ARGS); static void avl_quickdie(SIGNAL_ARGS); *************** *** 232,237 **** --- 273,312 ---- /* * Main loop for the autovacuum launcher process. + * + * The signalling between launcher and worker is as follows: + * + * When the worker has finished starting up, it stores its PID in wi_workerpid + * and sends a SIGUSR1 signal to the launcher. The launcher then knows that + * the postmaster is ready to start a new worker. We do it this way because + * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't + * yet processed the last one, in which case the second signal would be lost. + * This is only useful when two workers need to be started close to one + * another, which should be rare but it's possible. + * + * Additionally, when the worker is finished with the vacuum work, it sets the + * wi_finished flag and sends a SIGUSR1 signal to the launcher. 
Upon receipt + * of this signal, the launcher then clears the entry for future use and may + * start another worker right away, if need be. + * + * Note that there is still a race condition here, because a worker may finish + * just after the launcher sent the signal to postmaster, but before postmaster + * processes it. At this point, the launcher receives a signal, sees the empty + * slot, so it sends the postmaster the signal again to start another worker. + * But the postmaster flag was already set, so the signal is lost. To avoid + * this problem, the launcher will not try to start a new worker until all + * WorkerInfo entries that have the wi_dboid field set have a PID assigned. + * (FIXME -- this part is not yet implemented.) + * + * There is an additional problem if, for some reason, a worker starts and + * is not able to finish its task correctly. It will not be able to set its + * finished flag, so the launcher will believe that it's still starting up. + * To prevent this problem, we check the PGPROCs of worker processes, and + * clean them up if we find they are not actually running (or they correspond + * to processes that are not autovacuum workers.) We only do it if all + * WorkerInfo structures are in use, thus frequently enough so that this + * problem doesn't cause any starvation, but seldom enough so that it's not a + * performance hit. (FIXME -- this part is not yet implemented.) */ NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]) *************** *** 266,274 **** * Set up signal handlers. Since this is an auxiliary process, it has * particular signal requirements -- no deadlock checker or sinval * catchup, for example. - * - * XXX It may be a good idea to receive signals when an avworker process - * finishes. */ pqsignal(SIGHUP, avl_sighup_handler); --- 341,346 ---- *************** *** 278,284 **** pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); ! 
pqsignal(SIGUSR1, SIG_IGN); /* We don't listen for async notifies */ pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGFPE, FloatExceptionHandler); --- 350,356 ---- pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); ! pqsignal(SIGUSR1, avl_sigusr1_handler); /* We don't listen for async notifies */ pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGFPE, FloatExceptionHandler); *************** *** 360,380 **** /* We can now handle ereport(ERROR) */ PG_exception_stack = &local_sigjmp_buf; ereport(LOG, (errmsg("autovacuum launcher started"))); ! PG_SETMASK(&UnBlockSig); /* ! * take a nap before executing the first iteration, unless we were ! * requested an emergency run. */ ! if (autovacuum_start_daemon) ! pg_usleep(autovacuum_naptime * 1000000L); for (;;) { ! int worker_pid; /* * Emergency bailout if postmaster has died. This is to avoid the --- 432,464 ---- /* We can now handle ereport(ERROR) */ PG_exception_stack = &local_sigjmp_buf; + /* in emergency mode, just start a worker and go away */ + if (!autovacuum_start_daemon) + { + do_start_worker(); + proc_exit(0); /* done */ + } + ereport(LOG, (errmsg("autovacuum launcher started"))); ! AutoVacuumShmem->av_launcherpid = MyProcPid; /* ! * Create the initial database list. The invariant we want this list to ! * keep is that it's ordered by decreasing next_time; initially all times ! * are zero, so order doesn't matter. As soon as an entry is updated to ! * a higher time, it will be moved to the front (which is correct because ! * the only operation is to add autovacuum_naptime to the entry, and time ! * always increases). */ ! rebuild_database_list(InvalidOid); ! free_workers = autovacuum_max_workers; + PG_SETMASK(&UnBlockSig); for (;;) { ! int millis; /* * Emergency bailout if postmaster has died. 
This is to avoid the *************** *** 383,388 **** --- 467,478 ---- if (!PostmasterIsAlive(true)) exit(1); + millis = launcher_determine_sleep(); + + /* Sleep for a while according to schedule */ + pg_usleep(millis * 1000); + + /* the normal shutdown case */ if (avlauncher_shutdown_request) break; *************** *** 392,471 **** ProcessConfigFile(PGC_SIGHUP); } /* ! * if there's a worker already running, sleep until it ! * disappears. */ ! LWLockAcquire(AutovacuumLock, LW_SHARED); ! worker_pid = AutoVacuumShmem->worker_pid; ! LWLockRelease(AutovacuumLock); ! if (worker_pid != 0) { ! PGPROC *proc = BackendPidGetProc(worker_pid); ! if (proc != NULL && proc->isAutovacuum) ! goto sleep; else { /* ! * if the worker is not really running (or it's a process ! * that's not an autovacuum worker), remove the PID from shmem. ! * This should not happen, because either the worker exits ! * cleanly, in which case it'll remove the PID, or it dies, in ! * which case postmaster will cause a system reset cycle. */ ! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! worker_pid = 0; ! LWLockRelease(AutovacuumLock); } } - do_start_worker(); - - sleep: - /* - * in emergency mode, exit immediately so that the postmaster can - * request another run right away if needed. - * - * XXX -- maybe it would be better to handle this inside the launcher - * itself. - */ - if (!autovacuum_start_daemon) - break; - /* have pgstat read the file again next time */ pgstat_clear_snapshot(); - - /* now sleep until the next autovac iteration */ - pg_usleep(autovacuum_naptime * 1000000L); } /* Normal exit from the autovac launcher is here */ ereport(LOG, (errmsg("autovacuum launcher shutting down"))); proc_exit(0); /* done */ } /* * do_start_worker * * Bare-bones procedure for starting an autovacuum worker from the launcher. * It determines what database to work on, sets up shared memory stuff and ! * signals postmaster to start the worker. */ ! static void do_start_worker(void) { List *dblist; ! 
bool for_xid_wrap; ! autovac_dbase *db; ! ListCell *cell; TransactionId xidForceLimit; /* Get a list of databases */ ! dblist = autovac_get_database_list(); /* * Determine the oldest datfrozenxid/relfrozenxid that we will allow --- 482,829 ---- ProcessConfigFile(PGC_SIGHUP); } + /* a worker started up or finished */ + if (got_SIGUSR1) + { + WorkerInfo *worker; + int i; + + got_SIGUSR1 = false; + + /* Walk the workers and clean up finished entries. */ + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + foreach_worker(i, worker) + { + if (worker->wi_finished) + { + worker->wi_tableoid = InvalidOid; + worker->wi_dboid = InvalidOid; + worker->wi_workerpid = 0; + worker->wi_finished = false; + free_workers++; + } + } + LWLockRelease(AutovacuumLock); + } + + #if 0 /* ! * Find and remove all entries corresponding to workers that failed to ! * start. Problem: how do we detect that it failed to start, yet leave ! * alone those that are still really starting up? ! * ! * Idea: if we find that a database is listed twice, and none of the ! * workers has registered, then something is wrong. This fails if ! * all workers failed in different databases however. ! * ! * Another idea: wreak havoc if a worker was started longer than ! * autovac_naptime seconds ago and still hasn't registered. */ ! if (free_workers == 0) ! { ! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! foreach_worker(i, worker) ! { ! if (worker->wi_workerpid == 0 && ...) ! clear the worker entry ! } ! LWLockRelease(AutovacuumLock); ! } ! #endif ! /* ! * See if there's need to start a new worker, and do so if possible. ! * If there are no free worker slots, avoid doing all this work, as ! * we will not be able to start the worker anyway. ! */ ! if (free_workers > 0) { ! TimestampTz current_time; ! Dlelem *elem; ! ! elem = DLGetTail(DatabaseList); ! current_time = GetCurrentTimestamp(); ! ! if (elem != NULL) ! { ! autovac_dbase *avdb = DLE_VAL(elem); ! long secs; ! int usecs; ! ! 
TimestampDifference(current_time, avdb->ad_next_worker, &secs, &usecs); ! ! /* we have to start a worker now */ ! if (secs <= 0 && usecs <= 0) ! launch_worker(current_time); ! } else { /* ! * Special case when the list is empty: start a worker right ! * away. This covers the initial case, when no database is in ! * pgstats (thus the list is empty). */ ! launch_worker(current_time); } } /* have pgstat read the file again next time */ pgstat_clear_snapshot(); } /* Normal exit from the autovac launcher is here */ ereport(LOG, (errmsg("autovacuum launcher shutting down"))); + AutoVacuumShmem->av_launcherpid = 0; proc_exit(0); /* done */ } + + /* + * Determine the time to sleep based on the database list. + */ + static int + launcher_determine_sleep(void) + { + long secs; + int usecs; + Dlelem *elem; + + /* + * We sleep until the next scheduled vacuum. We trust that when the + * database list was built, care was taken so that no entries have + * negative times; if the first entry has a too close next_worker value, + * we will sleep a small nominal time. + */ + elem = DLGetTail(DatabaseList); + if (elem != NULL) + { + autovac_dbase *avdb = DLE_VAL(elem); + TimestampTz current_time = GetCurrentTimestamp(); + TimestampTz next_wakeup; + + next_wakeup = avdb->ad_next_worker; + TimestampDifference(current_time, next_wakeup, &secs, &usecs); + } + else + { + /* list is empty, sleep for whole autovacuum_naptime seconds */ + secs = autovacuum_naptime; + usecs = 0; + } + /* + * someone screwed up (invalid entry on the list); sleep a nominal amount + */ + if (secs <= 0L && usecs <= 0) + { + secs = 0; + usecs = 500000; /* 500 ms */ + } + + return secs * 1000 + usecs / 1000; + } + + /* + * Build an updated DatabaseList. It must only contain databases that appear + * in pgstats, and must be sorted by next_worker from highest to lowest, distributed + * regularly across the next autovacuum_naptime interval. 
+ * + * Receives the Oid of the database that made this list be generated (we call + * this the "new" database, because when the database was already present on + * the list, we expect that this function is not called at all). The + * preexisting list, if any, will be used to preserve the order of the + * databases in the autovacuum_naptime period. The new database is put at the + * end of the interval. The actual values are not saved. + */ + static void + rebuild_database_list(Oid newdb) + { + Dllist *newdllist; + List *dblist; + ListCell *cell; + autovac_dbase *dbary; + int i; + TimestampTz current_time; + TimestampTz initial_time; + int millis_increment = 0; + int total_dbs; + int unset_dbs = 0; + MemoryContext newcxt; + MemoryContext oldcxt; + + newcxt = AllocSetContextCreate(TopMemoryContext, + "dblist context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcxt = MemoryContextSwitchTo(newcxt); + + /* + * To build a sorted dllist, we first store the elements in a fixed-size + * array, which we sort, and finally we store the individual elements in + * the doubly linked list. + */ + dblist = get_database_list(); + current_time = GetCurrentTimestamp(); + + /* + * The new database array. We must not free it: these will become the + * elements of the Dllist. 
+ */ + dbary = palloc(sizeof(autovac_dbase) * list_length(dblist)); + + i = 0; + foreach(cell, dblist) + { + autovac_dbase *avdb = lfirst(cell); + PgStat_StatDBEntry *dbentry; + Dlelem *elm; + + Assert(avdb->ad_next_worker == 0); + + dbentry = pgstat_fetch_stat_dbentry(avdb->ad_datid); + /* skip DBs without pgstat entry */ + if (dbentry == NULL) + continue; + + /* We set the new database to "now + autovacuum_naptime" */ + if (avdb->ad_datid == newdb) + { + avdb->ad_next_worker = + TimestampTzPlusMilliseconds(current_time, + autovacuum_naptime * 1000); + } + else + { + /* + * Otherwise, if the database has an entry on the old list, copy + * the next_worker field into the new list. + */ + elm = DatabaseList ? DLGetHead(DatabaseList) : NULL; + while (elm != NULL) + { + autovac_dbase *tmp = DLE_VAL(elm); + + if (tmp->ad_datid == avdb->ad_datid) + { + avdb->ad_next_worker = tmp->ad_next_worker; + break; + } + elm = DLGetSucc(elm); + } + } + + /* other databases will have the time set later */ + if (avdb->ad_next_worker == 0) + unset_dbs++; + + /* copy the entry into the array */ + memcpy(&(dbary[i++]), avdb, sizeof(autovac_dbase)); + } + + total_dbs = i; + if (unset_dbs > 0) + millis_increment = autovacuum_naptime * 1000 / unset_dbs; + initial_time = TimestampTzPlusMilliseconds(current_time, millis_increment); + + /* now set the time to the unknown databases */ + for (i = 0; i < total_dbs; i++) + { + autovac_dbase *avdb = &dbary[i]; + + if (avdb->ad_next_worker == 0) + { + avdb->ad_next_worker = initial_time; + initial_time = TimestampTzPlusMilliseconds(initial_time, + millis_increment); + } + } + + /* sort the array */ + qsort(dbary, i, sizeof(autovac_dbase), avdb_comparator); + + /* enter each array element into the new dl list */ + newdllist = DLNewList(); + for (i = 0; i < total_dbs; i++) + { + Dlelem *elem; + + /* insert the the i-th array element as a dlelem into the new list */ + elem = DLNewElem(&dbary[i]); + DLAddHead(newdllist, elem); + } + + /* Free the old 
list, and set the new one up */ + if (DatabaseListCxt != NULL) + MemoryContextDelete(DatabaseListCxt); + DatabaseListCxt = newcxt; + DatabaseList = newdllist; + + MemoryContextSwitchTo(oldcxt); + } + + /* qsort comparator for autovac_dbase, using next_worker */ + static int + avdb_comparator(const void *a, const void *b) + { + if (((autovac_dbase *) a)->ad_next_worker == ((autovac_dbase *) b)->ad_next_worker) + return 0; + else + return (((autovac_dbase *) a)->ad_next_worker > ((autovac_dbase *) b)->ad_next_worker) ? 1 : -1; + } + + + + /* * do_start_worker * * Bare-bones procedure for starting an autovacuum worker from the launcher. * It determines what database to work on, sets up shared memory stuff and ! * signals postmaster to start the worker. It fails gracefully if invoked when ! * autovacuum_workers are already active. ! * ! * Return value is the OID of the database that the worker is going to process, ! * or InvalidOid if no worker was actually started. */ ! static Oid do_start_worker(void) { List *dblist; ! WorkerInfo *worker; ! int i; ! ListCell *cell; TransactionId xidForceLimit; + bool for_xid_wrap; + autovac_dbase *avdb; + TimestampTz current_time; + + /* + * Find an unused WorkerInfo entry to set up. If there is none, go to + * sleep. + * + * NB: we only read the array here, and save a pointer where we'll + * write the entry later. Since this is the only process that creates + * new entries into the array, there's no risk that somebody else will + * use that pointer while we weren't looking. + */ + LWLockAcquire(AutovacuumLock, LW_SHARED); + foreach_worker(i, worker) + { + /* Invalid database OID means unused worker entry; use it */ + if (!OidIsValid(worker->wi_dboid)) + break; + } + LWLockRelease(AutovacuumLock); + + /* they're all used up */ + if (i >= autovacuum_max_workers) + return InvalidOid; /* Get a list of databases */ ! 
dblist = get_database_list(); /* * Determine the oldest datfrozenxid/relfrozenxid that we will allow *************** *** 497,507 **** * isn't clear how to construct a metric that measures that and not cause * starvation for less busy databases. */ ! db = NULL; for_xid_wrap = false; foreach(cell, dblist) { autovac_dbase *tmp = lfirst(cell); /* Find pgstat entry if any */ tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid); --- 855,868 ---- * isn't clear how to construct a metric that measures that and not cause * starvation for less busy databases. */ ! avdb = NULL; for_xid_wrap = false; + current_time = GetCurrentTimestamp(); foreach(cell, dblist) { autovac_dbase *tmp = lfirst(cell); + bool skipit; + Dlelem *elem; /* Find pgstat entry if any */ tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid); *************** *** 509,517 **** /* Check to see if this one is at risk of wraparound */ if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit)) { ! if (db == NULL || ! TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid)) ! db = tmp; for_xid_wrap = true; continue; } --- 870,878 ---- /* Check to see if this one is at risk of wraparound */ if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit)) { ! if (avdb == NULL || ! TransactionIdPrecedes(tmp->ad_frozenxid, avdb->ad_frozenxid)) ! avdb = tmp; for_xid_wrap = true; continue; } *************** *** 526,548 **** continue; /* * Remember the db with oldest autovac time. (If we are here, * both tmp->entry and db->entry must be non-null.) */ ! if (db == NULL || ! tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time) ! db = tmp; } /* Found a database -- process it */ ! if (db != NULL) { LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! 
AutoVacuumShmem->process_db = db->ad_datid; LWLockRelease(AutovacuumLock); SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); } } /* SIGHUP: set flag to re-read config file at next convenient time */ --- 887,1012 ---- continue; /* + * Also, skip a database that appears on the passed database list as + * having been processed recently (less than autovacuum_naptime seconds + * ago). We do this so that we don't select a database which we just + * selected, but that pgstat hasn't gotten around to updating the + * last autovacuum time yet. + */ + skipit = false; + elem = DatabaseList ? DLGetTail(DatabaseList) : NULL; + while (elem != NULL) + { + autovac_dbase *dbp = DLE_VAL(elem); + + if (dbp->ad_datid == tmp->ad_datid) + { + TimestampTz curr_plus_naptime; + TimestampTz next = dbp->ad_next_worker; + + curr_plus_naptime = + TimestampTzPlusMilliseconds(current_time, + autovacuum_naptime * 1000); + + /* + * What we want here if to skip if next_worker falls between + * the current time and the current time plus naptime. + */ + if (timestamp_cmp_internal(current_time, next) > 0) + skipit = false; + else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0) + skipit = false; + else + skipit = true; + + break; + } + elem = DLGetPred(elem); + } + if (skipit) + continue; + + /* * Remember the db with oldest autovac time. (If we are here, * both tmp->entry and db->entry must be non-null.) */ ! if (avdb == NULL || ! tmp->ad_entry->last_autovac_time < avdb->ad_entry->last_autovac_time) ! avdb = tmp; } /* Found a database -- process it */ ! if (avdb != NULL) { LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! Assert(!OidIsValid(worker->wi_dboid)); ! worker->wi_dboid = avdb->ad_datid; ! worker->wi_workerpid = 0; LWLockRelease(AutovacuumLock); SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); + + return avdb->ad_datid; + } + + return InvalidOid; + } + + /* + * launch_worker + * + * Wrapper for starting a worker from the launcher. 
Besides actually starting + * it, update the database list to reflect the next time that another one will + * need to be started on the selected database. The actual database choice is + * left to do_start_worker. + * + * This routine is also expected to insert an entry into the database list if + * the selected database was previously absent from the list. It returns the + * new database list. + */ + static void + launch_worker(TimestampTz now) + { + Oid dbid; + Dlelem *elem; + + dbid = do_start_worker(); + if (OidIsValid(dbid)) + free_workers--; + + /* + * Walk the database list and update corresponding entry. If it's not on + * the list, we'll recreate the list. + */ + elem = DLGetHead(DatabaseList); + while (elem != NULL) + { + autovac_dbase *avdb = DLE_VAL(elem); + + if (avdb->ad_datid == dbid) + { + /* + * add autovacuum_naptime seconds to the current time, and use that + * as the new "next_worker" field for this database. + */ + avdb->ad_next_worker = + TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000); + + DLMoveToFront(elem); + break; + } + elem = DLGetSucc(elem); } + + /* + * If the database was not present in the database list, we rebuild the + * list. It's possible that the database does not get into the list + * anyway, for example if it's a database that doesn't have a pgstat entry, + * but this is not a problem because we don't want to schedule workers + * regularly into those in any case. + */ + if (elem == NULL) + rebuild_database_list(dbid); } /* SIGHUP: set flag to re-read config file at next convenient time */ *************** *** 552,557 **** --- 1016,1028 ---- got_SIGHUP = true; } + /* SIGUSR1: a worker is up and running, or just finished */ + static void + avl_sigusr1_handler(SIGNAL_ARGS) + { + got_SIGUSR1 = true; + } + static void avlauncher_shutdown(SIGNAL_ARGS) { *************** *** 667,673 **** AutoVacWorkerMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; ! 
Oid dbid; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; --- 1138,1146 ---- AutoVacWorkerMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; ! Oid dbid = InvalidOid; ! WorkerInfo *worker; ! int i; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; *************** *** 765,780 **** SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE); /* ! * Get the database Id we're going to work on, and announce our PID ! * in the shared memory area. We remove the database OID immediately ! * from the shared memory area. */ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! ! dbid = AutoVacuumShmem->process_db; ! AutoVacuumShmem->process_db = InvalidOid; ! AutoVacuumShmem->worker_pid = MyProcPid; ! LWLockRelease(AutovacuumLock); if (OidIsValid(dbid)) --- 1238,1259 ---- SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE); /* ! * Walk the WorkerInfo array, and get the database OID we're going to work ! * on. Use the first entry with PID 0 in the list, and advertise our PID ! * on it, thus marking it used. */ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! foreach_worker(i, worker) ! { ! if (worker->wi_workerpid == 0) ! { ! dbid = worker->wi_dboid; ! worker->wi_workerpid = MyProcPid; ! break; ! } ! } ! if (AutoVacuumShmem->av_launcherpid != 0) ! kill(AutoVacuumShmem->av_launcherpid, SIGUSR1); LWLockRelease(AutovacuumLock); if (OidIsValid(dbid)) *************** *** 812,826 **** /* And do an appropriate amount of work */ recentXid = ReadNewTransactionId(); ! do_autovacuum(dbid); } - /* - * Now remove our PID from shared memory, so that the launcher can start - * another worker as soon as appropriate. - */ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! AutoVacuumShmem->worker_pid = 0; LWLockRelease(AutovacuumLock); /* All done, go away */ --- 1291,1318 ---- /* And do an appropriate amount of work */ recentXid = ReadNewTransactionId(); ! 
do_autovacuum(worker, dbid); } LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! if (!autovacuum_start_daemon) ! { ! /* in emergency mode we must cleanup after ourselves */ ! worker->wi_workerpid = 0; ! worker->wi_dboid = InvalidOid; ! worker->wi_finished = false; ! } ! else ! { ! /* ! * Otherwise, let the launcher know we're done. Warning: must set the ! * flag before sending the signal. XXX do we need to prevent compiler ! * overenthusiastic optimization here? ! */ ! worker->wi_finished = true; ! if (AutoVacuumShmem->av_launcherpid != 0) ! kill(AutoVacuumShmem->av_launcherpid, SIGUSR1); ! } LWLockRelease(AutovacuumLock); /* All done, go away */ *************** *** 828,840 **** } /* ! * autovac_get_database_list * * Return a list of all databases. Note we cannot use pg_database, * because we aren't connected; we use the flat database file. */ static List * ! autovac_get_database_list(void) { char *filename; List *dblist = NIL; --- 1320,1332 ---- } /* ! * get_database_list * * Return a list of all databases. Note we cannot use pg_database, * because we aren't connected; we use the flat database file. */ static List * ! get_database_list(void) { char *filename; List *dblist = NIL; *************** *** 862,867 **** --- 1354,1360 ---- avdb->ad_name = pstrdup(thisname); avdb->ad_frozenxid = db_frozenxid; /* this gets set later: */ + avdb->ad_next_worker = 0; avdb->ad_entry = NULL; dblist = lappend(dblist, avdb); *************** *** 880,886 **** * order not to ignore shutdown commands for too long. */ static void ! do_autovacuum(Oid dbid) { Relation classRel, avRel; --- 1373,1379 ---- * order not to ignore shutdown commands for too long. */ static void ! 
do_autovacuum(WorkerInfo *worker, Oid dbid) { Relation classRel, avRel; *************** *** 1027,1035 **** --- 1520,1550 ---- { Oid relid = lfirst_oid(cell); autovac_table *tab; + WorkerInfo *other_worker; + int i; + bool skipit; CHECK_FOR_INTERRUPTS(); + LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE); + + /* + * Check whether the table is being vacuumed concurrently by another + * worker. + */ + skipit = false; + foreach_worker(i, other_worker) + { + if (other_worker->wi_tableoid == relid) + { + LWLockRelease(AutovacuumScheduleLock); + skipit = true; + break; + } + } + if (skipit) + continue; + /* * Check whether pgstat data still says we need to vacuum this table. * It could have changed if other worker processed the table while we *************** *** 1046,1054 **** if (tab == NULL) { /* someone else vacuumed the table */ continue; } ! /* Ok, good to go! */ /* Set the vacuum cost parameters for this table */ VacuumCostDelay = tab->at_vacuum_cost_delay; --- 1561,1576 ---- if (tab == NULL) { /* someone else vacuumed the table */ + LWLockRelease(AutovacuumScheduleLock); continue; } ! ! /* ! * Ok, good to go. Store the table in shared memory before releasing ! * the lock so that other workers don't vacuum it concurrently. ! */ ! worker->wi_tableoid = relid; ! LWLockRelease(AutovacuumScheduleLock); /* Set the vacuum cost parameters for this table */ VacuumCostDelay = tab->at_vacuum_cost_delay; *************** *** 1615,1621 **** Size AutoVacuumShmemSize(void) { ! return sizeof(AutoVacuumShmemStruct); } /* --- 2137,2144 ---- Size AutoVacuumShmemSize(void) { ! return add_size(offsetof(AutoVacuumShmemStruct, av_workers), ! mul_size(autovacuum_max_workers, sizeof(WorkerInfo))); } /* *************** *** 1638,1642 **** if (found) return; /* already initialized */ ! MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct)); } --- 2161,2165 ---- if (found) return; /* already initialized */ ! MemSet(AutoVacuumShmem, 0, AutoVacuumShmemSize()); }
---------------------------(end of broadcast)--------------------------- TIP 2: Don't 'kill -9' the postmaster