From 3346ed95bff15607e90a69741b0fcc5b90aff9c9 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Wed, 23 Oct 2019 10:46:49 +0530
Subject: [PATCH v1] divide vacuum cost limit

---
 src/backend/access/heap/vacuumlazy.c | 119 ++++++++++++++++++++++++++++++++++-
 1 file changed, 118 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 2faa4e9..73b6af7 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -137,6 +137,7 @@
 #define PARALLEL_VACUUM_KEY_SHARED			1
 #define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
 #define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
+#define PARALLEL_VACUUM_KEY_COST_BALANCE	4
 
 /*
  * PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION disables the leader's
@@ -227,6 +228,14 @@ typedef struct LVShared
 } LVShared;
 #define SizeOfLVShared offsetof(LVShared, indstats) + sizeof(LVSharedIndStats)
 
+typedef struct LVCostBalance
+{
+	pg_atomic_uint32	nslot;
+	int		nworkers;
+	int		vaccostbalance[FLEXIBLE_ARRAY_MEMBER];
+} LVCostBalance;
+#define SizeOfLVCostBalance offsetof(LVCostBalance, vaccostbalance) + sizeof(int)
+
 /* Struct for parallel lazy vacuum */
 typedef struct LVParallelState
 {
@@ -235,6 +244,8 @@ typedef struct LVParallelState
 	/* Shared information among parallel vacuum workers */
 	LVShared		*lvshared;
 
+	/* Shared cost balance. */
+	LVCostBalance	*lvcostbalance;
 	/*
 	 * Always true except for a debugging case where
 	 * PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION are defined.
@@ -1927,6 +1938,31 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
 	return false;
 }
 
+static void
+compute_cost_balance(LVParallelState *lps)
+{
+	int i;
+
+	/*
+	 * Share the estimated worker counts so that each worker can compute their
+	 * cost limit.  Include the leader if it is participating in the index
+	 * vacuum phase.
+	 * XXX: Actual worker launched might be lesser than the estimated worker so
+	 * in that case each worker might operate with less vacuum cost limit.
+	 */
+	lps->lvcostbalance->nworkers = lps->pcxt->nworkers;
+	if (lps->leaderparticipates)
+		lps->lvcostbalance->nworkers += 1;
+
+	/*
+	 * Divide the current cost balance among the worker so that we don't loose
+	 * accounting of the I/O balance so far.
+	 */
+	for (i = 0; i < lps->pcxt->nworkers; i++)
+		lps->lvcostbalance->vaccostbalance[i] =
+				VacuumCostBalance / lps->lvcostbalance->nworkers;
+}
+
 /*
  * Vacuuming indexes with parallel vacuum workers. This function must be used
  * by the parallel vacuum leader process.
@@ -1936,6 +1972,8 @@ lazy_parallel_vacuum_indexes(LVRelStats *vacrelstats, Relation *Irel,
 							 int nindexes, IndexBulkDeleteResult **stats,
 							 LVParallelState *lps)
 {
+	int i;
+	
 	Assert(!IsParallelWorker());
 	Assert(ParallelVacuumIsActive(lps));
 	Assert(nindexes > 0);
@@ -1950,6 +1988,9 @@ lazy_parallel_vacuum_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	lps->lvshared->reltuples = vacrelstats->old_live_tuples;
 	lps->lvshared->estimated_count = true;
 
+	/* Compute cost balance for the workers. */
+	compute_cost_balance(lps);
+
 	LaunchParallelWorkers(lps->pcxt);
 
 	ereport(elevel,
@@ -1963,14 +2004,38 @@ lazy_parallel_vacuum_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	 * does that in case where no workers launched.
 	 */
 	if (lps->leaderparticipates || lps->pcxt->nworkers_launched == 0)
+	{
+		int base_cost_limit = VacuumCostLimit;
+
+		/*
+		 * If leader is participating and we have launched the parallel workers
+		 * then compute the leaders share of the cost limit and cost balance.
+		 */
+		if (lps->pcxt->nworkers_launched > 0)
+		{
+			VacuumCostLimit /= lps->lvcostbalance->nworkers;
+			VacuumCostBalance /= lps->lvcostbalance->nworkers;
+		}
+
 		vacuum_or_cleanup_indexes_worker(Irel, nindexes, stats, lps->lvshared,
 										 vacrelstats->dead_tuples);
+		VacuumCostLimit = base_cost_limit;
+	}
 
 	/* Wait for all vacuum workers to finish */
 	WaitForParallelWorkersToFinish(lps->pcxt);
 
 	/* Reset the processing count */
 	pg_atomic_write_u32(&(lps->lvshared->nprocessed), 0);
+	pg_atomic_write_u32(&(lps->lvcostbalance->nslot), 0);
+
+	/*
+	 * Index vacuuming phase is complete, so collect the remaining balance from
+	 * all the worker and add to the current balance of the leader.  So that we
+	 * don't loose the accounting for the extra I/O balance of the workers.
+	 */
+	for (i = 0; i < lps->pcxt->nworkers_launched; i++)
+		VacuumCostBalance += lps->lvcostbalance->vaccostbalance[i];
 
 	/*
 	 * Reinitialize the parallel context to relaunch parallel workers
@@ -1988,6 +2053,8 @@ lazy_parallel_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 									   int nindexes, IndexBulkDeleteResult **stats,
 									   LVParallelState *lps)
 {
+	int i;
+
 	Assert(!IsParallelWorker());
 	Assert(ParallelVacuumIsActive(lps));
 	Assert(nindexes > 0);
@@ -2004,6 +2071,9 @@ lazy_parallel_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	lps->lvshared->estimated_count =
 		(vacrelstats->tupcount_pages < vacrelstats->rel_pages);
 
+	/* Compute cost balance for the workers. */
+	compute_cost_balance(lps);
+
 	LaunchParallelWorkers(lps->pcxt);
 
 	ereport(elevel,
@@ -2017,12 +2087,34 @@ lazy_parallel_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	 * that in case where no workers launched.
 	 */
 	if (lps->leaderparticipates || lps->pcxt->nworkers_launched == 0)
+	{
+		int base_cost_limit = VacuumCostLimit;
+
+		/*
+		 * If leader is participating and we have launched the parallel workers
+		 * then compute the leaders share of the cost limit and cost balance.
+		 */
+		if (lps->pcxt->nworkers_launched > 0)
+		{
+			VacuumCostLimit /= lps->lvcostbalance->nworkers;
+			VacuumCostBalance /= lps->lvcostbalance->nworkers;
+		}
+
 		vacuum_or_cleanup_indexes_worker(Irel, nindexes, stats, lps->lvshared,
 										 vacrelstats->dead_tuples);
+		VacuumCostLimit = base_cost_limit;
+	}
 
 	/* Wait for all vacuum workers to finish */
 	WaitForParallelWorkersToFinish(lps->pcxt);
 
+	/*
+	 * Index vacuuming phase is complete, so collect the remaining balance from
+	 * all the worker and add to the current balance of the leader.  So that we
+	 * don't loose the accounting for the extra I/O balance of the workers.
+	 */	
+	for (i = 0; i < lps->pcxt->nworkers_launched; i++)
+		VacuumCostBalance += lps->lvcostbalance->vaccostbalance[i];
 
 	/*
 	 * We don't need to reinitialize the parallel context unlike parallel index
@@ -2937,10 +3029,12 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks,
 	LVShared	*shared;
 	ParallelContext *pcxt;
 	LVDeadTuples	*tidmap;
+	LVCostBalance	*costbalance;
 	long	maxtuples;
 	char	*sharedquery;
 	Size	est_shared;
 	Size	est_deadtuples;
+	Size	est_costbalance;
 	int		querylen;
 	int		i;
 
@@ -3019,6 +3113,14 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks,
 	memcpy(sharedquery, debug_query_string, querylen + 1);
 	sharedquery[querylen] = '\0';
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+	
+	/* Vacuum cost balance. */
+	est_costbalance = MAXALIGN(add_size(SizeOfLVCostBalance,
+								   mul_size(sizeof(int), nrequested)));	
+	costbalance = (LVCostBalance *) shm_toc_allocate(pcxt->toc, est_costbalance);
+	pg_atomic_init_u32(&(costbalance->nslot), 0);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_COST_BALANCE, costbalance);
+	lps->lvcostbalance = costbalance;
 
 	return lps;
 }
@@ -3084,8 +3186,10 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	Relation	*indrels;
 	LVShared	*lvshared;
 	LVDeadTuples	*dead_tuples;
+	LVCostBalance	*costbalance;	
 	int			nindexes;
 	char		*sharedquery;
+	int			slot;
 	IndexBulkDeleteResult **stats;
 
 	lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
@@ -3118,6 +3222,11 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc,
 												  PARALLEL_VACUUM_KEY_DEAD_TUPLES,
 												  false);
+	
+	costbalance = (LVCostBalance *) shm_toc_lookup(toc,
+												   PARALLEL_VACUUM_KEY_COST_BALANCE,
+												   false);
+	slot = pg_atomic_fetch_add_u32(&(costbalance->nslot), 1);
 
 	/* Set cost-based vacuum delay */
 	VacuumCostActive = (VacuumCostDelay > 0);
@@ -3126,13 +3235,21 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	VacuumPageMiss = 0;
 	VacuumPageDirty = 0;
 
+	/* Compute the vacuum cost limit for the worker. */
+	VacuumCostLimit = VacuumCostLimit / costbalance->nworkers;
+	VacuumCostBalance = costbalance->vaccostbalance[slot];
+
 	stats = (IndexBulkDeleteResult **)
 		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
 
 	/* Do either vacuuming indexes or cleaning indexes */
 	vacuum_or_cleanup_indexes_worker(indrels, nindexes, stats, lvshared,
 									 dead_tuples);
-
+	/*
+	 * Share the remaining balance with the leader so that we don't loose
+	 * accounting for the same.
+	 */
+	costbalance->vaccostbalance[slot] = VacuumCostBalance;
 	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
 	table_close(onerel, ShareUpdateExclusiveLock);
 }
-- 
1.8.3.1

