On Tue, 28 Mar 2023 00:43:34 +0200
Tomas Vondra <tomas.von...@enterprisedb.com> wrote:

> On 3/27/23 23:13, Jehan-Guillaume de Rorthais wrote:
> > Please find in attachment a patch to allocate BufFiles in a dedicated
> > context. I picked up your patch, backpatched it, went through it and made
> > some minor changes to it. I have some comments/questions though.
> > 
> >   1. I'm not sure why we must allocate the new "HashBatchFiles" context
> >   under ExecutorState and not under hashtable->hashCxt?
> > 
> >   The only references I could find were in hashjoin.h:30:
> > 
> >    /* [...]
> >     * [...] (Exception: data associated with the temp files lives in the
> >     * per-query context too, since we always call buffile.c in that
> > context.)
> > 
> >   And in nodeHashjoin.c:1243:ExecHashJoinSaveTuple() (I reworded this
> >   original comment in the patch):
> > 
> >    /* [...]
> >     * Note: it is important always to call this in the regular executor
> >     * context, not in a shorter-lived context; else the temp file buffers
> >     * will get messed up.
> > 
> > 
> >   But these do not explain why BufFile-related allocations must be under
> >   a per-query context.
> >   
> 
> Doesn't that simply describe the current (unpatched) behavior where
> BufFile is allocated in the per-query context? 

I wasn't sure. The first quote from hashjoin.h seems to describe a stronger
rule: «**always** call buffile.c in the per-query context». But maybe it
ought to be «always call buffile.c from one of the sub-query contexts»? I
assume the aim is to make sure the temp files are removed on query end/error?
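
To make sure we are talking about the same failure mode, here is a minimal
sketch of my understanding of what goes wrong when the BufFile is allocated
in a shorter-lived context (hypothetical code, not part of the patch):

    /*
     * BufFileCreateTemp() pallocs the BufFile struct (buffer included) in
     * CurrentMemoryContext.  If that context is reset before we are done
     * with the file, the pointer dangles, while the underlying temp file
     * sticks around until the resource owner machinery cleans it up.
     */
    MemoryContext shortCxt = AllocSetContextCreate(CurrentMemoryContext,
                                                   "ShortLived",
                                                   ALLOCSET_DEFAULT_SIZES);
    MemoryContext oldcxt = MemoryContextSwitchTo(shortCxt);
    BufFile    *file = BufFileCreateTemp(false);    /* lives in shortCxt */

    MemoryContextSwitchTo(oldcxt);
    MemoryContextReset(shortCxt);   /* frees the file buffers... */
    /* ...any later BufFileWrite(file, ...) would use freed memory. */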

> I mean, the current code calls BufFileCreateTemp() without switching the
> context, so it's in the ExecutorState. But with the patch it very clearly is
> not.
> 
> And I'm pretty sure the patch should do
> 
>     hashtable->fileCxt = AllocSetContextCreate(hashtable->hashCxt,
>                                          "HashBatchFiles",
>                                          ALLOCSET_DEFAULT_SIZES);
> 
> and it'd still work. Or why do you think we *must* allocate it under
> ExecutorState?

That was actually my very first patch, and it indeed worked. But I was
confused by the code comments quoted above. That's why I kept your original
code and decided to raise the discussion here.

Fixed in new patch in attachment.
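
For clarity, here is the resulting context hierarchy (a sketch, using the
names from the patch below):

    /*
     * per-query context (ExecutorState)
     *   hashCxt  "HashTableContext"   - whole-hash-join storage
     *     batchCxt "HashBatchContext" - reset at the end of each batch
     *     fileCxt  "HashBatchFiles"   - BufFile buffers, whole-join lifetime
     */
    hashtable->fileCxt = AllocSetContextCreate(hashtable->hashCxt,
                                               "HashBatchFiles",
                                               ALLOCSET_DEFAULT_SIZES);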

> FWIW The comment in hashjoin.h needs updating to reflect the change.

Done in the last patch. Is my rewording accurate?

> >   2. Wrapping each call of ExecHashJoinSaveTuple() with a memory context
> >   switch seems fragile, as it could be forgotten in future code
> >   paths/changes.
> > So I added an Assert() in the function to make sure the current memory
> > context is "HashBatchFiles" as expected.
> >   Another way to tie this up might be to pass the memory context as an
> >   argument to the function.
> >   ... Or maybe I'm being overly cautious.
> >   
> 
> I'm not sure I'd call that fragile; we have plenty of other code that
> expects the memory context to be set correctly. Not sure about the
> assert, but we don't have similar asserts anywhere else.

I mostly stuck it there to stimulate the discussion around this, as I needed
to scratch that itch.
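
For the record, the assertion was roughly the following (reconstructed here;
the new patch drops it in favor of passing the context explicitly):

    /* In ExecHashJoinSaveTuple(), before touching the BufFile: */
    Assert(strcmp(CurrentMemoryContext->name, "HashBatchFiles") == 0);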

> But I think it's just ugly and overly verbose

+1

Your patch was just a demo/debug patch at the time. It needed some cleanup
anyway :)

> it'd be much nicer to e.g. pass the memory context as a parameter, and do
> the switch inside.

That was one of the suggestions in my previous mail, so I did it in the new
patch. Let's see what other reviewers think.

> >   3. You wrote:
> >   
> >>> A separate BufFile memory context helps, although people won't see it
> >>> unless they attach a debugger, I think. Better than nothing, but I was
> >>> wondering if we could maybe print some warnings when the number of batch
> >>> files gets too high ...  
> > 
> >   So I added a WARNING when the batch files' memory exhausts the allowed
> >   memory size.
> > 
> >    +   if (hashtable->fileCxt->mem_allocated > hashtable->spaceAllowed)
> >    +       elog(WARNING, "Growing number of hash batch is exhausting
> > memory");
> > 
> >   This is repeated on each call of ExecHashIncreaseNumBatches when the
> >   BufFiles overflow the memory budget. I realize now I should probably
> >   add the memory limit, the current number of batches, and their memory
> >   consumption. The message is probably too cryptic for a user. It could
> >   probably be reworded, but some doc or an additional hint around this
> >   message might help.
> >   
> 
> Hmmm, not sure if WARNING is a good approach, but I don't have a better
> idea at the moment.

I stepped it down to NOTICE and added some more details.
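
If we keep such a message, I suppose it could eventually become a full
ereport() with a hint, something along these lines (the wording is just an
idea, not what the attached patch does; this sits in
ExecHashIncreaseNumBatches where nbatch is in scope):

    if (hashtable->fileCxt->mem_allocated > hashtable->spaceAllowed)
        ereport(NOTICE,
                (errmsg("hash batch files are using %zu bytes, more than the %zu bytes allowed",
                        hashtable->fileCxt->mem_allocated,
                        hashtable->spaceAllowed),
                 errdetail("The number of batches grew to %d.", nbatch),
                 errhint("Consider increasing work_mem.")));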

Here is the output of the last patch with a 1MB work_mem:

  =# explain analyze select * from small join large using (id);
  WARNING:  increasing number of batches from 1 to 2
  WARNING:  increasing number of batches from 2 to 4
  WARNING:  increasing number of batches from 4 to 8
  WARNING:  increasing number of batches from 8 to 16
  WARNING:  increasing number of batches from 16 to 32
  WARNING:  increasing number of batches from 32 to 64
  WARNING:  increasing number of batches from 64 to 128
  WARNING:  increasing number of batches from 128 to 256
  WARNING:  increasing number of batches from 256 to 512
  NOTICE:  Growing number of hash batches to 512 is exhausting allowed memory
           (2164736 > 2097152)
  WARNING:  increasing number of batches from 512 to 1024
  NOTICE:  Growing number of hash batches to 1024 is exhausting allowed memory
           (4329472 > 2097152)
  WARNING:  increasing number of batches from 1024 to 2048
  NOTICE:  Growing number of hash batches to 2048 is exhausting allowed memory
           (8626304 > 2097152)
  WARNING:  increasing number of batches from 2048 to 4096
  NOTICE:  Growing number of hash batches to 4096 is exhausting allowed memory
           (17252480 > 2097152)
  WARNING:  increasing number of batches from 4096 to 8192
  NOTICE:  Growing number of hash batches to 8192 is exhausting allowed memory
           (34504832 > 2097152)
  WARNING:  increasing number of batches from 8192 to 16384
  NOTICE:  Growing number of hash batches to 16384 is exhausting allowed memory
           (68747392 > 2097152)
  WARNING:  increasing number of batches from 16384 to 32768
  NOTICE:  Growing number of hash batches to 32768 is exhausting allowed memory
           (137494656 > 2097152)

                                QUERY PLAN
  --------------------------------------------------------------------------
  Hash Join  (cost=6542057.16..7834651.23 rows=7 width=74)
             (actual time=558502.127..724007.708 rows=7040 loops=1)
             Hash Cond: (small.id = large.id)
  ->  Seq Scan on small (cost=0.00..940094.00 rows=94000000 width=41)
                        (actual time=0.035..3.666 rows=10000 loops=1)
  ->  Hash (cost=6542057.07..6542057.07 rows=7 width=41)
           (actual time=558184.152..558184.153 rows=700000000 loops=1) 
           Buckets: 32768 (originally 1024)
           Batches: 32768 (originally 1)
           Memory Usage: 1921kB
    -> Seq Scan on large (cost=0.00..6542057.07 rows=7 width=41)
                         (actual time=0.324..193750.567 rows=700000000 loops=1)
  Planning Time: 1.588 ms
  Execution Time: 724011.074 ms
  (8 rows)

Regards,
From b1f9e315cf40b75929431236e200fbdf5a0068c4 Mon Sep 17 00:00:00 2001
From: Jehan-Guillaume de Rorthais <j...@dalibo.com>
Date: Mon, 27 Mar 2023 15:54:39 +0200
Subject: [PATCH] Allocate hash batches related BufFile in a dedicated context

---
 src/backend/executor/nodeHash.c     | 43 +++++++++++++++++++++++++----
 src/backend/executor/nodeHashjoin.c | 18 ++++++++----
 src/include/executor/hashjoin.h     | 15 ++++++++--
 src/include/executor/nodeHashjoin.h |  2 +-
 4 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 748c9b0024..3da83ac22a 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -484,7 +484,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 	 *
 	 * The hashtable control block is just palloc'd from the executor's
 	 * per-query memory context.  Everything else should be kept inside the
-	 * subsidiary hashCxt or batchCxt.
+	 * subsidiary hashCxt, batchCxt or fileCxt.
 	 */
 	hashtable = palloc_object(HashJoinTableData);
 	hashtable->nbuckets = nbuckets;
@@ -538,6 +538,10 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 												"HashBatchContext",
 												ALLOCSET_DEFAULT_SIZES);
 
+	hashtable->fileCxt = AllocSetContextCreate(hashtable->hashCxt,
+											   "HashBatchFiles",
+											   ALLOCSET_DEFAULT_SIZES);
+
 	/* Allocate data that will live for the life of the hashjoin */
 
 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -570,15 +574,21 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 
 	if (nbatch > 1 && hashtable->parallel_state == NULL)
 	{
+		MemoryContext oldctx;
+
 		/*
-		 * allocate and initialize the file arrays in hashCxt (not needed for
+		 * allocate and initialize the file arrays in fileCxt (not needed for
 		 * parallel case which uses shared tuplestores instead of raw files)
 		 */
+		oldctx = MemoryContextSwitchTo(hashtable->fileCxt);
+
 		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
 		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
 		/* The files will not be opened until needed... */
 		/* ... but make sure we have temp tablespaces established for them */
 		PrepareTempTablespaces();
+
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	MemoryContextSwitchTo(oldcxt);
@@ -929,12 +939,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	nbatch = oldnbatch * 2;
 	Assert(nbatch > 1);
 
+	elog(WARNING, "increasing number of batches from %d to %d", oldnbatch, nbatch);
+
+	elog(LOG, "ExecHashIncreaseNumBatches ======= context stats start =======");
+	MemoryContextStats(TopMemoryContext);
+
+
 #ifdef HJDEBUG
 	printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
 		   hashtable, nbatch, hashtable->spaceUsed);
 #endif
 
-	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+	oldcxt = MemoryContextSwitchTo(hashtable->fileCxt);
 
 	if (hashtable->innerBatchFile == NULL)
 	{
@@ -1022,9 +1038,11 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			{
 				/* dump it out */
 				Assert(batchno > curbatch);
+
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
-									  &hashtable->innerBatchFile[batchno]);
+									  &hashtable->innerBatchFile[batchno],
+									  hashtable->fileCxt);
 
 				hashtable->spaceUsed -= hashTupleSize;
 				nfreed++;
@@ -1042,6 +1060,13 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		oldchunks = nextchunk;
 	}
 
+	if (hashtable->fileCxt->mem_allocated > hashtable->spaceAllowed)
+		elog(NOTICE,
+			 "Growing number of hash batches to %d is exhausting allowed memory (%zu > %zu)",
+			 nbatch,
+			 hashtable->fileCxt->mem_allocated,
+			 hashtable->spaceAllowed);
+
 #ifdef HJDEBUG
 	printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
 		   hashtable, nfreed, ninmemory, hashtable->spaceUsed);
@@ -1063,6 +1088,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			   hashtable);
 #endif
 	}
+
+	elog(LOG, "ExecHashIncreaseNumBatches ======= context stats end =======");
+	MemoryContextStats(TopMemoryContext);
 }
 
 /*
@@ -1681,9 +1709,11 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
-							  &hashtable->innerBatchFile[batchno]);
+							  &hashtable->innerBatchFile[batchno],
+							  hashtable->fileCxt);
 	}
 
 	if (shouldFree)
@@ -2534,8 +2564,11 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 		{
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
+
 			ExecHashJoinSaveTuple(tuple, hashvalue,
-								  &hashtable->innerBatchFile[batchno]);
+								  &hashtable->innerBatchFile[batchno],
+								  hashtable->fileCxt);
+
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
 			hashtable->spaceUsedSkew -= tupleSize;
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index f189fb4d28..6055abde49 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -432,8 +432,10 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					 */
 					Assert(parallel_state == NULL);
 					Assert(batchno > hashtable->curbatch);
+
 					ExecHashJoinSaveTuple(mintuple, hashvalue,
-										  &hashtable->outerBatchFile[batchno]);
+										  &hashtable->outerBatchFile[batchno],
+										  hashtable->fileCxt);
 
 					if (shouldFree)
 						heap_free_minimal_tuple(mintuple);
@@ -1234,21 +1236,27 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
  * The data recorded in the file for each tuple is its hash value,
  * then the tuple in MinimalTuple format.
  *
- * Note: it is important always to call this in the regular executor
- * context, not in a shorter-lived context; else the temp file buffers
- * will get messed up.
+ * Note: it is important that the temp file buffers are allocated in the
+ * HashBatchFiles context (the filecxt argument), not in a shorter-lived
+ * context; else the temp file buffers will get messed up.
  */
 void
 ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
-					  BufFile **fileptr)
+					  BufFile **fileptr, MemoryContext filecxt)
 {
 	BufFile    *file = *fileptr;
 
 	if (file == NULL)
 	{
+		MemoryContext oldctx;
+
+		oldctx = MemoryContextSwitchTo(filecxt);
+
 		/* First write to this batch file, so open it. */
 		file = BufFileCreateTemp(false);
 		*fileptr = file;
+
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	BufFileWrite(file, &hashvalue, sizeof(uint32));
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index acb7592ca0..d759235d7f 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -25,10 +25,14 @@
  *
  * Each active hashjoin has a HashJoinTable control block, which is
  * palloc'd in the executor's per-query context.  All other storage needed
- * for the hashjoin is kept in private memory contexts, two for each hashjoin.
+ * for the hashjoin is kept in private memory contexts, three for each
+ * hashjoin:
+ * - HashTableContext (hashCxt): storage for the whole hash join
+ * - HashBatchContext (batchCxt): storage for the current batch only
+ * - HashBatchFiles (fileCxt): storage for the temp file buffers
+ *
  * This makes it easy and fast to release the storage when we don't need it
- * anymore.  (Exception: data associated with the temp files lives in the
- * per-query context too, since we always call buffile.c in that context.)
+ * anymore.
  *
  * The hashtable contexts are made children of the per-query context, ensuring
  * that they will be discarded at end of statement even if the join is
@@ -39,6 +43,10 @@
  * "hashCxt", while storage that is only wanted for the current batch is
  * allocated in the "batchCxt".  By resetting the batchCxt at the end of
  * each batch, we free all the per-batch storage reliably and without tedium.
+ * Note that data associated with the temp files lives in the "fileCxt"
+ * context, which lives for the entire join, as temp files might need to
+ * survive across batches.  These files are explicitly destroyed by calling
+ * BufFileClose() when the code is done with them.
  *
  * During first scan of inner relation, we get its tuples from executor.
  * If nbatch > 1 then tuples that don't belong in first batch get saved
@@ -348,6 +356,7 @@ typedef struct HashJoinTableData
 
 	MemoryContext hashCxt;		/* context for whole-hash-join storage */
 	MemoryContext batchCxt;		/* context for this-batch-only storage */
+	MemoryContext fileCxt;		/* context for BufFile-related storage */
 
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h
index d367070883..a8f9ae1989 100644
--- a/src/include/executor/nodeHashjoin.h
+++ b/src/include/executor/nodeHashjoin.h
@@ -29,6 +29,6 @@ extern void ExecHashJoinInitializeWorker(HashJoinState *state,
 										 ParallelWorkerContext *pwcxt);
 
 extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
-								  BufFile **fileptr);
+								  BufFile **fileptr, MemoryContext filecxt);
 
 #endif							/* NODEHASHJOIN_H */
-- 
2.39.2
