ITAGAKI Takahiro wrote:
> > > >Yes, that's correct.  Per previous discussion, what I actually wanted to
> > > >do was to create a GUC setting to simplify the whole thing, something
> > > >like "autovacuum_max_mb_per_second" or "autovacuum_max_io_per_second".
> > > >Then, have each worker use up to (max_per_second/active workers) as much
> > > >IO resources.
> > 
> > One thing I forgot to mention is that this is unlikely to be implemented
> > in 8.3.
> 
> This is a WIP cost balancing patch built on autovacuum-multiworkers-5.patch.
> The total cost of the workers is adjusted to autovacuum_vacuum_cost_delay.

I manually merged your patch on top of my own.  This is the result.
Please have a look at whether the new code is correct and behaves sanely
(I haven't tested it).

-- 
Alvaro Herrera                                http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.349
diff -c -p -r1.349 vacuum.c
*** src/backend/commands/vacuum.c	14 Mar 2007 18:48:55 -0000	1.349
--- src/backend/commands/vacuum.c	11 Apr 2007 23:43:23 -0000
*************** vacuum_delay_point(void)
*** 3504,3509 ****
--- 3504,3512 ----
  
  		VacuumCostBalance = 0;
  
+ 		/* update balance values for workers */
+ 		AutoVacuumUpdateDelay();
+ 
  		/* Might have gotten an interrupt while sleeping */
  		CHECK_FOR_INTERRUPTS();
  	}
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.40
diff -c -p -r1.40 autovacuum.c
*** src/backend/postmaster/autovacuum.c	28 Mar 2007 22:17:12 -0000	1.40
--- src/backend/postmaster/autovacuum.c	11 Apr 2007 23:43:31 -0000
***************
*** 43,48 ****
--- 43,49 ----
  #include "storage/proc.h"
  #include "storage/procarray.h"
  #include "storage/sinval.h"
+ #include "storage/spin.h"
  #include "tcop/tcopprot.h"
  #include "utils/flatfiles.h"
  #include "utils/fmgroids.h"
***************
*** 52,57 ****
--- 53,59 ----
  #include "utils/syscache.h"
  
  
+ static volatile sig_atomic_t got_SIGUSR1 = false;
  static volatile sig_atomic_t got_SIGHUP = false;
  static volatile sig_atomic_t avlauncher_shutdown_request = false;
  
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 61,67 ----
   * GUC parameters
   */
  bool		autovacuum_start_daemon = false;
+ int			autovacuum_max_workers;
  int			autovacuum_naptime;
  int			autovacuum_vac_thresh;
  double		autovacuum_vac_scale;
*************** int			autovacuum_freeze_max_age;
*** 69,75 ****
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flag to tell if we are in the autovacuum daemon process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
--- 72,78 ----
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flags to tell if we are in an autovacuum process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
*************** static int	default_freeze_min_age;
*** 82,95 ****
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
! /* struct to keep list of candidate databases for vacuum */
! typedef struct autovac_dbase
  {
! 	Oid			ad_datid;
! 	char	   *ad_name;
! 	TransactionId ad_frozenxid;
! 	PgStat_StatDBEntry *ad_entry;
! } autovac_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
--- 85,106 ----
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
! /* struct to keep track of databases in launcher */
! typedef struct avl_dbase
  {
! 	Oid			adl_datid;			/* hash key -- must be first */
! 	TimestampTz	adl_next_worker;
! 	int			adl_score;
! } avl_dbase;
! 
! /* struct to keep track of databases in worker */
! typedef struct avw_dbase
! {
! 	Oid			adw_datid;
! 	char	   *adw_name;
! 	TransactionId adw_frozenxid;
! 	PgStat_StatDBEntry *adw_entry;
! } avw_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
*************** typedef struct autovac_table
*** 110,123 ****
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
  typedef struct
  {
! 	Oid		process_db;			/* OID of database to process */
! 	int		worker_pid;			/* PID of the worker process, if any */
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 121,195 ----
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_links		entry into free list or running list
+  * wi_dboid		OID of the database this worker is supposed to work on
+  * wi_tableoid	OID of the table currently being vacuumed
+  * wi_workerpid	PID of the running worker, 0 if not yet started
+  * wi_launchtime Time this worker was launched
+  *
+  * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+  * protected by AutovacuumScheduleLock (which is read-only for everyone except
+  * that worker itself).
+  *-------------
+  */
+ typedef struct WorkerInfoData
+ {
+ 	SHM_QUEUE	wi_links;
+ 	Oid			wi_dboid;
+ 	Oid			wi_tableoid;
+ 	int			wi_workerpid;
+ 	TimestampTz	wi_launchtime;
+ 	int			wi_cost_delay;
+ 	int			wi_cost_limit;
+ 	int			wi_cost_limit_base;
+ } WorkerInfoData;
+ 
+ typedef struct WorkerInfoData *WorkerInfo;
+ 
+ /* the spinlock protecting the PGPROC array */
+ NON_EXEC_STATIC slock_t *AutovacProcLock = NULL;
+ 
+ /*-------------
+  * The main autovacuum shmem struct.  On shared memory we store: 1) this main
+  * struct; 2) the array of WorkerInfo structs; 3) the array of PGPROCs.
+  *
+  * av_launcherpid	the PID of the autovacuum launcher
+  * av_freeProcs		the PGPROC freelist
+  * av_freeWorkers	the WorkerInfo freelist
+  * av_runningWorkers the WorkerInfo non-free queue
+  * av_startingWorker pointer to WorkerInfo currently being started (cleared by
+  *					the worker itself as soon as it's up and running)
+  * av_rebalance		true when a worker determines that cost limits must be
+  * 					rebalanced
+  *
+  * This struct is protected by AutovacuumLock, except for the PGPROC list which
+  * is protected by the AutovacProcLock spinlock.
+  *-------------
+  */
  typedef struct
  {
! 	pid_t			av_launcherpid;
! 	SHMEM_OFFSET	av_freeProcs;
! 	SHMEM_OFFSET	av_freeWorkers;
! 	SHM_QUEUE		av_runningWorkers;
! 	SHMEM_OFFSET	av_startingWorker;
! 	bool			av_rebalance;
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+ 
+ /* Pointer to my own WorkerInfo, valid on each worker */
+ static WorkerInfo	MyWorkerInfo = NULL;
+ 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 125,133 ****
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void do_start_worker(void);
  static void do_autovacuum(void);
! static List *autovac_get_database_list(void);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 197,212 ----
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static Oid do_start_worker(void);
! static uint64 launcher_determine_sleep(bool canlaunch);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int db_comparator(const void *a, const void *b);
! static void autovac_balance_cost(void);
! 
  static void do_autovacuum(void);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** static void relation_needs_vacanalyze(Oi
*** 141,152 ****
  
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
  						  PgStat_StatDBEntry *shared,
  						  PgStat_StatDBEntry *dbentry);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
--- 220,231 ----
  
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
  static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
  						  PgStat_StatDBEntry *shared,
  						  PgStat_StatDBEntry *dbentry);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
*************** StartAutoVacLauncher(void)
*** 230,241 ****
  
  /*
   * Main loop for the autovacuum launcher process.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
- 	MemoryContext	avlauncher_cxt;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 309,340 ----
  
  /*
   * Main loop for the autovacuum launcher process.
+  *
+  * The signalling between launcher and worker is as follows:
+  *
+  * When the worker has finished starting up, it stores its PID in wi_workerpid
+  * and sends a SIGUSR1 signal to the launcher.  The launcher then knows that
+  * the postmaster is ready to start a new worker.  We do it this way because
+  * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+  * yet processed the last one, in which case the second signal would be lost.
+  * This is only useful when two workers need to be started close to one
+  * another, which should be rare but it's possible.
+  *
+  * When the worker exits, its ProcKill routine (actually AutovacWorkerProcKill)
+  * is in charge of resetting the WorkerInfo entry and signalling the launcher.
+  * The launcher then wakes up and can launch a new worker if need be, or just
+  * go back to sleep.
+  *
+  * There is a potential problem if, for some reason, a worker starts and is not
+  * able to bootstrap itself correctly.  To prevent this situation from starving
+  * the whole system, the launcher checks the launch time of the "starting
+  * worker".  If it's too old (older than autovacuum_naptime seconds), it resets
+  * the worker entry and puts it back into the free list.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 264,272 ****
  	 * Set up signal handlers.	Since this is an auxiliary process, it has
  	 * particular signal requirements -- no deadlock checker or sinval
  	 * catchup, for example.
- 	 *
- 	 * XXX It may be a good idea to receive signals when an avworker process
- 	 * finishes.
  	 */
  	pqsignal(SIGHUP, avl_sighup_handler);
  
--- 363,368 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 276,282 ****
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, SIG_IGN);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
--- 372,378 ----
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, avl_sigusr1_handler);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 300,311 ****
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! 										   "Autovacuum Launcher",
! 										   ALLOCSET_DEFAULT_MINSIZE,
! 										   ALLOCSET_DEFAULT_INITSIZE,
! 										   ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(avlauncher_cxt);
  
  
  	/*
--- 396,407 ----
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 										  "Autovacuum Launcher",
! 										  ALLOCSET_DEFAULT_MINSIZE,
! 										  ALLOCSET_DEFAULT_INITSIZE,
! 										  ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  
  	/*
*************** AutoVacLauncherMain(int argc, char *argv
*** 336,346 ****
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(avlauncher_cxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(avlauncher_cxt);
  
  		/* Make sure pgstat also considers our stat data as gone */
  		pgstat_clear_snapshot();
--- 432,442 ----
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(AutovacMemCxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(AutovacMemCxt);
  
  		/* Make sure pgstat also considers our stat data as gone */
  		pgstat_clear_snapshot();
*************** AutoVacLauncherMain(int argc, char *argv
*** 361,378 ****
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
  	PG_SETMASK(&UnBlockSig);
  
  	/*
! 	 * take a nap before executing the first iteration, unless we were
! 	 * requested an emergency run.
  	 */
! 	if (autovacuum_start_daemon)
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  
  	for (;;)
  	{
! 		int		worker_pid;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 457,488 ----
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
+ 	/* must unblock signals before calling rebuild_database_list */
  	PG_SETMASK(&UnBlockSig);
  
+ 	/* in emergency mode, just start a worker and go away */
+ 	if (!autovacuum_start_daemon)
+ 	{
+ 		do_start_worker();
+ 		proc_exit(0);		/* done */
+ 	}
+ 
+ 	AutoVacuumShmem->av_launcherpid = MyProcPid;
+ 
  	/*
! 	 * Create the initial database list.  The invariant we want this list to
! 	 * keep is that it's ordered by decreasing next_worker time.  As soon as an
! 	 * entry is updated to a higher time, it will be moved to the front (which
! 	 * is correct because the only operation is to add autovacuum_naptime to
! 	 * the entry, and time always increases).
  	 */
! 	rebuild_database_list(InvalidOid);
  
  	for (;;)
  	{
! 		uint64		micros;
! 		bool	can_launch;
! 		TimestampTz current_time = 0;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 381,386 ****
--- 491,503 ----
  		if (!PostmasterIsAlive(true))
  			exit(1);
  
+ 		micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
+ 										  INVALID_OFFSET);
+ 
+ 		/* Sleep for a while according to schedule */
+ 		pg_usleep(micros);
+ 
+ 		/* the normal shutdown case */
  		if (avlauncher_shutdown_request)
  			break;
  
*************** AutoVacLauncherMain(int argc, char *argv
*** 388,469 ****
  		{
  			got_SIGHUP = false;
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
  		/*
! 		 * if there's a worker already running, sleep until it
! 		 * disappears.
  		 */
  		LWLockAcquire(AutovacuumLock, LW_SHARED);
- 		worker_pid = AutoVacuumShmem->worker_pid;
- 		LWLockRelease(AutovacuumLock);
  
! 		if (worker_pid != 0)
  		{
! 			PGPROC *proc = BackendPidGetProc(worker_pid);
  
! 			if (proc != NULL && proc->isAutovacuum)
! 				goto sleep;
! 			else
  			{
  				/*
! 				 * if the worker is not really running (or it's a process
! 				 * that's not an autovacuum worker), remove the PID from shmem.
! 				 * This should not happen, because either the worker exits
! 				 * cleanly, in which case it'll remove the PID, or it dies, in
! 				 * which case postmaster will cause a system reset cycle.
  				 */
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				worker_pid = 0;
! 				LWLockRelease(AutovacuumLock);
  			}
  		}
  
! 		do_start_worker();
  
! sleep:
! 		/*
! 		 * in emergency mode, exit immediately so that the postmaster can
! 		 * request another run right away if needed.
! 		 *
! 		 * XXX -- maybe it would be better to handle this inside the launcher
! 		 * itself.
! 		 */
! 		if (!autovacuum_start_daemon)
! 			break;
  
! 		/* have pgstat read the file again next time */
! 		pgstat_clear_snapshot();
  
! 		/* now sleep until the next autovac iteration */
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
  
  	proc_exit(0);		/* done */
  }
  
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.
   */
! static void
  do_start_worker(void)
  {
  	List	   *dblist;
! 	bool		for_xid_wrap;
! 	autovac_dbase *db;
! 	ListCell *cell;
  	TransactionId xidForceLimit;
  
  	/* Get a list of databases */
! 	dblist = autovac_get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 505,927 ----
  		{
  			got_SIGHUP = false;
  			ProcessConfigFile(PGC_SIGHUP);
+ 
+ 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 			autovac_balance_cost();
+ 			LWLockRelease(AutovacuumLock);
+ 		}
+ 
+ 		/* a worker started up or finished */
+ 		if (got_SIGUSR1)
+ 		{
+ 			got_SIGUSR1 = false;
+ 
+ 			/* rebalance cost limits, if needed */
+ 			if (AutoVacuumShmem->av_rebalance)
+ 			{
+ 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 				autovac_balance_cost();
+ 				LWLockRelease(AutovacuumLock);
+ 			}
  		}
  
  		/*
! 		 * There are some conditions that we need to check before trying to
! 		 * start a worker.  First, we need to make sure that there is a
! 		 * worker slot available.  Second, we need to make sure that no other
! 		 * worker is still starting up.
  		 */
+ 
  		LWLockAcquire(AutovacuumLock, LW_SHARED);
  
! 		can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
! 
! 		if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
  		{
! 			long	secs;
! 			int		usecs;
! 			WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! 
! 			if (current_time == 0)
! 				current_time = GetCurrentTimestamp();
! 
! 			/*
! 			 * We can't launch another worker when another one is still
! 			 * starting up, so just sleep for a bit more; that worker will wake
! 			 * us up again as soon as it's ready.  We will only wait
! 			 * autovacuum_naptime seconds for this to happen however.  Note
! 			 * that failure to connect to a particular database is not a
! 			 * problem here, because the worker removes itself from the
! 			 * startingWorker pointer before trying to connect; only low-level
! 			 * problems, like fork() failure, can get us here.
! 			 */
! 			TimestampDifference(worker->wi_launchtime, current_time,
! 								&secs, &usecs);
  
! 			/* ignore microseconds, as they cannot make any difference */
! 			if (secs > autovacuum_naptime)
  			{
+ 				LWLockRelease(AutovacuumLock);
+ 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  				/*
! 				 * No other process can put a worker in starting mode, so if
! 				 * startingWorker is still set (not INVALID_OFFSET) after
! 				 * exchanging our lock, we assume it's the same one we saw
! 				 * above (so we don't recheck the launch time).
  				 */
! 				if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
! 				{
! 					worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! 					worker->wi_dboid = InvalidOid;
! 					worker->wi_tableoid = InvalidOid;
! 					worker->wi_workerpid = 0;
! 					worker->wi_launchtime = 0;
! 					worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! 					AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
! 					AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
! 				}
  			}
+ 			else
+ 				can_launch = false;
  		}
+ 		LWLockRelease(AutovacuumLock);		/* either shared or exclusive */
  
! 		if (can_launch)
! 		{
! 			Dlelem	   *elem;
  
! 			elem = DLGetTail(DatabaseList);
  
! 			if (current_time == 0)
! 				current_time = GetCurrentTimestamp();
  
! 			if (elem != NULL)
! 			{
! 				avl_dbase *avdb = DLE_VAL(elem);
! 				long	secs;
! 				int		usecs;
! 
! 				TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
! 
! 				/* do we have to start a worker? */
! 				if (secs <= 0 && usecs <= 0)
! 					launch_worker(current_time);
! 			}
! 			else
! 			{
! 				/*
! 				 * Special case when the list is empty: start a worker right
! 				 * away.  This covers the initial case, when no database is in
! 				 * pgstats (thus the list is empty).
! 				 */
! 				launch_worker(current_time);
! 			}
! 		}
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
+ 	AutoVacuumShmem->av_launcherpid = 0;
  
  	proc_exit(0);		/* done */
  }
  
+ 
+ /*
+  * Determine the time to sleep, in microseconds, based on the database list.
+  *
+  * The "canlaunch" parameter indicates whether we can start a worker right now;
+  * it is false, for example, when all the workers are busy.
+  */
+ static uint64
+ launcher_determine_sleep(bool canlaunch)
+ {
+ 	long	secs;
+ 	int		usecs;
+ 	Dlelem *elem;
+ 
+ 	/*
+ 	 * We sleep until the next scheduled vacuum.  We trust that when the
+ 	 * database list was built, care was taken so that no entries have times in
+ 	 * the past; if the first entry has too close a next_worker value, or a
+ 	 * time in the past, we will sleep a small nominal time.
+ 	 */
+ 	if (!canlaunch)
+ 	{
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 	else if ((elem = DLGetTail(DatabaseList)) != NULL)
+ 	{
+ 		avl_dbase  *avdb = DLE_VAL(elem);
+ 		TimestampTz	current_time = GetCurrentTimestamp();
+ 		TimestampTz	next_wakeup;
+ 
+ 		next_wakeup = avdb->adl_next_worker;
+ 		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ 	}
+ 	else
+ 	{
+ 		/* list is empty, sleep for whole autovacuum_naptime seconds  */
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 
+ 	/* 100ms is the smallest time we'll allow the launcher to sleep */
+ 	if (secs <= 0L && usecs <= 100000)
+ 	{
+ 		secs = 0L;
+ 		usecs = 100000;	/* 100 ms */
+ 	}
+ 
+ 	return secs * 1000000 + usecs;
+ }
+ 
+ /*
+  * Build an updated DatabaseList.  It must only contain databases that appear
+  * in pgstats, and must be sorted by next_worker from highest to lowest,
+  * distributed regularly across the next autovacuum_naptime interval.
+  *
+  * Receives the Oid of the database that made this list be generated (we call
+  * this the "new" database, because when the database was already present on
+  * the list, we expect that this function is not called at all).  The
+  * preexisting list, if any, will be used to preserve the order of the
+  * databases in the autovacuum_naptime period.  The new database is put at the
+  * end of the interval.  The actual values are not saved, which should not be
+  * much of a problem.
+  */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ 	List	   *dblist;
+ 	ListCell   *cell;
+ 	MemoryContext newcxt;
+ 	MemoryContext oldcxt;
+ 	MemoryContext tmpcxt;
+ 	HASHCTL		hctl;
+ 	int			score;
+ 	int			nelems;
+ 	HTAB	   *dbhash;
+ 
+ 	/* use fresh stats */
+ 	pgstat_clear_snapshot();
+ 
+ 	newcxt = AllocSetContextCreate(AutovacMemCxt,
+ 								   "AV dblist",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	tmpcxt = AllocSetContextCreate(newcxt,
+ 								   "tmp AV dblist",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	oldcxt = MemoryContextSwitchTo(tmpcxt);
+ 
+ 	/*
+ 	 * Implementing this is not as simple as it sounds, because we need to put
+ 	 * the new database at the end of the list; next the databases that were
+ 	 * already on the list, and finally (at the tail of the list) all the other
+ 	 * databases that are not on the existing list.
+ 	 *
+ 	 * To do this, we build an empty hash table of scored databases.  We will
+ 	 * start with the lowest scor
---------------------------(end of broadcast)---------------------------
TIP 5: don't forget to increase your free space map settings

Reply via email to