Alvaro Herrera wrote:
> Hi,
> 
> Here is the autovacuum patch I am currently working with.

Obviously I forgot to attach the patch, sorry.

-- 
Alvaro Herrera                          Developer, http://www.PostgreSQL.org/
"Para tener más hay que desear menos"
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.40
diff -c -p -r1.40 autovacuum.c
*** src/backend/postmaster/autovacuum.c	28 Mar 2007 22:17:12 -0000	1.40
--- src/backend/postmaster/autovacuum.c	4 Apr 2007 23:34:15 -0000
***************
*** 52,57 ****
--- 52,58 ----
  #include "utils/syscache.h"
  
  
+ static volatile sig_atomic_t got_SIGUSR1 = false;
  static volatile sig_atomic_t got_SIGHUP = false;
  static volatile sig_atomic_t avlauncher_shutdown_request = false;
  
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 60,66 ----
   * GUC parameters
   */
  bool		autovacuum_start_daemon = false;
+ int			autovacuum_max_workers;
  int			autovacuum_naptime;
  int			autovacuum_vac_thresh;
  double		autovacuum_vac_scale;
*************** int			autovacuum_freeze_max_age;
*** 69,75 ****
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flag to tell if we are in the autovacuum daemon process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
--- 71,77 ----
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flags to tell if we are in an autovacuum process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
*************** static int	default_freeze_min_age;
*** 82,95 ****
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
! /* struct to keep list of candidate databases for vacuum */
! typedef struct autovac_dbase
  {
! 	Oid			ad_datid;
! 	char	   *ad_name;
! 	TransactionId ad_frozenxid;
! 	PgStat_StatDBEntry *ad_entry;
! } autovac_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
--- 84,105 ----
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
! /* struct to keep track of databases in launcher */
! typedef struct avl_dbase
  {
! 	Oid			adl_datid;			/* hash key -- must be first */
! 	TimestampTz	adl_next_worker;
! 	int			adl_score;
! } avl_dbase;
! 
! /* struct to keep track of databases in worker */
! typedef struct avw_dbase
! {
! 	Oid			adw_datid;
! 	char	   *adw_name;
! 	TransactionId adw_frozenxid;
! 	PgStat_StatDBEntry *adw_entry;
! } avw_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
*************** typedef struct autovac_table
*** 110,123 ****
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
  typedef struct
  {
! 	Oid		process_db;			/* OID of database to process */
! 	int		worker_pid;			/* PID of the worker process, if any */
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 120,169 ----
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_dboid		OID of the database to work on; InvalidOid if slot is unused
+  * wi_tableoid	OID of the table currently being vacuumed
+  * wi_workerpid	PID of the running worker, 0 if not yet started
+  * wi_finished	True when the worker is done and about to exit; launcher resets it
+  *
+  * The locking for this is a bit weird: all fields except wi_tableoid are
+  * protected by AutovacuumLock, and wi_tableoid is protected by
+  * AutovacuumScheduleLock.
+  *-------------
+  */
+ typedef struct
+ {
+ 	Oid			wi_dboid;
+ 	Oid			wi_tableoid;
+ 	int			wi_workerpid;
+ 	bool		wi_finished;
+ } WorkerInfo;
+ 
  typedef struct
  {
! 	pid_t		av_launcherpid;
! 	WorkerInfo	av_workers[1];
! 	/* VARIABLE LENGTH STRUCT */
  } AutoVacuumShmemStruct;
  
+ /* Macro to iterate over all workers.  Beware multiple evaluation of args! */
+ #define foreach_worker(_i, _worker) \
+ 	for ((_i) = 0, (_worker) = AutoVacuumShmem->av_workers; \
+ 		 (_i) < autovacuum_max_workers; \
+ 		 (_i)++, (_worker)++)
+ 
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ /* number of currently free worker slots; only valid in the launcher */
+ static int free_workers;
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+ 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 125,133 ****
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void do_start_worker(void);
! static void do_autovacuum(void);
! static List *autovac_get_database_list(void);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 171,185 ----
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static Oid do_start_worker(void);
! static int launcher_determine_sleep(bool canlaunch);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int db_comparator(const void *a, const void *b);
! 
! static void do_autovacuum(WorkerInfo *worker);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** static void relation_needs_vacanalyze(Oi
*** 141,152 ****
  
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
  						  PgStat_StatDBEntry *shared,
  						  PgStat_StatDBEntry *dbentry);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
--- 193,204 ----
  
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
  static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
  						  PgStat_StatDBEntry *shared,
  						  PgStat_StatDBEntry *dbentry);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
*************** StartAutoVacLauncher(void)
*** 230,241 ****
  
  /*
   * Main loop for the autovacuum launcher process.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
- 	MemoryContext	avlauncher_cxt;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 282,329 ----
  
  /*
   * Main loop for the autovacuum launcher process.
+  *
+  * The signalling between launcher and worker is as follows:
+  *
+  * When the worker has finished starting up, it stores its PID in wi_workerpid
+  * and sends a SIGUSR1 signal to the launcher.  The launcher then knows that
+  * the postmaster is ready to start a new worker.  We do it this way because
+  * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+  * yet processed the last one, in which case the second signal would be lost.
+  * This is only useful when two workers need to be started close to one
+  * another, which should be rare but it's possible.
+  *
+  * Additionally, when the worker is finished with the vacuum work, it sets the
+  * wi_finished flag and sends a SIGUSR1 signal to the launcher.  Upon receipt
+  * of this signal, the launcher then clears the entry for future use and may
+  * start another worker right away, if need be.
+  *
+  * There is at least one race condition here: if the workers are all busy, a
+  * database needs immediate attention and a worker finishes just after the
+  * launcher started a worker and sent the signal to postmaster, but before
+  * postmaster processes the signal; at this point, the launcher receives a
+  * signal from the finishing process, sees the empty slot, and sends the
+  * signal to postmaster again to start another worker.  But the postmaster
+  * SendPostmasterSignal() flag was already set, so the signal is lost.  To
+  * avoid this problem, the launcher should not try to start a new worker until
+  * all WorkerInfo entries that have the wi_dboid field set have a PID assigned.
+  * FIXME someday.  The problem is that if we have workers failing to start for
+  * some reason, holding the start of new workers will worsen the starvation by
+  * disabling the start of a new worker as soon as one worker fails to start.
+  * So it's important to be able to distinguish a worker that has failed
+  * starting from a worker that is just taking its little bit of time to do so.
+  *
+  * There is another potential problem if, for some reason, a worker starts and
+  * is not able to finish correctly.  It will not be able to set its finished
+  * flag, so the launcher will believe that it's still starting up.  To prevent
+  * this problem, we should check the PGPROCs of worker processes, and clean
+  * them up if we find they are not actually running (or they correspond to
+  * processes that are not autovacuum workers.)  FIXME someday.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 264,272 ****
  	 * Set up signal handlers.	Since this is an auxiliary process, it has
  	 * particular signal requirements -- no deadlock checker or sinval
  	 * catchup, for example.
- 	 *
- 	 * XXX It may be a good idea to receive signals when an avworker process
- 	 * finishes.
  	 */
  	pqsignal(SIGHUP, avl_sighup_handler);
  
--- 352,357 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 276,282 ****
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, SIG_IGN);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
--- 361,367 ----
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, avl_sigusr1_handler);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 300,311 ****
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! 										   "Autovacuum Launcher",
! 										   ALLOCSET_DEFAULT_MINSIZE,
! 										   ALLOCSET_DEFAULT_INITSIZE,
! 										   ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(avlauncher_cxt);
  
  
  	/*
--- 385,396 ----
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 										  "Autovacuum Launcher",
! 										  ALLOCSET_DEFAULT_MINSIZE,
! 										  ALLOCSET_DEFAULT_INITSIZE,
! 										  ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  
  	/*
*************** AutoVacLauncherMain(int argc, char *argv
*** 336,346 ****
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(avlauncher_cxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(avlauncher_cxt);
  
  		/* Make sure pgstat also considers our stat data as gone */
  		pgstat_clear_snapshot();
--- 421,431 ----
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(AutovacMemCxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(AutovacMemCxt);
  
  		/* Make sure pgstat also considers our stat data as gone */
  		pgstat_clear_snapshot();
*************** AutoVacLauncherMain(int argc, char *argv
*** 361,378 ****
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
  	PG_SETMASK(&UnBlockSig);
  
  	/*
! 	 * take a nap before executing the first iteration, unless we were
! 	 * requested an emergency run.
  	 */
! 	if (autovacuum_start_daemon)
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  
  	for (;;)
  	{
! 		int		worker_pid;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 446,476 ----
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
+ 	/* must unblock signals before calling rebuild_database_list */
  	PG_SETMASK(&UnBlockSig);
  
+ 	/* in emergency mode, just start a worker and go away */
+ 	if (!autovacuum_start_daemon)
+ 	{
+ 		do_start_worker();
+ 		proc_exit(0);		/* done */
+ 	}
+ 
+ 	AutoVacuumShmem->av_launcherpid = MyProcPid;
+ 
  	/*
! 	 * Create the initial database list.  The invariant we want this list to
! 	 * keep is that it's ordered by decreasing next_worker.  As soon as an
! 	 * entry is updated to a higher time, it will be moved to the front (which
! 	 * is correct because the only operation is to add autovacuum_naptime to
! 	 * the entry, and time always increases).
  	 */
! 	rebuild_database_list(InvalidOid);
! 	free_workers = autovacuum_max_workers;
  
  	for (;;)
  	{
! 		int		millis;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 381,386 ****
--- 479,490 ----
  		if (!PostmasterIsAlive(true))
  			exit(1);
  
+ 		millis = launcher_determine_sleep(free_workers > 0);
+ 
+ 		/* Sleep for a while according to schedule */
+ 		pg_usleep(millis * 1000);
+ 
+ 		/* the normal shutdown case */
  		if (avlauncher_shutdown_request)
  			break;
  
*************** AutoVacLauncherMain(int argc, char *argv
*** 390,469 ****
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
  		/*
! 		 * if there's a worker already running, sleep until it
! 		 * disappears.
  		 */
! 		LWLockAcquire(AutovacuumLock, LW_SHARED);
! 		worker_pid = AutoVacuumShmem->worker_pid;
! 		LWLockRelease(AutovacuumLock);
! 
! 		if (worker_pid != 0)
  		{
! 			PGPROC *proc = BackendPidGetProc(worker_pid);
  
! 			if (proc != NULL && proc->isAutovacuum)
! 				goto sleep;
  			else
  			{
  				/*
! 				 * if the worker is not really running (or it's a process
! 				 * that's not an autovacuum worker), remove the PID from shmem.
! 				 * This should not happen, because either the worker exits
! 				 * cleanly, in which case it'll remove the PID, or it dies, in
! 				 * which case postmaster will cause a system reset cycle.
  				 */
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				worker_pid = 0;
! 				LWLockRelease(AutovacuumLock);
  			}
  		}
  
- 		do_start_worker();
- 
- sleep:
- 		/*
- 		 * in emergency mode, exit immediately so that the postmaster can
- 		 * request another run right away if needed.
- 		 *
- 		 * XXX -- maybe it would be better to handle this inside the launcher
- 		 * itself.
- 		 */
- 		if (!autovacuum_start_daemon)
- 			break;
- 
  		/* have pgstat read the file again next time */
  		pgstat_clear_snapshot();
- 
- 		/* now sleep until the next autovac iteration */
- 		pg_usleep(autovacuum_naptime * 1000000L); 
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
  
  	proc_exit(0);		/* done */
  }
  
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.
   */
! static void
  do_start_worker(void)
  {
  	List	   *dblist;
! 	bool		for_xid_wrap;
! 	autovac_dbase *db;
! 	ListCell *cell;
  	TransactionId xidForceLimit;
  
  	/* Get a list of databases */
! 	dblist = autovac_get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 494,877 ----
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
+ 		/* a worker started up or finished */
+ 		if (got_SIGUSR1)
+ 		{
+ 			WorkerInfo *worker;
+ 			int			i;
+ 
+ 			got_SIGUSR1 = false;
+ 
+ 			/* Walk the workers and clean up finished entries. */
+ 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);	
+ 			foreach_worker(i, worker)
+ 			{
+ 				if (worker->wi_finished)
+ 				{
+ 					worker->wi_tableoid = InvalidOid;
+ 					worker->wi_dboid = InvalidOid;
+ 					worker->wi_workerpid = 0;
+ 					worker->wi_finished = false;
+ 					free_workers++;
+ 				}
+ 			}
+ 			LWLockRelease(AutovacuumLock);
+ 		}
+ 
  		/*
! 		 * See if there's need to start a new worker, and do so if possible.
! 		 * If there are no free worker slots, avoid doing all this work, as
! 		 * we will not be able to start the worker anyway.
  		 */
! 		if (free_workers > 0)
  		{
! 			TimestampTz	current_time;
! 			Dlelem	   *elem;
! 
! 			elem = DLGetTail(DatabaseList);
! 			current_time = GetCurrentTimestamp();
  
! 			if (elem != NULL)
! 			{
! 				avl_dbase *avdb = DLE_VAL(elem);
! 				long	secs;
! 				int		usecs;
! 
! 				TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
! 
! 				/* do we have to start a worker? */
! 				if (secs <= 0 && usecs <= 0)
! 					launch_worker(current_time);
! 			}
  			else
  			{
  				/*
! 				 * Special case when the list is empty: start a worker right
! 				 * away.  This covers the initial case, when no database is in
! 				 * pgstats (thus the list is empty).
  				 */
! 				launch_worker(current_time);
  			}
  		}
  
  		/* have pgstat read the file again next time */
  		pgstat_clear_snapshot();
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
+ 	AutoVacuumShmem->av_launcherpid = 0;
  
  	proc_exit(0);		/* done */
  }
  
+ 
+ /*
+  * Determine the time to sleep, in milliseconds, based on the database list.
+  *
+  * The "canlaunch" parameter indicates whether we can start a worker right now;
+  * it is false, for example, when all the workers are busy.
+  */
+ static int
+ launcher_determine_sleep(bool canlaunch)
+ {
+ 	long	secs;
+ 	int		usecs;
+ 	Dlelem *elem;
+ 
+ 	/*
+ 	 * We sleep until the next scheduled vacuum.  We trust that when the
+ 	 * database list was built, care was taken so that no entries have times in
+ 	 * the past; if the first entry has too close a next_worker value, or a
+ 	 * time in the past, we will sleep a small nominal time.
+ 	 */
+ 	if (!canlaunch)
+ 	{
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 	else if ((elem = DLGetTail(DatabaseList)) != NULL)
+ 	{
+ 		avl_dbase  *avdb = DLE_VAL(elem);
+ 		TimestampTz	current_time = GetCurrentTimestamp();
+ 		TimestampTz	next_wakeup;
+ 
+ 		next_wakeup = avdb->adl_next_worker;
+ 		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ 	}
+ 	else
+ 	{
+ 		/* list is empty, sleep for whole autovacuum_naptime seconds */
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 
+ 	/*
+ 	 * someone screwed up (invalid entry on the list); sleep a nominal amount
+ 	 */
+ 	if (secs <= 0L && usecs <= 0)
+ 	{
+ 		secs = 0;
+ 		usecs = 500000;	/* 500 ms */
+ 	}
+ 
+ 	return secs * 1000 + usecs / 1000;
+ }
+ 
+ /*
+  * Build an updated DatabaseList.  It must only contain databases that appear
+  * in pgstats, and must be sorted by next_worker from highest to lowest,
+  * distributed regularly across the next autovacuum_naptime interval.
+  *
+  * Receives the Oid of the database that made this list be generated (we call
+  * this the "new" database, because when the database was already present on
+  * the list, we expect that this function is not called at all).  The
+  * preexisting list, if any, will be used to preserve the order of the
+  * databases in the autovacuum_naptime period.  The new database is put at the
+  * end of the interval.  The actual values are not saved, which should not be
+  * much of a problem.
+  */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ 	List	   *dblist;
+ 	ListCell   *cell;
+ 	MemoryContext newcxt;
+ 	MemoryContext oldcxt;
+ 	MemoryContext tmpcxt;
+ 	HASHCTL		hctl;
+ 	int			score;
+ 	int			nelems;
+ 	HTAB	   *dbhash;
+ 
+ 	newcxt = AllocSetContextCreate(AutovacMemCxt,
+ 								   "AV dblist",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	tmpcxt = AllocSetContextCreate(newcxt,
+ 								   "tmp AV dblist",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	oldcxt = MemoryContextSwitchTo(tmpcxt);
+ 
+ 	/*
+ 	 * Implementing this is not as simple as it sounds, because we need to put
+ 	 * the new database at the head of the list; next the databases that were
+ 	 * already on the list, and finally (at the tail of the list) all the other
+ 	 * databases that are not on the existing list.
+ 	 *
+ 	 * To do this, we build an empty hash table of scored databases.  We will
+ 	 * start with the lowest score (zero) for the new database, then increasing
+ 	 * scores for the databases in the existing list, in order, and lastly
+ 	 * increasing scores for all databases gotten via get_database_list() that
+ 	 * are not already on the hash.
+ 	 *
+ 	 * Then we will put all the hash elements into an array, sort the array by
+ 	 * score, and finally put the array elements into the new doubly linked
+ 	 * list.
+ 	 */
+ 	hctl.keysize = sizeof(Oid);
+ 	hctl.entrysize = sizeof(avl_dbase);
+ 	hctl.hash = oid_hash;
+ 	hctl.hcxt = tmpcxt;
+ 	dbhash = hash_create("db hash", 20, &hctl,	/* magic number here FIXME */
+ 						 HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+ 
+ 	/* start by inserting the new database */
+ 	score = 0;
+ 	if (OidIsValid(newdb))
+ 	{
+ 		avl_dbase	*db;
+ 		PgStat_StatDBEntry *entry;
+ 
+ 		/* only consider this database if it has a pgstat entry */
+ 		entry = pgstat_fetch_stat_dbentry(newdb);
+ 		if (entry != NULL)
+ 		{
+ 			/* we assume it isn't found because the hash was just created */
+ 			db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
+ 
+ 			/* hash_search already filled in the key */
+ 			db->adl_score = score++;
+ 			/* next_worker is filled in later */
+ 		}
+ 	}
+ 
+ 	/* Now insert the databases from the existing list */
+ 	if (DatabaseList != NULL)
+ 	{
+ 		Dlelem	*elem;
+ 
+ 		elem = DLGetHead(DatabaseList);
+ 		while (elem != NULL)
+ 		{
+ 			avl_dbase  *avdb = DLE_VAL(elem);
+ 			avl_dbase  *db;
+ 			bool		found;
+ 			PgStat_StatDBEntry *entry;
+ 
+ 			/* advance first, so the "continue" below cannot loop forever */
+ 			elem = DLGetSucc(elem);
+ 
+ 			/*
+ 			 * skip databases with no stat entries -- in particular, this
+ 			 * gets rid of dropped databases
+ 			 */
+ 			entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+ 			if (entry == NULL)
+ 				continue;
+ 
+ 			db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
+ 			if (!found)
+ 			{
+ 				/* hash_search already filled in the key */
+ 				db->adl_score = score++;
+ 				/* next_worker is filled in later */
+ 			}
+ 		}
+ 	}
+ 
+ 	/* finally, insert all qualifying databases not previously inserted */
+ 	dblist = get_database_list();
+ 	foreach (cell, dblist)
+ 	{
+ 		avw_dbase  *avdb = lfirst(cell);
+ 		avl_dbase  *db;
+ 		bool		found;
+ 		PgStat_StatDBEntry *entry;
+ 
+ 		/* only consider databases with a pgstat entry */
+ 		entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+ 		if (entry == NULL)
+ 			continue;
+ 
+ 		db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
+ 		/* only update the score if the database was not already on the hash */
+ 		if (!found)
+ 		{
+ 			/* hash_search already filled in the key */
+ 			db->adl_score = score++;
+ 			/* next_worker is filled in later */
+ 		}
+ 	}
+ 	nelems = score;
+ 
+ 	/* from here on, the allocated memory belongs to the new list */
+ 	MemoryContextSwitchTo(newcxt);
+ 	DatabaseList = DLNewList();
+ 
+ 	if (nelems > 0)
+ 	{
+ 		TimestampTz		current_time;
+ 		int				millis_increment;
+ 		avl_dbase	   *dbary;
+ 		avl_dbase	   *db;
+ 		HASH_SEQ_STATUS	seq;
+ 		int				i;
+ 
+ 		/* put all the hash elements into an array */
+ 		dbary = palloc(nelems * sizeof(avl_dbase));
+ 
+ 		i = 0;
+ 		hash_seq_init(&seq, dbhash);
+ 		while ((db = hash_seq_search(&seq)) != NULL)
+ 			memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
+ 
+ 		/* sort the array */
+ 		qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
+ 
+ 		/* this is the time interval between databases in the schedule */
+ 		millis_increment = 1000.0 * autovacuum_naptime / nelems;
+ 		current_time = GetCurrentTimestamp();
+ 
+ 		/*
+ 		 * move the elements from the array into the dllist, setting the 
+ 		 * next_worker while walking the array
+ 		 */
+ 		for (i = 0; i < nelems; i++)
+ 		{
+ 			avl_dbase  *db = &(dbary[i]);
+ 			Dlelem	   *elem;
+ 
+ 			current_time = TimestampTzPlusMilliseconds(current_time,
+ 													   millis_increment);
+ 			db->adl_next_worker = current_time;
+ 
+ 			elem = DLNewElem(db);
+ 			/* later elements should go closer to the head of the list */
+ 			DLAddHead(DatabaseList, elem);
+ 		}
+ 	}
+ 
+ 	/* all done, clean up memory */
+ 	if (DatabaseListCxt != NULL)
+ 		MemoryContextDelete(DatabaseListCxt);
+ 	MemoryContextDelete(tmpcxt);
+ 	DatabaseListCxt = newcxt;
+ 	MemoryContextSwitchTo(oldcxt);
+ }
+ 
+ /* qsort comparator for avl_dbase: orders by adl_score, highest score first */
+ static int
+ db_comparator(const void *a, const void *b)
+ {
+ 	int			score_a = ((const avl_dbase *) a)->adl_score;
+ 	int			score_b = ((const avl_dbase *) b)->adl_score;
+ 
+ 	return (score_a < score_b) - (score_a > score_b);
+ }
+ 
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.  It fails gracefully if invoked when
!  * autovacuum_max_workers workers are already active.
!  *
!  * Return value is the OID of the database that the worker is going to process,
!  * or InvalidOid if no worker was actually started.
   */
! static Oid
  do_start_worker(void)
  {
  	List	   *dblist;
! 	WorkerInfo *worker;
! 	int			i;
! 	ListCell   *cell;
  	TransactionId xidForceLimit;
+ 	bool		for_xid_wrap;
+ 	avw_dbase  *avdb;
+ 	TimestampTz	current_time;
+ 	bool		skipit = false;
+ 
+ 	/*
+ 	 * Find an unused WorkerInfo entry to set up.  If there is none, go to
+ 	 * sleep.
+ 	 *
+ 	 * NB: we only read the array here, and save a pointer where we'll
+ 	 * write the entry later.  Since this is the only process that creates
+ 	 * new entries into the array, there's no risk that somebody else will
+ 	 * use that pointer while we weren't looking.
+ 	 */
+ 	LWLockAcquire(AutovacuumLock, LW_SHARED);
+ 	foreach_worker(i, worker)
+ 	{
+ 		/* Invalid database OID means unused worker entry; use it */
+ 		if (!OidIsValid(worker->wi_dboid))
+ 			break;
+ 	}
+ 	LWLockRelease(AutovacuumLock);
+ 
+ 	/* they're all used up */
+ 	if (i >= autovacuum_max_workers)
+ 		return InvalidOid;
  
  	/* Get a list of databases */
! 	dblist = get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
*************** do_start_worker(void)
*** 495,515 ****
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	db = NULL;
  	for_xid_wrap = false;
  	foreach(cell, dblist)
  	{
! 		autovac_dbase *tmp = lfirst(cell);
  
  		/* Find pgstat entry if any */
! 		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
  
  		/* Check to see if this one is at risk of wraparound */
! 		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
  		{
! 			if (db == NULL ||
! 				TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
! 				db = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
--- 903,925 ----
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	avdb = NULL;
  	for_xid_wrap = false;
+ 	current_time = GetCurrentTimestamp();
  	foreach(cell, dblist)
  	{
! 		avw_dbase  *tmp = lfirst(cell);
! 		Dlelem	   *elem;
  
  		/* Find pgstat entry if any */
! 		tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
  
  		/* Check to see if this one is at risk of wraparound */
! 		if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
  		{
! 			if (avdb == NULL ||
! 				TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
! 				avdb = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
*************** do_start_worker(void)
*** 520,545 ****
  		 * Otherwise, skip a database with no pgstat entry; it means it
  		 * hasn't seen any activity.
  		 */
! 		if (!tmp->ad_entry)
  			continue;
  
  		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (db == NULL ||
! 			tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
! 			db = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (db != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		AutoVacuumShmem->process_db = db->ad_datid;
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
  	}
  }
  
--- 930,1069 ----
  		 * Otherwise, skip a database with no pgstat entry; it means it
  		 * hasn't seen any activity.
  		 */
! 		if (!tmp->adw_entry)
! 			continue;
! 
! 		/*
! 		 * Also, skip a database that appears on the database list as having
! 		 * been processed recently (less than autovacuum_naptime seconds ago).
! 		 * We do this so that we don't select a database which we just
! 		 * selected, but that pgstat hasn't gotten around to updating the last
! 		 * autovacuum time yet.
! 		 */
! 		skipit = false;
! 		elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
! 
! 		while (elem != NULL)
! 		{
! 			avl_dbase *dbp = DLE_VAL(elem);
! 
! 			if (dbp->adl_datid == tmp->adw_datid)
! 			{
! 				TimestampTz		curr_plus_naptime;
! 				TimestampTz		next = dbp->adl_next_worker;
! 				
! 				curr_plus_naptime =
! 					TimestampTzPlusMilliseconds(current_time,
! 												autovacuum_naptime * 1000);
! 
! 				/*
+ 				 * What we want here is to skip if next_worker falls between
! 				 * the current time and the current time plus naptime.
! 				 */
! 				if (timestamp_cmp_internal(current_time, next) > 0)
! 					skipit = false;
! 				else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
! 					skipit = false;
! 				else
! 					skipit = true;
! 
! 				break;
! 			}
! 			elem = DLGetPred(elem);
! 		}
! 		if (skipit)
  			continue;
  
  		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (avdb == NULL ||
! 			tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
! 			avdb = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (avdb != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		Assert(!OidIsValid(worker->wi_dboid));
! 		worker->wi_dboid = avdb->adw_datid;
! 		worker->wi_workerpid = 0;
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+ 
+ 		return avdb->adw_datid;
+ 	}
+ 	else if (skipit)
+ 	{
+ 		/*
+ 		 * If we skipped all databases on the list, rebuild it, because it
+ 		 * probably contains a dropped database.
+ 		 */
+ 		rebuild_database_list(InvalidOid);
+ 	}
+ 
+ 	return InvalidOid;
+ }
+ 
+ /*
+  * launch_worker
+  *
+  * Wrapper for starting a worker from the launcher.  Besides actually starting
+  * it, update the database list to reflect the next time that another one will
+  * need to be started on the selected database.  The actual database choice is
+  * left to do_start_worker.
+  *
+  * This routine is also expected to insert an entry into the database list if
+  * the selected database was previously absent from the list; it does so by
+  * rebuilding the list.
+  */
+ static void
+ launch_worker(TimestampTz now)
+ {
+ 	Oid		dbid;
+ 	Dlelem *elem;
+ 
+ 	dbid = do_start_worker();
+ 	if (OidIsValid(dbid))
+ 	{
+ 		free_workers--;
+ 
+ 		/*
+ 		 * Walk the database list and update the corresponding entry.  If the
+ 		 * database is not on the list, we'll recreate the list.
+ 		 */
+ 		elem = DatabaseList == NULL ? NULL : DLGetHead(DatabaseList);
+ 		while (elem != NULL)
+ 		{
+ 			avl_dbase *avdb = DLE_VAL(elem);
+ 
+ 			if (avdb->adl_datid == dbid)
+ 			{
+ 				/*
+ 				 * add autovacuum_naptime seconds to the current time, and use
+ 				 * that as the new "next_worker" field for this database.
+ 				 */
+ 				avdb->adl_next_worker =
+ 					TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+ 
+ 				DLMoveToFront(elem);
+ 				break;
+ 			}
+ 			elem = DLGetSucc(elem);
+ 		}
+ 
+ 		/*
+ 		 * If the database was not present in the database list, we rebuild the
+ 		 * list.  It's possible that the database does not get into the list
+ 		 * anyway, for example if it's a database that doesn't have a pgstat
+ 		 * entry, but this is not a problem because we don't want to schedule
+ 		 * workers regularly into those in any case.
+ 		 */
+ 		if (elem == NULL)
+ 			rebuild_database_list(dbid);
  	}
  }
  
*************** avl_sighup_handler(SIGNAL_ARGS)
*** 550,555 ****
--- 1074,1086 ----
  	got_SIGHUP = true;
  }
  
+ /* SIGUSR1: a worker is up and running, or just finished */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ 	got_SIGUSR1 = true;
+ }
+ 
  static void
  avlauncher_shutdown(SIGNAL_ARGS)
  {
*************** NON_EXEC_STATIC void
*** 665,671 ****
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 1196,1204 ----
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid = InvalidOid;
! 	WorkerInfo *worker;
! 	int			i;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 763,778 ****
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Get the database Id we're going to work on, and announce our PID
! 	 * in the shared memory area.  We remove the database OID immediately
! 	 * from the shared memory area.
  	 */
! 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 
! 	dbid = AutoVacuumShmem->process_db;
! 	AutoVacuumShmem->process_db = InvalidOid;
! 	AutoVacuumShmem->worker_pid = MyProcPid;
  
  	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
--- 1296,1323 ----
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Force statement_timeout to zero to avoid a timeout setting from
! 	 * preventing regular maintenance from being executed.
  	 */
! 	SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
  
+ 	/*
+ 	 * Walk the WorkerInfo array, and get the database OID we're going to work
+ 	 * on.  Use the first entry with PID 0 in the list, and advertise our PID
+ 	 * on it, thus marking it used.
+ 	 */
+ 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 	foreach_worker(i, worker)
+ 	{
+ 		if (worker->wi_workerpid == 0)
+ 		{
+ 			dbid = worker->wi_dboid;
+ 			worker->wi_workerpid = MyProcPid;
+ 			break;
+ 		}
+ 	}
+ 	if (AutoVacuumShmem->av_launcherpid != 0)
+ 		kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
  	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 803,824 ****
  
  		/* Create the memory context where cross-transaction state is stored */
  		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 											  "Autovacuum context",
  											  ALLOCSET_DEFAULT_MINSIZE,
  											  ALLOCSET_DEFAULT_INITSIZE,
  											  ALLOCSET_DEFAULT_MAXSIZE);
  
  		/* And do an appropriate amount of work */
  		recentXid = ReadNewTransactionId();
! 		do_autovacuum();
  	}
  
- 	/*
- 	 * Now remove our PID from shared memory, so that the launcher can start
- 	 * another worker as soon as appropriate.
- 	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	AutoVacuumShmem->worker_pid = 0;
  	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
--- 1348,1385 ----
  
  		/* Create the memory context where cross-transaction state is stored */
  		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 											  "AV worker",
  											  ALLOCSET_DEFAULT_MINSIZE,
  											  ALLOCSET_DEFAULT_INITSIZE,
  											  ALLOCSET_DEFAULT_MAXSIZE);
  
  		/* And do an appropriate amount of work */
  		recentXid = ReadNewTransactionId();
! 		do_autovacuum(worker);
  	}
  
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	if (!autovacuum_start_daemon)
! 	{
! 		/* in emergency mode we must clean up after ourselves */
! 		worker->wi_workerpid = 0;
! 		worker->wi_dboid = InvalidOid;
! 		worker->wi_tableoid = InvalidOid;
! 		worker->wi_finished = false;
! 	}
! 	else
! 	{
! 		/*
! 		 * Otherwise, let the launcher know we're done.  Warning: must set the
! 		 * flag before sending the signal.  Note: we don't care about code
! 		 * rearrangement from the compiler, because we're doing this with the
! 		 * lock held, thus the launcher can't read the flag until we've
! 		 * released the lock, below.
! 		 */
! 		worker->wi_finished = true;
! 		if (AutoVacuumShmem->av_launcherpid != 0)
! 			kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
! 	}
  	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 826,838 ****
  }
  
  /*
!  * autovac_get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! autovac_get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
--- 1387,1399 ----
  }
  
  /*
!  * get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
*************** autovac_get_database_list(void)
*** 852,866 ****
  	while (read_pg_database_line(db_file, thisname, &db_id,
  								 &db_tablespace, &db_frozenxid))
  	{
! 		autovac_dbase *avdb;
  
! 		avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
  
! 		avdb->ad_datid = db_id;
! 		avdb->ad_name = pstrdup(thisname);
! 		avdb->ad_frozenxid = db_frozenxid;
  		/* this gets set later: */
! 		avdb->ad_entry = NULL;
  
  		dblist = lappend(dblist, avdb);
  	}
--- 1413,1427 ----
  	while (read_pg_database_line(db_file, thisname, &db_id,
  								 &db_tablespace, &db_frozenxid))
  	{
! 		avw_dbase *avdb;
  
! 		avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
  
! 		avdb->adw_datid = db_id;
! 		avdb->adw_name = pstrdup(thisname);
! 		avdb->adw_frozenxid = db_frozenxid;
  		/* this gets set later: */
! 		avdb->adw_entry = NULL;
  
  		dblist = lappend(dblist, avdb);
  	}
*************** autovac_get_database_list(void)
*** 878,884 ****
   * order not to ignore shutdown commands for too long.
   */
  static void
! do_autovacuum(void)
  {
  	Relation	classRel,
  				avRel;
--- 1439,1445 ----
   * order not to ignore shutdown commands for too long.
   */
  static void
! do_autovacuum(WorkerInfo *worker)
  {
  	Relation	classRel,
  				avRel;
*************** do_autovacuum(void)
*** 1038,1047 ****
--- 1599,1642 ----
  		Oid		relid = lfirst_oid(cell);
  		autovac_table *tab;
  		char   *relname;
+ 		WorkerInfo *other_worker;
+ 		int         i;
+ 		bool        skipit;
  
  		CHECK_FOR_INTERRUPTS();
  
  		/*
+ 		 * hold schedule lock from here until we're sure that this table
+ 		 * still needs vacuuming
+ 		 */
+ 		LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ 
+ 		/*
+ 		 * Check whether the table is being vacuumed concurrently by another
+ 		 * worker.
+ 		 */
+ 		skipit = false;
+ 		foreach_worker(i, other_worker)
+ 		{
+ 			/*
+ 			 * ignore not-yet-registered or not running workers, and workers in
+ 			 * other databases
+ 			 */
+ 			if (other_worker->wi_workerpid == 0 ||
+ 				other_worker->wi_dboid != MyDatabaseId)
+ 				continue;
+ 
+ 			if (other_worker->wi_tableoid == relid)
+ 			{
+ 				LWLockRelease(AutovacuumScheduleLock);
+ 				skipit = true;
+ 				break;
+ 			}
+ 		}
+ 		if (skipit)
+ 			continue;
+ 
+ 		/*
  		 * Check whether pgstat data still says we need to vacuum this table.
  		 * It could have changed if something else processed the table while we
  		 * weren't looking.
*************** do_autovacuum(void)
*** 1053,1061 ****
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
  			continue;
  		}
! 		/* Ok, good to go! */
  
  		/* Set the vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
--- 1648,1663 ----
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
+ 			LWLockRelease(AutovacuumScheduleLock);
  			continue;
  		}
! 
! 		/*
! 		 * Ok, good to go.  Store the table in shared memory before releasing
! 		 * the lock so that other workers don't vacuum it concurrently.
! 		 */
! 		worker->wi_tableoid = relid;
! 		LWLockRelease(AutovacuumScheduleLock);
  
  		/* Set the vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
*************** IsAutoVacuumWorkerProcess(void)
*** 1630,1636 ****
  Size
  AutoVacuumShmemSize(void)
  {
! 	return sizeof(AutoVacuumShmemStruct);
  }
  
  /*
--- 2232,2239 ----
  Size
  AutoVacuumShmemSize(void)
  {
! 	return add_size(offsetof(AutoVacuumShmemStruct, av_workers),
!  					mul_size(autovacuum_max_workers, sizeof(WorkerInfo)));
  }
  
  /*
*************** AutoVacuumShmemInit(void)
*** 1653,1657 ****
  	if (found)
  		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
  }
--- 2256,2260 ----
  	if (found)
  		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, AutoVacuumShmemSize());
  }
Index: src/backend/storage/ipc/procarray.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/procarray.c,v
retrieving revision 1.23
diff -c -p -r1.23 procarray.c
*** src/backend/storage/ipc/procarray.c	25 Mar 2007 19:45:14 -0000	1.23
--- src/backend/storage/ipc/procarray.c	30 Mar 2007 20:46:08 -0000
***************
*** 36,41 ****
--- 36,42 ----
  #include "access/xact.h"
  #include "access/twophase.h"
  #include "miscadmin.h"
+ #include "postmaster/autovacuum.h"
  #include "storage/procarray.h"
  #include "utils/tqual.h"
  
*************** ProcArrayShmemSize(void)
*** 89,95 ****
  
  	size = offsetof(ProcArrayStruct, procs);
  	size = add_size(size, mul_size(sizeof(PGPROC *),
! 								 add_size(MaxBackends, max_prepared_xacts)));
  
  	return size;
  }
--- 90,98 ----
  
  	size = offsetof(ProcArrayStruct, procs);
  	size = add_size(size, mul_size(sizeof(PGPROC *),
! 								   add_size(add_size(MaxBackends,
! 												 	 max_prepared_xacts),
! 											autovacuum_max_workers)));
  
  	return size;
  }
*************** CreateSharedProcArray(void)
*** 112,118 ****
  		 * We're the first - initialize.
  		 */
  		procArray->numProcs = 0;
! 		procArray->maxProcs = MaxBackends + max_prepared_xacts;
  	}
  }
  
--- 115,121 ----
  		 * We're the first - initialize.
  		 */
  		procArray->numProcs = 0;
! 		procArray->maxProcs = MaxBackends + max_prepared_xacts + autovacuum_max_workers;
  	}
  }
  
Index: src/backend/storage/lmgr/lock.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/lock.c,v
retrieving revision 1.176
diff -c -p -r1.176 lock.c
*** src/backend/storage/lmgr/lock.c	1 Feb 2007 19:10:28 -0000	1.176
--- src/backend/storage/lmgr/lock.c	30 Mar 2007 20:46:08 -0000
***************
*** 37,42 ****
--- 37,43 ----
  #include "access/twophase_rmgr.h"
  #include "miscadmin.h"
  #include "pgstat.h"
+ #include "postmaster/autovacuum.h"
  #include "storage/lmgr.h"
  #include "utils/memutils.h"
  #include "utils/ps_status.h"
***************
*** 47,53 ****
  int			max_locks_per_xact; /* set by guc.c */
  
  #define NLOCKENTS() \
! 	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
  
  
  /*
--- 48,55 ----
  int			max_locks_per_xact; /* set by guc.c */
  
  #define NLOCKENTS() \
! 	mul_size(max_locks_per_xact, \
! 			 add_size(add_size(MaxBackends, max_prepared_xacts), autovacuum_max_workers))
  
  
  /*
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.383
diff -c -p -r1.383 guc.c
*** src/backend/utils/misc/guc.c	19 Mar 2007 23:38:30 -0000	1.383
--- src/backend/utils/misc/guc.c	30 Mar 2007 20:46:08 -0000
*************** static struct config_int ConfigureNamesI
*** 1620,1625 ****
--- 1620,1634 ----
  		&autovacuum_freeze_max_age,
  		200000000, 100000000, 2000000000, NULL, NULL
  	},
+ 	{
+ 		/* this is PGC_POSTMASTER because it determines shared memory size */
+ 		{"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+ 			gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+ 			NULL
+ 		},
+ 		&autovacuum_max_workers,
+ 		10, 1, INT_MAX, NULL, NULL
+ 	},
  
  	{
  		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
Index: src/include/postmaster/autovacuum.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/postmaster/autovacuum.h,v
retrieving revision 1.8
diff -c -p -r1.8 autovacuum.h
*** src/include/postmaster/autovacuum.h	15 Feb 2007 23:23:23 -0000	1.8
--- src/include/postmaster/autovacuum.h	30 Mar 2007 20:46:08 -0000
***************
*** 16,21 ****
--- 16,22 ----
  
  /* GUC variables */
  extern bool autovacuum_start_daemon;
+ extern int	autovacuum_max_workers;
  extern int	autovacuum_naptime;
  extern int	autovacuum_vac_thresh;
  extern double autovacuum_vac_scale;
Index: src/include/storage/lwlock.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/lwlock.h,v
retrieving revision 1.34
diff -c -p -r1.34 lwlock.h
*** src/include/storage/lwlock.h	15 Feb 2007 23:23:23 -0000	1.34
--- src/include/storage/lwlock.h	30 Mar 2007 20:25:59 -0000
*************** typedef enum LWLockId
*** 62,67 ****
--- 62,68 ----
  	BtreeVacuumLock,
  	AddinShmemInitLock,
  	AutovacuumLock,
+ 	AutovacuumScheduleLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
---------------------------(end of broadcast)---------------------------
TIP 2: Don't 'kill -9' the postmaster

Reply via email to