On Fri, 2025-01-10 at 23:30 +1300, David Rowley wrote:
> Since bump.c does not add headers to the palloc'd chunks, I think the
> following code from hash_agg_entry_size() shouldn't be using
> CHUNKHDRSZ anymore.

Fixed.

I also tried to account for the power-of-two allocations for the
transition values. We don't do that in other places, but now that we
have the bump allocator, which does not round allocations up to a
power of two, it seems reasonable to account for it here.
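
For reference, the new accounting in hash_agg_entry_size() (the same
logic as the hunk in the patch below) charges a transition value at
its rounded-up AllocSet chunk size, roughly like this:

	if (transitionSpace > 0)
	{
		/* AllocSet rounds the request up to a power of two... */
		Size		pow2 = (Size) 1 << my_log2(transitionSpace);

		/* ...and adds a chunk header; Max() guards against overflow. */
		transitionChunkSize = Max(CHUNKHDRSZ + pow2, transitionSpace);
	}
	else
		transitionChunkSize = 0;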

> I did some benchmarking using the attached script. There's a general
> speedup, but I saw some unexpected increase in the number of batches
> with the patched version on certain tests. See the attached results.
> For example, the work_mem = 8MB with 10 million rows shows "Batches:
> 129" on master but "Batches: 641" with the patched version. I didn't
> check why.

Somewhat counter-intuitively, HashAgg can use more batches when more
memory is available. If it is already spilling, creating more, smaller
batches is good, because it reduces the chance of recursing. The
limiting factor for creating a lot of tiny batches is that each
partition requires a logical tape with its own write buffer, so more
available memory allows creating more logtapes and a higher
partitioning fanout.
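
As a rough illustration only (not the exact logic in
hash_choose_num_partitions(); the cap and floor below are made-up
numbers), the fanout is bounded by how many per-partition write
buffers fit in the memory budget:

	/*
	 * Illustrative sketch: the real decision is made in
	 * hash_choose_num_partitions(), and the cap/floor here are arbitrary.
	 */
	static int
	fanout_sketch(size_t mem_limit, size_t write_buffer_size)
	{
		size_t		by_mem = mem_limit / write_buffer_size;
		int			npartitions = (by_mem > 256) ? 256 : (int) by_mem;

		return (npartitions < 4) ? 4 : npartitions;
	}

Since the patch frees memory that was previously wasted on chunk
headers and power-of-two rounding, more of the budget is left for
write buffers, which can show up as a higher batch count.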

The runtimes I got when running your tests are mixed. I'm still
analyzing whether the differences are test noise or whether the
increased number of partitions is a factor. But any runtime
regressions are minor in comparison to the memory savings, so I think
we are on the right track.

Attached v2.

Regards,
        Jeff Davis

From 35e86405da5dd920450c3d939be0f4c16a7dda24 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Wed, 8 Jan 2025 11:46:35 -0800
Subject: [PATCH v2] HashAgg: use Bump allocator for hash TupleHashTable
 entries.

The entries aren't freed until the entire hash table is destroyed, so
use the Bump allocator to improve allocation speed, avoid wasting
space on the chunk header, and avoid wasting space due to the
power-of-two allocations.

Discussion: https://postgr.es/m/caaphdvqv1anb4cm36fzrwivxrevbo_lsg_eq3nqdxtjecaa...@mail.gmail.com
Reviewed-by: David Rowley
---
 src/backend/executor/execGrouping.c |  23 +++---
 src/backend/executor/nodeAgg.c      | 110 +++++++++++++++++++++++-----
 src/include/nodes/execnodes.h       |   5 +-
 3 files changed, 109 insertions(+), 29 deletions(-)

diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c
index 07750253963..9be4963d06f 100644
--- a/src/backend/executor/execGrouping.c
+++ b/src/backend/executor/execGrouping.c
@@ -512,27 +512,32 @@ LookupTupleHashEntry_internal(TupleHashTable hashtable, TupleTableSlot *slot,
 		}
 		else
 		{
+			MinimalTuple mtup;
 			MinimalTuple firstTuple;
 			size_t totalsize; /* including alignment and additionalsize */
 
 			/* created new entry */
 			*isnew = true;
-			/* zero caller data */
-			MemoryContextSwitchTo(hashtable->tablecxt);
 
-			/* Copy the first tuple into the table context */
-			firstTuple = ExecCopySlotMinimalTuple(slot);
+			/*
+			 * Extract the minimal tuple into the temp context, then copy it
+			 * into the table context.
+			 */
+			MemoryContextSwitchTo(hashtable->tempcxt);
+			mtup = ExecCopySlotMinimalTuple(slot);
 
 			/*
-			 * Allocate additional space right after the MinimalTuple of size
-			 * additionalsize. The caller can get a pointer to this data with
-			 * TupleHashEntryGetAdditional(), and store arbitrary data there.
+			 * Allocate space for the MinimalTuple followed by empty space of
+			 * size additionalsize. The caller can get a maxaligned pointer to
+			 * this data with TupleHashEntryGetAdditional(), and store
+			 * arbitrary data there.
 			 *
 			 * This avoids the need to store an extra pointer or allocate an
 			 * additional chunk, which would waste memory.
 			 */
-			totalsize = MAXALIGN(firstTuple->t_len) + hashtable->additionalsize;
-			firstTuple = repalloc(firstTuple, totalsize);
+			totalsize = MAXALIGN(mtup->t_len) + hashtable->additionalsize;
+			firstTuple = MemoryContextAlloc(hashtable->tablecxt, totalsize);
+			memcpy(firstTuple, mtup, mtup->t_len);
 			memset((char *) firstTuple + firstTuple->t_len, 0,
 				   totalsize - firstTuple->t_len);
 
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 19640752967..43a90ed6f7a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -405,6 +405,7 @@ static void build_hash_tables(AggState *aggstate);
 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
 										  bool nullcheck);
+static void hash_create_memory(AggState *aggstate);
 static long hash_choose_num_buckets(double hashentrysize,
 									long ngroups, Size memory);
 static int	hash_choose_num_partitions(double input_groups,
@@ -1503,7 +1504,7 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets)
 {
 	AggStatePerHash perhash = &aggstate->perhash[setno];
 	MemoryContext metacxt = aggstate->hash_metacxt;
-	MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
+	MemoryContext tablecxt = aggstate->hash_tablecxt;
 	MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
 	Size		additionalsize;
 
@@ -1529,7 +1530,7 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets)
 											 nbuckets,
 											 additionalsize,
 											 metacxt,
-											 hashcxt,
+											 tablecxt,
 											 tmpcxt,
 											 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
 }
@@ -1700,15 +1701,24 @@ hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
 							 tupleWidth);
 	Size		pergroupSize = numTrans * sizeof(AggStatePerGroupData);
 
-	tupleChunkSize = CHUNKHDRSZ + tupleSize;
-
-	if (pergroupSize > 0)
-		pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
-	else
-		pergroupChunkSize = 0;
+	/*
+	 * Entries use the Bump allocator, so the chunk sizes are the same as the
+	 * requested sizes.
+	 */
+	tupleChunkSize = tupleSize;
+	pergroupChunkSize = pergroupSize;
 
+	/*
+	 * Transition values use AllocSet, which has a chunk header and also uses
+	 * power-of-two allocations.
+	 */
 	if (transitionSpace > 0)
-		transitionChunkSize = CHUNKHDRSZ + transitionSpace;
+	{
+		Size pow2 = (Size)1 << my_log2(transitionSpace);
+
+		/* use Max to protect against overflow */
+		transitionChunkSize = Max(CHUNKHDRSZ + pow2, transitionSpace);
+	}
 	else
 		transitionChunkSize = 0;
 
@@ -1858,15 +1868,18 @@ hash_agg_check_limits(AggState *aggstate)
 	uint64		ngroups = aggstate->hash_ngroups_current;
 	Size		meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
 													 true);
-	Size		hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
-														true);
+	Size		entry_mem = MemoryContextMemAllocated(aggstate->hash_tablecxt,
+													 true);
+	Size		tval_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
+													 true);
+	Size		total_mem = meta_mem + entry_mem + tval_mem;
 
 	/*
 	 * Don't spill unless there's at least one group in the hash table so we
 	 * can be sure to make progress even in edge cases.
 	 */
 	if (aggstate->hash_ngroups_current > 0 &&
-		(meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
+		(total_mem > aggstate->hash_mem_limit ||
 		 ngroups > aggstate->hash_ngroups_limit))
 	{
 		hash_agg_enter_spill_mode(aggstate);
@@ -1917,6 +1930,7 @@ static void
 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
 {
 	Size		meta_mem;
+	Size		entry_mem;
 	Size		hashkey_mem;
 	Size		buffer_mem;
 	Size		total_mem;
@@ -1928,7 +1942,10 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
 	/* memory for the hash table itself */
 	meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
 
-	/* memory for the group keys and transition states */
+	/* memory for hash entries */
+	entry_mem = MemoryContextMemAllocated(aggstate->hash_tablecxt, true);
+
+	/* memory for byref transition states */
 	hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
 
 	/* memory for read/write tape buffers, if spilled */
@@ -1937,7 +1954,7 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
 		buffer_mem += HASHAGG_READ_BUFFER_SIZE;
 
 	/* update peak mem */
-	total_mem = meta_mem + hashkey_mem + buffer_mem;
+	total_mem = meta_mem + entry_mem + hashkey_mem + buffer_mem;
 	if (total_mem > aggstate->hash_mem_peak)
 		aggstate->hash_mem_peak = total_mem;
 
@@ -1959,6 +1976,58 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
 	}
 }
 
+/*
+ * Create memory contexts used for hash aggregation.
+ */
+static void
+hash_create_memory(AggState *aggstate)
+{
+	Size		minContextSize = ALLOCSET_DEFAULT_MINSIZE;
+	Size		initBlockSize = ALLOCSET_DEFAULT_INITSIZE;
+	Size		maxBlockSize = ALLOCSET_DEFAULT_MAXSIZE;
+
+	/*
+	 * The hashcontext's per-tuple memory will be used for byref transition
+	 * values and returned by AggCheckCallContext().
+	 */
+	aggstate->hashcontext = CreateWorkExprContext(aggstate->ss.ps.state);
+
+	/*
+	 * The meta context will be used for the bucket array of
+	 * TupleHashEntryData (or arrays, in the case of grouping sets). As the
+	 * hash table grows, the bucket array will double in size and the old one
+	 * will be freed, so an AllocSet is appropriate. For large bucket arrays,
+	 * the large allocation path will be used, so it's not worth worrying
+	 * about wasting space due to power-of-two allocations.
+	 */
+	aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
+												   "HashAgg meta context",
+												   ALLOCSET_DEFAULT_SIZES);
+
+	/*
+	 * The hash entries themselves, which include the grouping key
+	 * (firstTuple) and pergroup data, are stored in the table context. The
+	 * bump allocator can be used because the entries are not freed until the
+	 * entire hash table is reset. The bump allocator is faster for
+	 * allocations and avoids wasting space on the chunk header or
+	 * power-of-two allocations.
+	 *
+	 * Like CreateWorkExprContext(), use smaller sizings for smaller work_mem,
+	 * to avoid large jumps in memory usage.
+	 */
+	while (16 * maxBlockSize > work_mem * 1024L)
+		maxBlockSize >>= 1;
+
+	if (maxBlockSize < ALLOCSET_DEFAULT_INITSIZE)
+		maxBlockSize = ALLOCSET_DEFAULT_INITSIZE;
+
+	aggstate->hash_tablecxt = BumpContextCreate(aggstate->ss.ps.state->es_query_cxt,
+												"HashAgg table context",
+												minContextSize, initBlockSize,
+												maxBlockSize);
+
+}
+
 /*
  * Choose a reasonable number of buckets for the initial hash table size.
  */
@@ -2618,6 +2687,7 @@ agg_refill_hash_table(AggState *aggstate)
 
 	/* free memory and reset hash tables */
 	ReScanExprContext(aggstate->hashcontext);
+	MemoryContextReset(aggstate->hash_tablecxt);
 	for (int setno = 0; setno < aggstate->num_hashes; setno++)
 		ResetTupleHashTable(aggstate->perhash[setno].hashtable);
 
@@ -3285,7 +3355,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	}
 
 	if (use_hashing)
-		aggstate->hashcontext = CreateWorkExprContext(estate);
+		hash_create_memory(aggstate);
 
 	ExecAssignExprContext(estate, &aggstate->ss.ps);
 
@@ -3580,9 +3650,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 		Plan	   *outerplan = outerPlan(node);
 		uint64		totalGroups = 0;
 
-		aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
-													   "HashAgg meta context",
-													   ALLOCSET_DEFAULT_SIZES);
 		aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
 															&TTSOpsMinimalTuple);
 		aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
@@ -4327,6 +4394,12 @@ ExecEndAgg(AggState *node)
 		MemoryContextDelete(node->hash_metacxt);
 		node->hash_metacxt = NULL;
 	}
+	if (node->hash_tablecxt != NULL)
+	{
+		MemoryContextDelete(node->hash_tablecxt);
+		node->hash_tablecxt = NULL;
+	}
+
 
 	for (transno = 0; transno < node->numtrans; transno++)
 	{
@@ -4443,6 +4516,7 @@ ExecReScanAgg(AggState *node)
 		node->hash_ngroups_current = 0;
 
 		ReScanExprContext(node->hashcontext);
+		MemoryContextReset(node->hash_tablecxt);
 		/* Rebuild an empty hash table */
 		build_hash_tables(node);
 		node->table_filled = false;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 7d5871d6fac..fcbdcebbebe 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2526,7 +2526,8 @@ typedef struct AggState
 	/* these fields are used in AGG_HASHED and AGG_MIXED modes: */
 	bool		table_filled;	/* hash table filled yet? */
 	int			num_hashes;
-	MemoryContext hash_metacxt; /* memory for hash table itself */
+	MemoryContext hash_metacxt; /* memory for hash table bucket array */
+	MemoryContext hash_tablecxt; /* memory for hash table entries */
 	struct LogicalTapeSet *hash_tapeset;	/* tape set for hash spill tapes */
 	struct HashAggSpill *hash_spills;	/* HashAggSpill for each grouping set,
 										 * exists only during first pass */
@@ -2552,7 +2553,7 @@ typedef struct AggState
 										 * per-group pointers */
 
 	/* support for evaluation of agg input expressions: */
-#define FIELDNO_AGGSTATE_ALL_PERGROUPS 53
+#define FIELDNO_AGGSTATE_ALL_PERGROUPS 54
 	AggStatePerGroup *all_pergroups;	/* array of first ->pergroups, than
 										 * ->hash_pergroup */
 	SharedAggInfo *shared_info; /* one entry per worker */
-- 
2.34.1
