Hi,

On Fri, 21 Apr 2023 16:44:48 -0400
Melanie Plageman <melanieplage...@gmail.com> wrote:

> On Fri, Apr 7, 2023 at 8:01 PM Jehan-Guillaume de Rorthais
> <j...@dalibo.com> wrote:
> >
> > On Fri, 31 Mar 2023 14:06:11 +0200
> > Jehan-Guillaume de Rorthais <j...@dalibo.com> wrote:
> >  
> > > > [...]  
> > > > >> Hmmm, not sure is WARNING is a good approach, but I don't have a
> > > > >> better idea at the moment.  
> > > > >
> > > > > I stepped it down to NOTICE and added some more infos.
> > > > >
> > > > > [...]
> > > > >   NOTICE:  Growing number of hash batch to 32768 is exhausting allowed
> > > > > memory (137494656 > 2097152)  
> > > > [...]
> > > >
> > > > OK, although NOTICE that may actually make it less useful - the default
> > > > level is WARNING, and regular users are unable to change the level. So
> > > > very few people will actually see these messages.  
> > [...]  
> > > Anyway, maybe this should be added in the light of next patch, balancing
> > > between increasing batches and allowed memory. The WARNING/LOG/NOTICE
> > > message could appears when we actually break memory rules because of some
> > > bad HJ situation.  
> >
> > So I did some more minor editions to the memory context patch and start
> > working on the balancing memory patch. Please, find in attachment the v4
> > patch set:
> >
> > * 0001-v4-Describe-hybrid-hash-join-implementation.patch:
> >   Adds documentation written by Melanie few years ago
> > * 0002-v4-Allocate-hash-batches-related-BufFile-in-a-dedicated.patch:
> >   The batches' BufFile dedicated memory context patch  
> 
> This is only a review of the code in patch 0002 (the patch to use a more
> granular memory context for multi-batch hash join batch files). I have
> not reviewed the changes to comments in detail either.

Ok.

> I think the biggest change that is needed is to implement this memory
> context usage for parallel hash join.

Indeed. 

> To implement a file buffer memory context for multi-patch parallel hash
> join [...]

Thank you for your review and pointers!

After (some days off and then) studying the parallel code, I ended up with this:

1. As explained by Melanie, the v5 patch sets accessor->context to fileCxt (a
   short sketch below the list shows why this is enough).

2. The BufFile buffers for accessor->read_file and accessor->write_file were
   wrongly allocated in the ExecutorState context, from sts_puttuple and
   sts_parallel_scan_next, when these functions first need to work with the
   accessor's FileSet.

   The v5 patch now allocates them in accessor->context, directly in
   sts_puttuple and sts_parallel_scan_next. This avoids wrapping every single
   call to these functions in MemoryContextSwitchTo calls. I suppose this is
   correct, as the comment on accessor->context in struct
   SharedTuplestoreAccessor says "/* Memory context for **buffers**. */".

3. accessor->write_chunk is already allocated in accessor->context.

   As a consequence, this buffer is now allocated in fileCxt instead of the
   hashCxt context.

   This is a bit far-fetched, but I suppose this is OK, as it acts as a
   second-level buffer on top of the BufFile.

4. accessor->read_buffer is currently allocated in accessor->context as well.

   This buffer holds tuples read from the fileset. It is still a buffer, but
   not related to any file anymore...

Because of 3 and 4, and the coarse context granularity of the
SharedTuplestore-related code, I'm now wondering whether "fileCxt" shouldn't
be renamed "bufCxt".
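
For the record, here is a minimal sketch (not part of the attached patches;
the helper name is made up) of why point 1 is enough on its own:
sts_initialize() and sts_attach() record CurrentMemoryContext in
accessor->context, so making fileCxt the current context while the accessors
are set up is all that is needed for the buffers of point 2 to end up in the
file context.

  #include "postgres.h"

  #include "executor/hashjoin.h"
  #include "utils/memutils.h"

  /*
   * Hypothetical helper, mirroring what v5 does in
   * ExecParallelHashJoinSetUpBatches(): run the shared tuplestore accessor
   * setup with fileCxt as the current memory context.
   */
  static void
  setup_accessors_in_filecxt(HashJoinTable hashtable)
  {
      MemoryContext oldcxt;

      /* Make fileCxt current for the whole accessor setup. */
      oldcxt = MemoryContextSwitchTo(hashtable->fileCxt);

      /*
       * The sts_initialize()/sts_attach() calls go here.  They record
       * CurrentMemoryContext (now fileCxt) in accessor->context, and
       * sts_puttuple()/sts_parallel_scan_next() later allocate their
       * BufFile buffers in that same context.
       */

      MemoryContextSwitchTo(oldcxt);
  }

In the v5 patch, this is what the MemoryContextSwitchTo(hashtable->fileCxt)
calls already present in ExecParallelHashJoinSetUpBatches() and
ExecParallelHashEnsureBatchAccessors() achieve.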

Regards,
From acaa94fab35ab966366a29a092780e54a68de0f7 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Thu, 30 Apr 2020 07:16:28 -0700
Subject: [PATCH 1/3] Describe hybrid hash join implementation

This is just a draft to spark conversation on what a good comment might
be like in this file on how the hybrid hash join algorithm is
implemented in Postgres. I'm pretty sure this is the accepted term for
this algorithm https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
---
 src/backend/executor/nodeHashjoin.c | 36 +++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 0a3f32f731..920d1831c2 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -10,6 +10,42 @@
  * IDENTIFICATION
  *	  src/backend/executor/nodeHashjoin.c
  *
+ *   HYBRID HASH JOIN
+ *
+ *  If the inner side tuples of a hash join do not fit in memory, the hash join
+ *  can be executed in multiple batches.
+ *
+ *  If the statistics on the inner side relation are accurate, the planner
+ *  chooses a multi-batch strategy and estimates the number of batches.
+ *
+ *  The query executor measures the real size of the hashtable and increases the
+ *  number of batches if the hashtable grows too large.
+ *
+ *  The number of batches is always a power of two, so an increase in the number
+ *  of batches doubles it.
+ *
+ *  Serial hash join measures batch size lazily -- waiting until it is loading
+ *  a batch to determine if it will fit in memory. While inserting tuples into
+ *  the hashtable, serial hash join will, if adding a tuple would exceed
+ *  work_mem, dump out the hashtable's tuples and reassign them either to other
+ *  batch files or to the current batch resident in the hashtable.
+ *
+ *  Parallel hash join, on the other hand, completes all changes to the number
+ *  of batches during the build phase. If it increases the number of batches, it
+ *  dumps out all the tuples from all batches and reassigns them to entirely new
+ *  batch files. Then it checks every batch to ensure it will fit in the space
+ *  budget for the query.
+ *
+ *  In both parallel and serial hash join, the executor currently makes a best
+ *  effort. If a particular batch will not fit in memory, it tries doubling the
+ *  number of batches. If, after a batch increase, there is a batch that
+ *  retained all or none of its tuples, the executor disables growth in the
+ *  number of batches globally. After growth is disabled, all batches that would
+ *  have previously triggered an increase in the number of batches instead
+ *  exceed the space allowed.
+ *
+ *  TODO: should we discuss that tuples can only spill forward?
+ *
  * PARALLELISM
  *
  * Hash joins can participate in parallel query execution in several ways.  A
-- 
2.39.2

From c5ed2ae2c2749af4f5058b012dc5e8a9e1529127 Mon Sep 17 00:00:00 2001
From: Jehan-Guillaume de Rorthais <j...@dalibo.com>
Date: Mon, 27 Mar 2023 15:54:39 +0200
Subject: [PATCH 2/3] Allocate hash batches related BufFile in a dedicated
 context

---
 src/backend/executor/nodeHash.c           | 43 ++++++++++++++++-------
 src/backend/executor/nodeHashjoin.c       | 18 +++++++---
 src/backend/utils/sort/sharedtuplestore.c |  8 +++++
 src/include/executor/hashjoin.h           | 15 ++++++--
 src/include/executor/nodeHashjoin.h       |  2 +-
 5 files changed, 64 insertions(+), 22 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 5fd1c5553b..a4fbf29301 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -484,7 +484,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 	 *
 	 * The hashtable control block is just palloc'd from the executor's
 	 * per-query memory context.  Everything else should be kept inside the
-	 * subsidiary hashCxt or batchCxt.
+	 * subsidiary hashCxt, batchCxt or fileCxt.
 	 */
 	hashtable = palloc_object(HashJoinTableData);
 	hashtable->nbuckets = nbuckets;
@@ -538,6 +538,10 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 												"HashBatchContext",
 												ALLOCSET_DEFAULT_SIZES);
 
+	hashtable->fileCxt = AllocSetContextCreate(hashtable->hashCxt,
+								 "HashBatchFiles",
+								 ALLOCSET_DEFAULT_SIZES);
+
 	/* Allocate data that will live for the life of the hashjoin */
 
 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -570,15 +574,21 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 
 	if (nbatch > 1 && hashtable->parallel_state == NULL)
 	{
+		MemoryContext oldctx;
+
 		/*
 		 * allocate and initialize the file arrays in hashCxt (not needed for
 		 * parallel case which uses shared tuplestores instead of raw files)
 		 */
+		oldctx = MemoryContextSwitchTo(hashtable->fileCxt);
+
 		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
 		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
 		/* The files will not be opened until needed... */
 		/* ... but make sure we have temp tablespaces established for them */
 		PrepareTempTablespaces();
+
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	MemoryContextSwitchTo(oldcxt);
@@ -913,7 +923,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	int			oldnbatch = hashtable->nbatch;
 	int			curbatch = hashtable->curbatch;
 	int			nbatch;
-	MemoryContext oldcxt;
 	long		ninmemory;
 	long		nfreed;
 	HashMemoryChunk oldchunks;
@@ -934,13 +943,16 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		   hashtable, nbatch, hashtable->spaceUsed);
 #endif
 
-	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
-
 	if (hashtable->innerBatchFile == NULL)
 	{
+		MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->fileCxt);
+
 		/* we had no file arrays before */
 		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
 		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+
+		MemoryContextSwitchTo(oldcxt);
+
 		/* time to establish the temp tablespaces, too */
 		PrepareTempTablespaces();
 	}
@@ -951,8 +963,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
 	}
 
-	MemoryContextSwitchTo(oldcxt);
-
 	hashtable->nbatch = nbatch;
 
 	/*
@@ -1022,9 +1032,11 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			{
 				/* dump it out */
 				Assert(batchno > curbatch);
+
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
-									  &hashtable->innerBatchFile[batchno]);
+									  &hashtable->innerBatchFile[batchno],
+									  hashtable->fileCxt);
 
 				hashtable->spaceUsed -= hashTupleSize;
 				nfreed++;
@@ -1681,9 +1693,11 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
-							  &hashtable->innerBatchFile[batchno]);
+							  &hashtable->innerBatchFile[batchno],
+							  hashtable->fileCxt);
 	}
 
 	if (shouldFree)
@@ -2663,8 +2677,11 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 		{
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
+
 			ExecHashJoinSaveTuple(tuple, hashvalue,
-								  &hashtable->innerBatchFile[batchno]);
+								  &hashtable->innerBatchFile[batchno],
+								  hashtable->fileCxt);
+
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
 			hashtable->spaceUsedSkew -= tupleSize;
@@ -3093,8 +3110,8 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
 	pstate->nbatch = nbatch;
 	batches = dsa_get_address(hashtable->area, pstate->batches);
 
-	/* Use hash join memory context. */
-	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+	/* Use hash join file memory context. */
+	oldcxt = MemoryContextSwitchTo(hashtable->fileCxt);
 
 	/* Allocate this backend's accessor array. */
 	hashtable->nbatch = nbatch;
@@ -3196,8 +3213,8 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 	 */
 	Assert(DsaPointerIsValid(pstate->batches));
 
-	/* Use hash join memory context. */
-	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+	/* Use hash join file memory context. */
+	oldcxt = MemoryContextSwitchTo(hashtable->fileCxt);
 
 	/* Allocate this backend's accessor array. */
 	hashtable->nbatch = pstate->nbatch;
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 920d1831c2..ac72fbfbb6 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -485,8 +485,10 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					 */
 					Assert(parallel_state == NULL);
 					Assert(batchno > hashtable->curbatch);
+
 					ExecHashJoinSaveTuple(mintuple, hashvalue,
-										  &hashtable->outerBatchFile[batchno]);
+										  &hashtable->outerBatchFile[batchno],
+										  hashtable->fileCxt);
 
 					if (shouldFree)
 						heap_free_minimal_tuple(mintuple);
@@ -1308,21 +1310,27 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
  * The data recorded in the file for each tuple is its hash value,
  * then the tuple in MinimalTuple format.
  *
- * Note: it is important always to call this in the regular executor
- * context, not in a shorter-lived context; else the temp file buffers
- * will get messed up.
+ * Note: it is important always to call this in the HashBatchFiles context,
+ * not in a shorter-lived context; else the temp file buffers will get messed
+ * up.
  */
 void
 ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
-					  BufFile **fileptr)
+					  BufFile **fileptr, MemoryContext filecxt)
 {
 	BufFile    *file = *fileptr;
 
 	if (file == NULL)
 	{
+		MemoryContext oldctx;
+
+		oldctx = MemoryContextSwitchTo(filecxt);
+
 		/* First write to this batch file, so open it. */
 		file = BufFileCreateTemp(false);
 		*fileptr = file;
+
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	BufFileWrite(file, &hashvalue, sizeof(uint32));
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 0831249159..236be65f22 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -308,11 +308,15 @@ sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
 	{
 		SharedTuplestoreParticipant *participant;
 		char		name[MAXPGPATH];
+		MemoryContext oldcxt;
 
 		/* Create one.  Only this backend will write into it. */
 		sts_filename(name, accessor, accessor->participant);
+
+		oldcxt = MemoryContextSwitchTo(accessor->context);
 		accessor->write_file =
 			BufFileCreateFileSet(&accessor->fileset->fs, name);
+		MemoryContextSwitchTo(oldcxt);
 
 		/* Set up the shared state for this backend's file. */
 		participant = &accessor->sts->participants[accessor->participant];
@@ -527,11 +531,15 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 			if (accessor->read_file == NULL)
 			{
 				char		name[MAXPGPATH];
+				MemoryContext oldcxt;
 
 				sts_filename(name, accessor, accessor->read_participant);
+
+				oldcxt = MemoryContextSwitchTo(accessor->context);
 				accessor->read_file =
 					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
 									   false);
+				MemoryContextSwitchTo(oldcxt);
 			}
 
 			/* Seek and load the chunk header. */
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 8ee59d2c71..74867c3e40 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -25,10 +25,14 @@
  *
  * Each active hashjoin has a HashJoinTable control block, which is
  * palloc'd in the executor's per-query context.  All other storage needed
- * for the hashjoin is kept in private memory contexts, two for each hashjoin.
+ * for the hashjoin is kept in private memory contexts, three for each
+ * hashjoin:
+ * - HashTableContext (hashCxt): whole-hash-join storage
+ * - HashBatchContext (batchCxt): storage for the current batch only
+ * - HashBatchFiles (fileCxt): storage for the temp files' buffers
+ *
  * This makes it easy and fast to release the storage when we don't need it
- * anymore.  (Exception: data associated with the temp files lives in the
- * per-query context too, since we always call buffile.c in that context.)
+ * anymore.
  *
  * The hashtable contexts are made children of the per-query context, ensuring
  * that they will be discarded at end of statement even if the join is
@@ -39,6 +43,10 @@
  * "hashCxt", while storage that is only wanted for the current batch is
  * allocated in the "batchCxt".  By resetting the batchCxt at the end of
  * each batch, we free all the per-batch storage reliably and without tedium.
+ * Note that data associated with the temp files lives in the "fileCxt"
+ * context, which lives for the entire join, as temp files might need to
+ * survive batches.  These files are explicitly destroyed by calling
+ * BufFileClose() when the code is done with them.
  *
  * During first scan of inner relation, we get its tuples from executor.
  * If nbatch > 1 then tuples that don't belong in first batch get saved
@@ -350,6 +358,7 @@ typedef struct HashJoinTableData
 
 	MemoryContext hashCxt;		/* context for whole-hash-join storage */
 	MemoryContext batchCxt;		/* context for this-batch-only storage */
+	MemoryContext fileCxt;		/* context for the BufFile related storage */
 
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h
index d367070883..a8f9ae1989 100644
--- a/src/include/executor/nodeHashjoin.h
+++ b/src/include/executor/nodeHashjoin.h
@@ -29,6 +29,6 @@ extern void ExecHashJoinInitializeWorker(HashJoinState *state,
 										 ParallelWorkerContext *pwcxt);
 
 extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
-								  BufFile **fileptr);
+								  BufFile **fileptr, MemoryContext filecxt);
 
 #endif							/* NODEHASHJOIN_H */
-- 
2.39.2
