Hi,

This patch adds support for multiple concurrent autovacuum workers.  It
applies on top of the recheck patch I just posted.

The main change is an array of WorkerInfo structs in shared memory; each
worker checks the table currently being processed by every other worker,
and skips any table that one of them is already vacuuming.  It also
rechecks each table right before vacuuming it, which eliminates redundant
vacuuming.

It also introduces SIGUSR1 signalling from workers to the launcher.  The
launcher keeps a database list in memory and schedules workers based on
that list.  The database a worker actually selects may differ from the
scheduled one; in that case, the list is reconstructed.

There are two main FIXMEs in this code:

1. Make list reconstruction and scheduling smarter, so that databases
are not bunched together in the schedule.  The only difficulty is
preserving the databases' existing sort order.

2. Find a way to clean up after workers that failed to start, since they
fill up the WorkerInfo array and starve other databases of vacuuming.  I
don't know a way to do this that works in all cases.  The only idea I
have so far is to consider a worker failed if it was launched more than
autovacuum_naptime seconds ago and still hasn't registered its PID.


Neither of these is really minor, but I think they are solvable.

-- 
Alvaro Herrera                                http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
*** 14recheck/src/backend/postmaster/autovacuum.c	2007-03-27 16:43:31.000000000 -0400
--- 12vacuum/src/backend/postmaster/autovacuum.c	2007-03-27 17:40:19.000000000 -0400
***************
*** 52,57 ****
--- 52,58 ----
  #include "utils/syscache.h"
  
  
+ static volatile sig_atomic_t got_SIGUSR1 = false;
  static volatile sig_atomic_t got_SIGHUP = false;
  static volatile sig_atomic_t avlauncher_shutdown_request = false;
  
***************
*** 59,64 ****
--- 60,66 ----
   * GUC parameters
   */
  bool		autovacuum_start_daemon = false;
+ int			autovacuum_max_workers;
  int			autovacuum_naptime;
  int			autovacuum_vac_thresh;
  double		autovacuum_vac_scale;
***************
*** 69,75 ****
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flag to tell if we are in the autovacuum daemon process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
--- 71,77 ----
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flags to tell if we are in an autovacuum process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
***************
*** 86,91 ****
--- 88,94 ----
  typedef struct autovac_dbase
  {
  	Oid			ad_datid;
+ 	TimestampTz	ad_next_worker;
  	char	   *ad_name;
  	TransactionId ad_frozenxid;
  	PgStat_StatDBEntry *ad_entry;
***************
*** 110,123 ****
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
  typedef struct
  {
! 	Oid		process_db;			/* OID of database to process */
! 	int		worker_pid;			/* PID of the worker process, if any */
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 113,158 ----
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_dboid		OID of the database this worker is supposed to work on
+  * wi_tableoid	OID of the table currently being vacuumed
+  * wi_workerpid	PID of the running worker, 0 if not yet started
+  * wi_finished	True when the worker is done and about to exit
+  *-------------
+  */
+ typedef struct
+ {
+ 	Oid			wi_dboid;
+ 	Oid			wi_tableoid;
+ 	int			wi_workerpid;
+ 	bool		wi_finished;
+ } WorkerInfo;
+ 
  typedef struct
  {
! 	pid_t		av_launcherpid;
! 	WorkerInfo	av_workers[1];
! 	/* VARIABLE LENGTH STRUCT */
  } AutoVacuumShmemStruct;
  
+ /* Iterate over all WorkerInfo slots.  Beware multiple evaluation of args! */
+ #define foreach_worker(_i, _worker) \
+ 	for ((_i) = 0, (_worker) = AutoVacuumShmem->av_workers; \
+ 		 (_i) < autovacuum_max_workers; \
+ 		 (_i)++, (_worker)++)
+ 
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ /* number of currently free worker slots; only valid in the launcher */
+ static int free_workers;
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+ 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
***************
*** 125,133 ****
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void do_start_worker(void);
! static void do_autovacuum(Oid dbid);
! static List *autovac_get_database_list(void);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 160,174 ----
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static Oid do_start_worker(void);
! static int launcher_determine_sleep(void);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int avdb_comparator(const void *a, const void *b);
! 
! static void do_autovacuum(WorkerInfo *worker, Oid dbid);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
***************
*** 138,144 ****
  											PgStat_StatDBEntry *dbentry);
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
  						  PgStat_StatDBEntry *shared,
  						  PgStat_StatDBEntry *dbentry);
--- 179,184 ----
***************
*** 149,154 ****
--- 189,195 ----
  static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
***************
*** 232,237 ****
--- 273,312 ----
  
  /*
   * Main loop for the autovacuum launcher process.
+  *
+  * The signalling between launcher and worker is as follows:
+  *
+  * When the worker has finished starting up, it stores its PID in wi_workerpid
+  * and sends a SIGUSR1 signal to the launcher.  The launcher then knows that
+  * the postmaster is ready to start a new worker.  We do it this way because
+  * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+  * yet processed the last one, in which case the second signal would be lost.
+  * This is only useful when two workers need to be started close to one
+  * another, which should be rare but it's possible.
+  *
+  * Additionally, when the worker is finished with the vacuum work, it sets the
+  * wi_finished flag and sends a SIGUSR1 signal to the launcher.  Upon receipt
+  * of this signal, the launcher then clears the entry for future use and may
+  * start another worker right away, if need be.
+  *
+  * Note that there is still a race condition here, because a worker may finish
+  * just after the launcher sent the signal to postmaster, but before postmaster
+  * processes it.  At this point, the launcher receives a signal, sees the empty
+  * slot, so it sends the postmaster the signal again to start another worker.
+  * But the postmaster flag was already set, so the signal is lost.  To avoid
+  * this problem, the launcher will not try to start a new worker until all
+  * WorkerInfo entries that have the wi_dboid field set have a PID assigned.
+  * (FIXME -- this part is not yet implemented.)
+  *
+  * There is an additional problem if, for some reason, a worker starts and
+  * is not able to finish its task correctly.  It will not be able to set its
+  * finished flag, so the launcher will believe that it's still starting up.
+  * To prevent this problem, we check the PGPROCs of worker processes, and
+  * clean them up if we find they are not actually running (or they correspond
+  * to processes that are not autovacuum workers.)  We only do it if all 
+  * WorkerInfo structures are in use, thus frequently enough so that this
+  * problem doesn't cause any starvation, but seldom enough so that it's not a
+  * performance hit.  (FIXME -- this part is not yet implemented.)
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
***************
*** 266,274 ****
  	 * Set up signal handlers.	Since this is an auxiliary process, it has
  	 * particular signal requirements -- no deadlock checker or sinval
  	 * catchup, for example.
- 	 *
- 	 * XXX It may be a good idea to receive signals when an avworker process
- 	 * finishes.
  	 */
  	pqsignal(SIGHUP, avl_sighup_handler);
  
--- 341,346 ----
***************
*** 278,284 ****
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, SIG_IGN);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
--- 350,356 ----
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, avl_sigusr1_handler);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
***************
*** 360,380 ****
  	/* We can now handle ereport(ERROR) */
  	PG_exception_stack = &local_sigjmp_buf;
  
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
! 	PG_SETMASK(&UnBlockSig);
  
  	/*
! 	 * take a nap before executing the first iteration, unless we were
! 	 * requested an emergency run.
  	 */
! 	if (autovacuum_start_daemon)
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  
  	for (;;)
  	{
! 		int		worker_pid;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 432,464 ----
  	/* We can now handle ereport(ERROR) */
  	PG_exception_stack = &local_sigjmp_buf;
  
+ 	/* in emergency mode, just start a worker and go away */
+ 	if (!autovacuum_start_daemon)
+ 	{
+ 		do_start_worker();
+ 		proc_exit(0);		/* done */
+ 	}
+ 
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
! 	AutoVacuumShmem->av_launcherpid = MyProcPid;
  
  	/*
! 	 * Create the initial database list.  The invariant we want this list to
! 	 * keep is that it's ordered by decreasing next_time; initially all times
! 	 * are zero, so order doesn't matter.  As soon as an entry is updated to
! 	 * a higher time, it will be moved to the front (which is correct because
! 	 * the only operation is to add autovacuum_naptime to the entry, and time
! 	 * always increases).
  	 */
! 	rebuild_database_list(InvalidOid);
! 	free_workers = autovacuum_max_workers;
  
+ 	PG_SETMASK(&UnBlockSig);
  	for (;;)
  	{
! 		int		millis;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
***************
*** 383,388 ****
--- 467,478 ----
  		if (!PostmasterIsAlive(true))
  			exit(1);
  
+ 		millis = launcher_determine_sleep();
+ 
+ 		/* Sleep for a while according to schedule */
+ 		pg_usleep(millis * 1000);
+ 
+ 		/* the normal shutdown case */
  		if (avlauncher_shutdown_request)
  			break;
  
***************
*** 392,471 ****
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
  		/*
! 		 * if there's a worker already running, sleep until it
! 		 * disappears.
  		 */
! 		LWLockAcquire(AutovacuumLock, LW_SHARED);
! 		worker_pid = AutoVacuumShmem->worker_pid;
! 		LWLockRelease(AutovacuumLock);
  
! 		if (worker_pid != 0)
  		{
! 			PGPROC *proc = BackendPidGetProc(worker_pid);
  
! 			if (proc != NULL && proc->isAutovacuum)
! 				goto sleep;
  			else
  			{
  				/*
! 				 * if the worker is not really running (or it's a process
! 				 * that's not an autovacuum worker), remove the PID from shmem.
! 				 * This should not happen, because either the worker exits
! 				 * cleanly, in which case it'll remove the PID, or it dies, in
! 				 * which case postmaster will cause a system reset cycle.
  				 */
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				worker_pid = 0;
! 				LWLockRelease(AutovacuumLock);
  			}
  		}
  
- 		do_start_worker();
- 
- sleep:
- 		/*
- 		 * in emergency mode, exit immediately so that the postmaster can
- 		 * request another run right away if needed.
- 		 *
- 		 * XXX -- maybe it would be better to handle this inside the launcher
- 		 * itself.
- 		 */
- 		if (!autovacuum_start_daemon)
- 			break;
- 
  		/* have pgstat read the file again next time */
  		pgstat_clear_snapshot();
- 
- 		/* now sleep until the next autovac iteration */
- 		pg_usleep(autovacuum_naptime * 1000000L); 
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
  
  	proc_exit(0);		/* done */
  }
  
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.
   */
! static void
  do_start_worker(void)
  {
  	List	   *dblist;
! 	bool		for_xid_wrap;
! 	autovac_dbase *db;
! 	ListCell *cell;
  	TransactionId xidForceLimit;
  
  	/* Get a list of databases */
! 	dblist = autovac_get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 482,829 ----
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
+ 		/* a worker started up or finished */
+ 		if (got_SIGUSR1)
+ 		{
+ 			WorkerInfo *worker;
+ 			int			i;
+ 
+ 			got_SIGUSR1 = false;
+ 
+ 			/* Walk the workers and clean up finished entries. */
+ 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);	
+ 			foreach_worker(i, worker)
+ 			{
+ 				if (worker->wi_finished)
+ 				{
+ 					worker->wi_tableoid = InvalidOid;
+ 					worker->wi_dboid = InvalidOid;
+ 					worker->wi_workerpid = 0;
+ 					worker->wi_finished = false;
+ 					free_workers++;
+ 				}
+ 			}
+ 			LWLockRelease(AutovacuumLock);
+ 		}
+ 
+ #if 0
  		/*
! 		 * Find and remove all entries corresponding to workers that failed to
! 		 * start.  Problem: how do we detect that it failed to start, yet leave
! 		 * alone those that are still really starting up?
! 		 *
! 		 * Idea: if we find that a database is listed twice, and none of the
! 		 * workers has registered, then something is wrong.  This fails if
! 		 * all workers failed in different databases however.
! 		 *
! 		 * Another idea: wreak havoc if a worker was started longer than
! 		 * autovac_naptime seconds ago and still hasn't registered.
  		 */
! 		if (free_workers == 0)
! 		{
! 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 			foreach_worker(i, worker)
! 			{
! 				if (worker->wi_workerpid == 0 && ...)
! 					clear the worker entry
! 			}
! 			LWLockRelease(AutovacuumLock);
! 		}
! #endif
  
! 		/*
! 		 * See if there's need to start a new worker, and do so if possible.
! 		 * If there are no free worker slots, avoid doing all this work, as
! 		 * we will not be able to start the worker anyway.
! 		 */
! 		if (free_workers > 0)
  		{
! 			TimestampTz	current_time;
! 			Dlelem	   *elem;
! 
! 			elem = DLGetTail(DatabaseList);
  
! 			current_time = GetCurrentTimestamp();
! 
! 			if (elem != NULL)
! 			{
! 				autovac_dbase *avdb = DLE_VAL(elem);
! 				long	secs;
! 				int		usecs;
! 
! 				TimestampDifference(current_time, avdb->ad_next_worker, &secs, &usecs);
! 
! 				/* we have to start a worker now */
! 				if (secs <= 0 && usecs <= 0)
! 					launch_worker(current_time);
! 			}
  			else
  			{
  				/*
! 				 * Special case when the list is empty: start a worker right
! 				 * away.  This covers the initial case, when no database is in
! 				 * pgstats (thus the list is empty).
  				 */
! 				launch_worker(current_time);
  			}
  		}
  
  		/* have pgstat read the file again next time */
  		pgstat_clear_snapshot();
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
+ 	AutoVacuumShmem->av_launcherpid = 0;
  
  	proc_exit(0);		/* done */
  }
  
+ 
+ /*
+  * Determine the time to sleep based on the database list.
+  */
+ static int
+ launcher_determine_sleep(void)
+ {
+ 	long	secs;
+ 	int		usecs;
+ 	Dlelem *elem;
+ 
+ 	/*
+ 	 * We sleep until the next scheduled vacuum.  We trust that when the
+ 	 * database list was built, care was taken so that no entries have
+ 	 * negative times; if the first entry has a too close next_worker value,
+ 	 * we will sleep a small nominal time.
+ 	 */
+ 	elem = DLGetTail(DatabaseList);
+ 	if (elem != NULL)
+ 	{
+ 		autovac_dbase  *avdb = DLE_VAL(elem);
+ 		TimestampTz		current_time = GetCurrentTimestamp();
+ 		TimestampTz		next_wakeup;
+ 
+ 		next_wakeup = avdb->ad_next_worker;
+ 		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ 	}
+ 	else
+ 	{
+ 		/* list is empty, sleep for whole autovacuum_naptime seconds  */
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 	/*
+ 	 * someone screwed up (invalid entry on the list); sleep a nominal amount
+ 	 */
+ 	if (secs <= 0L && usecs <= 0)
+ 	{
+ 		secs = 0;
+ 		usecs = 500000;	/* 500 ms */
+ 	}
+ 
+ 	return secs * 1000 + usecs / 1000;
+ }
+ 
+ /*
+  * Build an updated DatabaseList.  It must only contain databases that appear
+  * in pgstats, and must be sorted by next_worker from highest to lowest, distributed
+  * regularly across the next autovacuum_naptime interval.
+  *
+  * Receives the Oid of the database that made this list be generated (we call
+  * this the "new" database, because when the database was already present on
+  * the list, we expect that this function is not called at all).  The
+  * preexisting list, if any, will be used to preserve the order of the
+  * databases in the autovacuum_naptime period.  The new database is put at the
+  * end of the interval.  The actual values are not saved.
+  */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ 	Dllist	   *newdllist;
+ 	List	   *dblist;
+ 	ListCell   *cell;
+ 	autovac_dbase *dbary;
+ 	int			i;
+ 	TimestampTz	current_time;
+ 	TimestampTz	initial_time;
+ 	int			millis_increment = 0;
+ 	int			total_dbs;
+ 	int			unset_dbs = 0;
+ 	MemoryContext newcxt;
+ 	MemoryContext oldcxt;
+ 
+ 	newcxt = AllocSetContextCreate(TopMemoryContext,
+ 								   "dblist context",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	oldcxt = MemoryContextSwitchTo(newcxt);
+ 
+ 	/*
+ 	 * To build a sorted dllist, we first store the elements in a fixed-size
+ 	 * array, which we sort, and finally we store the individual elements in
+ 	 * the doubly linked list.
+ 	 */
+ 	dblist = get_database_list();
+ 	current_time = GetCurrentTimestamp();
+ 
+ 	/*
+ 	 * The new database array.  We must not free it: these will become the
+ 	 * elements of the Dllist.
+ 	 */
+ 	dbary = palloc(sizeof(autovac_dbase) * list_length(dblist));
+ 
+ 	i = 0;
+ 	foreach(cell, dblist)
+ 	{
+ 		autovac_dbase  *avdb = lfirst(cell);
+ 		PgStat_StatDBEntry *dbentry;
+ 		Dlelem		   *elm;
+ 		
+ 		Assert(avdb->ad_next_worker == 0);
+ 
+ 		dbentry = pgstat_fetch_stat_dbentry(avdb->ad_datid);
+ 		/* skip DBs without pgstat entry */
+ 		if (dbentry == NULL)
+ 			continue;
+ 
+ 		/* We set the new database to "now + autovacuum_naptime" */
+ 		if (avdb->ad_datid == newdb)
+ 		{
+ 			avdb->ad_next_worker =
+ 				TimestampTzPlusMilliseconds(current_time,
+ 											autovacuum_naptime * 1000);
+ 		}
+ 		else
+ 		{
+ 			/*
+ 			 * Otherwise, if the database has an entry on the old list, copy
+ 			 * the next_worker field into the new list.
+ 			 */
+ 			elm = DatabaseList ? DLGetHead(DatabaseList) : NULL;
+ 			while (elm != NULL)
+ 			{
+ 				autovac_dbase	*tmp = DLE_VAL(elm);
+ 
+ 				if (tmp->ad_datid == avdb->ad_datid)
+ 				{
+ 					avdb->ad_next_worker = tmp->ad_next_worker;
+ 					break;
+ 				}
+ 				elm = DLGetSucc(elm);
+ 			}
+ 		}
+ 
+ 		/* other databases will have the time set later */
+ 		if (avdb->ad_next_worker == 0)
+ 			unset_dbs++;
+ 
+ 		/* copy the entry into the array */
+ 		memcpy(&(dbary[i++]), avdb, sizeof(autovac_dbase));
+ 	}
+ 
+ 	total_dbs = i;
+ 	if (unset_dbs > 0)
+ 		millis_increment = autovacuum_naptime * 1000 / unset_dbs;
+ 	initial_time = TimestampTzPlusMilliseconds(current_time, millis_increment);
+ 
+ 	/* now set the time to the unknown databases */
+ 	for (i = 0; i < total_dbs; i++)
+ 	{
+ 		autovac_dbase	*avdb = &dbary[i];
+ 
+ 		if (avdb->ad_next_worker == 0)
+ 		{
+ 			avdb->ad_next_worker = initial_time;
+ 			initial_time = TimestampTzPlusMilliseconds(initial_time,
+ 													   millis_increment);
+ 		}
+ 	}
+ 
+ 	/* sort the array */
+ 	qsort(dbary, i, sizeof(autovac_dbase), avdb_comparator);
+ 
+ 	/* enter each array element into the new dl list */
+ 	newdllist = DLNewList();
+ 	for (i = 0; i < total_dbs; i++)
+ 	{
+ 		Dlelem		   *elem;
+ 
+ 		/* insert the i-th array element as a Dlelem into the new list */
+ 		elem = DLNewElem(&dbary[i]);
+ 		DLAddHead(newdllist, elem);
+ 	}
+ 
+ 	/* Free the old list, and set the new one up */
+ 	if (DatabaseListCxt != NULL)
+ 		MemoryContextDelete(DatabaseListCxt);
+ 	DatabaseListCxt = newcxt;
+ 	DatabaseList = newdllist;
+ 
+ 	MemoryContextSwitchTo(oldcxt);
+ }
+ 
+ /* qsort comparator for autovac_dbase, using next_worker */
+ static int
+ avdb_comparator(const void *a, const void *b)
+ {
+ 	if (((autovac_dbase *) a)->ad_next_worker == ((autovac_dbase *) b)->ad_next_worker)
+ 		return 0;
+ 	else
+ 		return (((autovac_dbase *) a)->ad_next_worker > ((autovac_dbase *) b)->ad_next_worker) ? 1 : -1;
+ }
+ 
+ 
+ 
+ 
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.  It fails gracefully if invoked when
!  * autovacuum_workers are already active.
!  *
!  * Return value is the OID of the database that the worker is going to process,
!  * or InvalidOid if no worker was actually started.
   */
! static Oid
  do_start_worker(void)
  {
  	List	   *dblist;
! 	WorkerInfo *worker;
! 	int			i;
! 	ListCell   *cell;
  	TransactionId xidForceLimit;
+ 	bool		for_xid_wrap;
+ 	autovac_dbase *avdb;
+ 	TimestampTz		current_time;
+ 
+ 	/*
+ 	 * Find an unused WorkerInfo entry to set up.  If there is none, go to
+ 	 * sleep.
+ 	 *
+ 	 * NB: we only read the array here, and save a pointer where we'll
+ 	 * write the entry later.  Since this is the only process that creates
+ 	 * new entries into the array, there's no risk that somebody else will
+ 	 * use that pointer while we weren't looking.
+ 	 */
+ 	LWLockAcquire(AutovacuumLock, LW_SHARED);
+ 	foreach_worker(i, worker)
+ 	{
+ 		/* Invalid database OID means unused worker entry; use it */
+ 		if (!OidIsValid(worker->wi_dboid))
+ 			break;
+ 	}
+ 	LWLockRelease(AutovacuumLock);
+ 
+ 	/* they're all used up */
+ 	if (i >= autovacuum_max_workers)
+ 		return InvalidOid;
  
  	/* Get a list of databases */
! 	dblist = get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
***************
*** 497,507 ****
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	db = NULL;
  	for_xid_wrap = false;
  	foreach(cell, dblist)
  	{
  		autovac_dbase *tmp = lfirst(cell);
  
  		/* Find pgstat entry if any */
  		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
--- 855,868 ----
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	avdb = NULL;
  	for_xid_wrap = false;
+ 	current_time = GetCurrentTimestamp();
  	foreach(cell, dblist)
  	{
  		autovac_dbase *tmp = lfirst(cell);
+ 		bool		skipit;
+ 		Dlelem	   *elem;
  
  		/* Find pgstat entry if any */
  		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
***************
*** 509,517 ****
  		/* Check to see if this one is at risk of wraparound */
  		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
  		{
! 			if (db == NULL ||
! 				TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
! 				db = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
--- 870,878 ----
  		/* Check to see if this one is at risk of wraparound */
  		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
  		{
! 			if (avdb == NULL ||
! 				TransactionIdPrecedes(tmp->ad_frozenxid, avdb->ad_frozenxid))
! 				avdb = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
***************
*** 526,548 ****
  			continue;
  
  		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (db == NULL ||
! 			tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
! 			db = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (db != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		AutoVacuumShmem->process_db = db->ad_datid;
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
  	}
  }
  
  /* SIGHUP: set flag to re-read config file at next convenient time */
--- 887,1012 ----
  			continue;
  
  		/*
+ 		 * Also, skip a database that appears on the passed database list as
+ 		 * having been processed recently (less than autovacuum_naptime seconds
+ 		 * ago).  We do this so that we don't select a database which we just
+ 		 * selected, but that pgstat hasn't gotten around to updating the
+ 		 * last autovacuum time yet.
+ 		 */
+ 		skipit = false;
+ 		elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
+ 		while (elem != NULL)
+ 		{
+ 			autovac_dbase *dbp = DLE_VAL(elem);
+ 
+ 			if (dbp->ad_datid == tmp->ad_datid)
+ 			{
+ 				TimestampTz		curr_plus_naptime;
+ 				TimestampTz		next = dbp->ad_next_worker;
+ 				
+ 				curr_plus_naptime =
+ 					TimestampTzPlusMilliseconds(current_time,
+ 												autovacuum_naptime * 1000);
+ 
+ 				/*
+ 				 * What we want here is to skip if next_worker falls between
+ 				 * the current time and the current time plus naptime.
+ 				 */
+ 				if (timestamp_cmp_internal(current_time, next) > 0)
+ 					skipit = false;
+ 				else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
+ 					skipit = false;
+ 				else
+ 					skipit = true;
+ 
+ 				break;
+ 			}
+ 			elem = DLGetPred(elem);
+ 		}
+ 		if (skipit)
+ 			continue;
+ 
+ 		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (avdb == NULL ||
! 			tmp->ad_entry->last_autovac_time < avdb->ad_entry->last_autovac_time)
! 			avdb = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (avdb != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		Assert(!OidIsValid(worker->wi_dboid));
! 		worker->wi_dboid = avdb->ad_datid;
! 		worker->wi_workerpid = 0;
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+ 
+ 		return avdb->ad_datid;
+ 	}
+ 
+ 	return InvalidOid;
+ }
+ 
+ /*
+  * launch_worker
+  *
+  * Wrapper for starting a worker from the launcher.  Besides actually starting
+  * it, update the database list to reflect the next time that another one will
+  * need to be started on the selected database.  The actual database choice is
+  * left to do_start_worker.
+  *
+  * If the selected database was previously absent from the list, the list is
+  * rebuilt (via rebuild_database_list) so that it can get an entry.  The
+  * global DatabaseList is updated in place; nothing is returned.
+  */
+ static void
+ launch_worker(TimestampTz now)
+ {
+ 	Oid		dbid;
+ 	Dlelem *elem;
+ 
+ 	dbid = do_start_worker();
+ 	if (OidIsValid(dbid))
+ 		free_workers--;
+ 
+ 	/*
+ 	 * Walk the database list and update corresponding entry.  If it's not on
+ 	 * the list, we'll recreate the list.
+ 	 */
+ 	elem = DLGetHead(DatabaseList);
+ 	while (elem != NULL)
+ 	{
+ 		autovac_dbase *avdb = DLE_VAL(elem);
+ 
+ 		if (avdb->ad_datid == dbid)
+ 		{
+ 			/*
+ 			 * add autovacuum_naptime seconds to the current time, and use that
+ 			 * as the new "next_worker" field for this database.
+ 			 */
+ 			avdb->ad_next_worker =
+ 				TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+ 
+ 			DLMoveToFront(elem);
+ 			break;
+ 		}
+ 		elem = DLGetSucc(elem);
  	}
+ 
+ 	/*
+ 	 * If the database was not present in the database list, we rebuild the
+ 	 * list.  It's possible that the database does not get into the list
+ 	 * anyway, for example if it's a database that doesn't have a pgstat entry,
+ 	 * but this is not a problem because we don't want to schedule workers
+ 	 * regularly into those in any case.
+ 	 */
+ 	if (elem == NULL)
+ 		rebuild_database_list(dbid);
  }
  
  /* SIGHUP: set flag to re-read config file at next convenient time */
***************
*** 552,557 ****
--- 1016,1028 ----
  	got_SIGHUP = true;
  }
  
+ /* SIGUSR1: a worker is up and running, or just finished */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ 	got_SIGUSR1 = true;
+ }
+ 
  static void
  avlauncher_shutdown(SIGNAL_ARGS)
  {
***************
*** 667,673 ****
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 1138,1146 ----
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid = InvalidOid;
! 	WorkerInfo *worker;
! 	int			i;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
***************
*** 765,780 ****
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Get the database Id we're going to work on, and announce our PID
! 	 * in the shared memory area.  We remove the database OID immediately
! 	 * from the shared memory area.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 
! 	dbid = AutoVacuumShmem->process_db;
! 	AutoVacuumShmem->process_db = InvalidOid;
! 	AutoVacuumShmem->worker_pid = MyProcPid;
! 
  	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
--- 1238,1267 ----
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Walk the WorkerInfo array, and get the database OID we're going to work
! 	 * on.  Use the first entry with PID 0 in the list, and advertise our PID
! 	 * on it, thus marking it used.  If no entry is free, worker is left NULL
! 	 * and dbid stays invalid, so we do no work and must not touch the array
! 	 * on the way out.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	{
! 		WorkerInfo *candidate;
! 
! 		worker = NULL;
! 		foreach_worker(i, candidate)
! 		{
! 			if (candidate->wi_workerpid == 0)
! 			{
! 				worker = candidate;
! 				dbid = worker->wi_dboid;
! 				worker->wi_workerpid = MyProcPid;
! 				break;
! 			}
! 		}
! 	}
! 	if (AutoVacuumShmem->av_launcherpid != 0)
! 		kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
  	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
***************
*** 812,826 ****
  
  		/* And do an appropriate amount of work */
  		recentXid = ReadNewTransactionId();
! 		do_autovacuum(dbid);
  	}
  
- 	/*
- 	 * Now remove our PID from shared memory, so that the launcher can start
- 	 * another worker as soon as appropriate.
- 	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	AutoVacuumShmem->worker_pid = 0;
  	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
--- 1299,1333 ----
  
  		/* And do an appropriate amount of work */
  		recentXid = ReadNewTransactionId();
! 		do_autovacuum(worker, dbid);
  	}
  
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	/*
! 	 * worker is NULL if we could not claim a WorkerInfo entry above; then
! 	 * there is no shared-memory state of ours to release.
! 	 */
! 	if (worker != NULL)
! 	{
! 		if (!autovacuum_start_daemon)
! 		{
! 			/* in emergency mode we must cleanup after ourselves */
! 			worker->wi_workerpid = 0;
! 			worker->wi_dboid = InvalidOid;
! 			worker->wi_finished = false;
! 		}
! 		else
! 		{
! 			/*
! 			 * Otherwise, let the launcher know we're done.  Warning: must set
! 			 * the flag before sending the signal.  XXX do we need to prevent
! 			 * compiler overenthusiastic optimization here?
! 			 */
! 			worker->wi_finished = true;
! 			if (AutoVacuumShmem->av_launcherpid != 0)
! 				kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
! 		}
! 	}
  	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
***************
*** 828,840 ****
  }
  
  /*
!  * autovac_get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! autovac_get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
--- 1320,1332 ----
  }
  
  /*
!  * get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
***************
*** 862,867 ****
--- 1354,1360 ----
  		avdb->ad_name = pstrdup(thisname);
  		avdb->ad_frozenxid = db_frozenxid;
  		/* this gets set later: */
+ 		avdb->ad_next_worker = 0;
  		avdb->ad_entry = NULL;
  
  		dblist = lappend(dblist, avdb);
***************
*** 880,886 ****
   * order not to ignore shutdown commands for too long.
   */
  static void
! do_autovacuum(Oid dbid)
  {
  	Relation	classRel,
  				avRel;
--- 1373,1379 ----
   * order not to ignore shutdown commands for too long.
   */
  static void
! do_autovacuum(WorkerInfo *worker, Oid dbid)
  {
  	Relation	classRel,
  				avRel;
***************
*** 1027,1035 ****
--- 1520,1556 ----
  	{
  		Oid		relid = lfirst_oid(cell);
  		autovac_table *tab;
+ 		WorkerInfo *other_worker;
+ 		int			i;
+ 		bool		skipit;
  
  		CHECK_FOR_INTERRUPTS();
  
+ 		LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ 
+ 		/*
+ 		 * Check whether the table is being vacuumed concurrently by another
+ 		 * worker.  Only workers connected to our own database are relevant:
+ 		 * a relation OID is only unique within one database, and e.g. the
+ 		 * system catalogs have the same OIDs in every database.
+ 		 */
+ 		skipit = false;
+ 		foreach_worker(i, other_worker)
+ 		{
+ 			if (other_worker != worker &&
+ 				other_worker->wi_dboid == dbid &&
+ 				other_worker->wi_tableoid == relid)
+ 			{
+ 				skipit = true;
+ 				break;
+ 			}
+ 		}
+ 		if (skipit)
+ 		{
+ 			LWLockRelease(AutovacuumScheduleLock);
+ 			continue;
+ 		}
+ 
  		/*
  		 * Check whether pgstat data still says we need to vacuum this table.
  		 * It could have changed if other worker processed the table while we
***************
*** 1046,1054 ****
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
  			continue;
  		}
! 		/* Ok, good to go! */
  
  		/* Set the vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
--- 1561,1576 ----
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
+ 			LWLockRelease(AutovacuumScheduleLock);
  			continue;
  		}
! 
! 		/*
! 		 * Ok, good to go.  Store the table in shared memory before releasing
! 		 * the lock so that other workers don't vacuum it concurrently.
! 		 */
! 		worker->wi_tableoid = relid;
! 		LWLockRelease(AutovacuumScheduleLock);
  
  		/* Set the vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
***************
*** 1615,1621 ****
  Size
  AutoVacuumShmemSize(void)
  {
! 	return sizeof(AutoVacuumShmemStruct);
  }
  
  /*
--- 2137,2144 ----
  Size
  AutoVacuumShmemSize(void)
  {
! 	return add_size(offsetof(AutoVacuumShmemStruct, av_workers),
!  					mul_size(autovacuum_max_workers, sizeof(WorkerInfo)));
  }
  
  /*
***************
*** 1638,1642 ****
  	if (found)
  		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
  }
--- 2161,2165 ----
  	if (found)
  		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, AutoVacuumShmemSize());
  }
---------------------------(end of broadcast)---------------------------
TIP 2: Don't 'kill -9' the postmaster

Reply via email to