On 1/10/25 15:54, Melanie Plageman wrote:
> On Thu, Jan 9, 2025 at 6:59 PM Tomas Vondra <to...@vondra.me> wrote:
>>
>> ...
> 
>> Robert's idea kept using buffered files, but limited how many we can
>> fill at any phase. Say we'd use a limit of 1024 batches, but we actually
>> need 1M batches. Then we'd do the build in two phases - we'd generate
>> 1024 batches, and then we'd split each of those batches into 1024
>> smaller batches. The trick (as I understand it) is those batches can't
>> overlap, so we'd not need more than 1024 batches, which greatly limits
>> the memory consumption. We could even use a lower limit, derived from
>> work_mem or something like that.
> 
> I think this is because we get the batch based on
> 
> *batchno = pg_rotate_right32(hashvalue, hashtable->log2_nbuckets) &
> (nbatch - 1);
> 
> And tuples can only spill forward. I think Robert's example is if we
> plan for 64 batches and eventually increase to 256 batches, a tuple
> assigned to batch 1 could go to 65, 129, or 193 but no other batch --
> meaning we would only need 3 files open when processing batch 1. But I
> think we would need to do more explicit file flushing and closing and
> opening, right? Which maybe doesn't matter when compared to the
> overhead of so many more buffers.
> 
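To make the "tuples only spill forward" property concrete, here is a small standalone C sketch. It is not PostgreSQL code. rotr32() is a local stand-in for pg_rotate_right32(), and spill_targets() is a hypothetical helper. Because nbatch is a power of two, growing it only exposes more high bits of the rotated hash. A tuple in batch b can therefore only move to b + k * oldnbatch. For 64 -> 256 batches, that means batch 1 can feed only batches 1, 65, 129 and 193.

```c
#include <stdint.h>

/* Stand-in for pg_rotate_right32(); n is expected to be in 0..31. */
static inline uint32_t
rotr32(uint32_t word, int n)
{
	return (word >> n) | (word << ((32 - n) & 31));
}

/*
 * Same shape as the quoted batchno computation. nbatch must be a power of
 * two, so (nbatch - 1) keeps the low log2(nbatch) bits of the rotated hash.
 */
static inline uint32_t
batch_for(uint32_t hashvalue, int log2_nbuckets, uint32_t nbatch)
{
	return rotr32(hashvalue, log2_nbuckets) & (nbatch - 1);
}

/*
 * Hypothetical helper: fill 'out' with every batch a tuple from 'oldbatch'
 * may end up in after nbatch grows from oldnbatch to newnbatch (both powers
 * of two). Returns the number of candidates, including oldbatch itself.
 */
static inline int
spill_targets(uint32_t oldbatch, uint32_t oldnbatch, uint32_t newnbatch,
			  uint32_t *out)
{
	int			n = 0;

	for (uint32_t b = oldbatch; b < newnbatch; b += oldnbatch)
		out[n++] = b;
	return n;
}
```

So while batch 1 is processed, only three extra files (65, 129, 193) can receive tuples from it. That is what bounds the number of open files, and hence buffers, per phase.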

I had a quiet evening yesterday, so I decided to take a stab at this and
see how hard it would be, and how bad the impact would be. Attached is
an experimental patch, doing the *bare* minimum for a simple query:

1) It defines a limit of 128 batches (a bit low, but it also means just
1MB of 8kB file buffers). In practice we'd probably use something like
256 - 1024. Doesn't matter.

2) Ensures the initial pass over data in MultiExecPrivateHash does not
use more than 128 batches, and sets "tooManyBatches=true" if that
happens (dumping the current batch to a file with
ExecHashDumpBatchToFile, even if it's batchno=0). Later it calls
ExecHashHandleTooManyBatches() to increase nbatch further.

3) Does something similar for the outer relation - if there are too many
batches, we call ExecHashJoinRepartitionBatches(), which first
partitions the outer side into 128 batches. In this WIP it only does a
single pass, though. It should be recursive or something.

4) Extends the BufFile API with BufFileHasBuffer/BufFileFreeBuffer so
that the code can free the buffers. It also means the buffer needs to be
allocated separately, not embedded in BufFile struct. (I'm a bit
surprised it works without having to re-read the buffer after freeing
it, but that's probably thanks to how hashjoin uses the files).
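The buffer handling in item 4 amounts to allocating a file's write buffer lazily and flushing it before freeing it. The following is a toy C sketch of that pattern on top of stdio. It is not the BufFile API. LazyFile, lazy_write(), lazy_free_buffer() and lazy_has_buffer() are hypothetical names, and the 8192-byte buffer only mirrors the default BLCKSZ.

```c
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define LAZY_BLCKSZ 8192		/* assumption: mirrors the default BLCKSZ */

/* Hypothetical toy analogue of a BufFile whose buffer can be released. */
typedef struct LazyFile
{
	FILE	   *fp;				/* underlying temporary file */
	char	   *buffer;			/* NULL while no buffer is allocated */
	size_t		pos;			/* number of bytes pending in buffer */
} LazyFile;

/* Write out pending bytes; returns false on an I/O error. */
static bool
lazy_flush(LazyFile *f)
{
	if (f->buffer == NULL || f->pos == 0)
		return true;
	if (fwrite(f->buffer, 1, f->pos, f->fp) != f->pos)
		return false;
	f->pos = 0;
	return true;
}

/* Allocate the buffer on first use (or first use after a free), then copy. */
static bool
lazy_write(LazyFile *f, const void *data, size_t len)
{
	const char *src = data;

	if (f->buffer == NULL)
	{
		f->buffer = malloc(LAZY_BLCKSZ);
		if (f->buffer == NULL)
			return false;
		f->pos = 0;
	}

	while (len > 0)
	{
		size_t		n = LAZY_BLCKSZ - f->pos;

		if (n > len)
			n = len;
		memcpy(f->buffer + f->pos, src, n);
		f->pos += n;
		src += n;
		len -= n;

		/* buffer full: dump it before copying any more data */
		if (f->pos == LAZY_BLCKSZ && !lazy_flush(f))
			return false;
	}
	return true;
}

/* Flush pending data and release the buffer; the file stays open. */
static bool
lazy_free_buffer(LazyFile *f)
{
	bool		ok = lazy_flush(f);

	free(f->buffer);
	f->buffer = NULL;
	f->pos = 0;
	return ok;
}

static bool
lazy_has_buffer(const LazyFile *f)
{
	return f->buffer != NULL;
}
```

In this toy version, freeing the buffer is safe at any point because the flush happens first. A real BufFile also has to keep its read/seek state consistent with that, which is presumably why it is surprising that the patch works without re-reading the buffer.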

Anyway, this seems to work, and a simple experiment looks like this:

--------------------------------------------------------------------
create table t (a int, b text);

insert into t select i, md5(i::text)
  from generate_series(1,100000) s(i);

insert into t select i, md5(i::text)
  from generate_series(1,100000) s(i);

insert into t select i, md5(i::text)
  from generate_series(1,100000) s(i);

insert into t select i, md5(i::text)
  from generate_series(1,100000) s(i);

vacuum analyze;

set work_mem='128kB';
explain analyze select * from t t1 join t t2 on (t1.a = t2.a);
--------------------------------------------------------------------

This is just enough to need 256 batches, i.e. "one doubling" over the
128 batch limit.
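One rough way to reason about the limit: with a per-phase cap of L batches, each phase can multiply nbatch by at most L. The file buffers for a single phase then cost about L * BLCKSZ. The C sketch below assumes the default 8kB BLCKSZ and power-of-two batch counts, and phase_plan() and phase_buffer_bytes() are hypothetical helpers. It describes the idealized version of Robert's scheme. The WIP patch still grows nbatch by doubling in ExecHashHandleTooManyBatches(). The sketch reproduces the 128 -> 16384 -> 1M progression from the patch comments, and shows that this test needs 128 -> 256.

```c
#include <stddef.h>
#include <stdint.h>

#define SKETCH_BLCKSZ 8192		/* assumption: default BLCKSZ */

/*
 * Hypothetical helper: fill 'steps' with the nbatch value reached after
 * each phase when every phase may multiply nbatch by at most 'limit'
 * (both arguments powers of two). Returns the number of phases used.
 */
static inline int
phase_plan(uint32_t needed, uint32_t limit, uint32_t *steps, int maxsteps)
{
	uint32_t	nbatch = 1;
	int			n = 0;

	while (nbatch < needed && n < maxsteps)
	{
		/* grow by the full per-phase factor, but never past 'needed' */
		if ((uint64_t) nbatch * limit >= needed)
			nbatch = needed;
		else
			nbatch *= limit;
		steps[n++] = nbatch;
	}
	return n;
}

/* Memory for one BLCKSZ buffer per batch file written in a single phase. */
static inline size_t
phase_buffer_bytes(uint32_t limit)
{
	return (size_t) limit * SKETCH_BLCKSZ;
}
```

For example, 128 batches * 8192 bytes = 1048576 bytes, i.e. the 1MB mentioned earlier, regardless of how many batches the join eventually needs.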

On master I get this:

                            QUERY PLAN
-----------------------------------------------------------------------
 Hash Join  (cost=15459.00..51555.40 rows=1638740 width=74) (actual time=80.065..337.192 rows=1600000 loops=1)
   Hash Cond: (t1.a = t2.a)
   Buffers: shared hit=6668, temp read=5806 written=5806
   ->  Seq Scan on t t1  (cost=0.00..7334.00 rows=400000 width=37) (actual time=0.008..19.867 rows=400000 loops=1)
         Buffers: shared hit=3334
   ->  Hash  (cost=7334.00..7334.00 rows=400000 width=37) (actual time=76.824..76.824 rows=400000 loops=1)
         Buckets: 4096  Batches: 256  Memory Usage: 132kB
         Buffers: shared hit=3334, temp written=2648
         ->  Seq Scan on t t2  (cost=0.00..7334.00 rows=400000 width=37) (actual time=0.001..20.855 rows=400000 loops=1)
               Buffers: shared hit=3334
 Planning Time: 0.100 ms
 Execution Time: 385.779 ms
(12 rows)

while with the patch we get this:

                            QUERY PLAN
-----------------------------------------------------------------------
 Hash Join  (cost=15459.00..51555.40 rows=1638740 width=74) (actual time=93.346..325.604 rows=1600000 loops=1)
   Hash Cond: (t1.a = t2.a)
   Buffers: shared hit=6668, temp read=8606 written=8606
   ->  Seq Scan on t t1  (cost=0.00..7334.00 rows=400000 width=37) (actual time=0.006..14.416 rows=400000 loops=1)
         Buffers: shared hit=3334
   ->  Hash  (cost=7334.00..7334.00 rows=400000 width=37) (actual time=48.481..48.482 rows=400000 loops=1)
         Buckets: 4096 (originally 4096)  Batches: 256 (originally 128)  Memory Usage: 132kB
         Buffers: shared hit=3334, temp read=23 written=2860
         ->  Seq Scan on t t2  (cost=0.00..7334.00 rows=400000 width=37) (actual time=0.001..14.754 rows=400000 loops=1)
               Buffers: shared hit=3334
 Planning Time: 0.061 ms
 Execution Time: 374.229 ms
(12 rows)


So for this particular query there doesn't seem to be a particularly
massive hit. With more batches that's unfortunately not the case, but
that seems to be mostly due to looping over all batches when freeing
buffers. Looping over 1M batches (which we do for every batch) is
expensive, but that could be improved somehow - we only have a couple of
files open (say 1024), so we could keep them in a list or something. And
I think we don't need to free the buffers this often anyway; we might
not even have opened any future batches.
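One possible shape for the "keep them in a list" idea: record which batches currently own a buffer in a compact side array. Releasing buffers then costs O(number of buffered files) instead of O(nbatch). This is a minimal C sketch only. BufferTracker, tracker_note() and tracker_release_all() are hypothetical names. The release callback stands in for something like BufFileFreeBuffer() on the batch's inner and outer files.

```c
#include <stdbool.h>
#include <stdlib.h>

/*
 * Hypothetical tracker of batches whose files currently own a buffer.
 * 'member' is indexed by batch number so duplicates are rejected in O(1);
 * 'list' holds the buffered batch numbers compactly.
 */
typedef struct BufferTracker
{
	int			nbatch;
	bool	   *member;			/* member[b]: batch b is in 'list' */
	int		   *list;			/* batch numbers with a live buffer */
	int			nlist;
} BufferTracker;

static bool
tracker_init(BufferTracker *t, int nbatch)
{
	t->nbatch = nbatch;
	t->nlist = 0;
	t->member = calloc((size_t) nbatch, sizeof(bool));
	t->list = malloc((size_t) nbatch * sizeof(int));
	return t->member != NULL && t->list != NULL;
}

/* Record that batch b just (re)allocated a buffer. */
static void
tracker_note(BufferTracker *t, int b)
{
	if (!t->member[b])
	{
		t->member[b] = true;
		t->list[t->nlist++] = b;
	}
}

/* Release every tracked buffer; the loop length is nlist, not nbatch. */
static int
tracker_release_all(BufferTracker *t,
					void (*release) (int batchno, void *arg), void *arg)
{
	int			released = t->nlist;

	for (int i = 0; i < t->nlist; i++)
	{
		release(t->list[i], arg);
		t->member[t->list[i]] = false;
	}
	t->nlist = 0;
	return released;
}

static void
tracker_destroy(BufferTracker *t)
{
	free(t->member);
	free(t->list);
}
```

In the patch, the tracking would presumably have to happen on the hashjoin side (e.g. in ExecHashJoinSaveTuple(), which knows the batch number). ExecHashJoinFreeBuffers() could then walk the short list instead of curbatch..nbatch.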

I'm sure there are plenty of issues with the patch - e.g. it may not
handle nbatch increases later (after batch 0), I ignored skew buckets,
and stuff like that ...

But it does seem like a workable idea ...


regards

-- 
Tomas Vondra
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6f8a379e3b9..6cd0a3328bb 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -79,7 +79,8 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 										  size_t size);
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
-
+static void ExecHashDumpBatchToFile(HashJoinTable hashtable);
+static void ExecHashHandleTooManyBatches(HashJoinTable hashtable, TupleTableSlot *slot);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -195,6 +196,13 @@ MultiExecPrivateHash(HashState *node)
 		}
 	}
 
+	/*
+	 * If we had to disable adding more batches while building the hash, now
+	 * is a good time to do the next phase - release all file buffers and split
+	 * each batch into new batches.
+	 */
+	ExecHashHandleTooManyBatches(hashtable, node->ps.ps_ResultTupleSlot);
+
 	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
@@ -502,10 +510,17 @@ ExecHashTableCreate(HashState *state)
 	hashtable->skewBucketLen = 0;
 	hashtable->nSkewBuckets = 0;
 	hashtable->skewBucketNums = NULL;
-	hashtable->nbatch = nbatch;
 	hashtable->curbatch = 0;
-	hashtable->nbatch_original = nbatch;
-	hashtable->nbatch_outstart = nbatch;
+
+	/* limit the number of batches we can populate at once */
+	hashtable->nbatch_maximum = HASHJOIN_BATCHES_PER_PHASE;
+	hashtable->nbatch = Min(nbatch, hashtable->nbatch_maximum);
+	hashtable->curbatch = 0;
+	hashtable->nbatch_original = Min(nbatch, hashtable->nbatch_maximum);
+	hashtable->nbatch_outstart = Min(nbatch, hashtable->nbatch_maximum);
+
+	/* remember whether the planned nbatch exceeds the per-phase limit */
+	hashtable->tooManyBatches = (nbatch > HASHJOIN_BATCHES_PER_PHASE);
 	hashtable->growEnabled = true;
 	hashtable->totalTuples = 0;
 	hashtable->partialTuples = 0;
@@ -874,7 +889,8 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	 */
 	if (hashtable->innerBatchFile != NULL)
 	{
-		for (i = 1; i < hashtable->nbatch; i++)
+		/* XXX We might have allocated a file for batch 0, so release it. */
+		for (i = 0; i < hashtable->nbatch; i++)
 		{
 			if (hashtable->innerBatchFile[i])
 				BufFileClose(hashtable->innerBatchFile[i]);
@@ -905,6 +921,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	long		nfreed;
 	HashMemoryChunk oldchunks;
 
+	/*
+	 * We should never get here after hitting too many batches, and switching
+	 * to spilling data for all batches.
+	 */
+	Assert(!hashtable->tooManyBatches);
+
 	/* do nothing if we've decided to shut off growth */
 	if (!hashtable->growEnabled)
 		return;
@@ -1054,6 +1076,166 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	}
 }
 
+/*
+ * ExecHashDumpBatchToFile
+ *		Dump the current hash table into a file for the current batch.
+ *
+ * Once we hit the number of batches we're populating, we stop doubling the
+ * number of batches. But we also can't keep the current batch in memory, so
+ * instead we dump the data to file and will read it later after we increase
+ * the number of batches (at which point we won't need to keep the current
+ * batches).
+ *
+ * XXX This is virtually the same thing we do in ExecHashIncreaseNumBatches.
+ */
+static void
+ExecHashDumpBatchToFile(HashJoinTable hashtable)
+{
+	HashMemoryChunk oldchunks;
+	int			curbatch = hashtable->curbatch;
+
+	memset(hashtable->buckets.unshared, 0,
+		   sizeof(HashJoinTuple) * hashtable->nbuckets);
+	oldchunks = hashtable->chunks;
+	hashtable->chunks = NULL;
+
+	/* so, let's scan through the old chunks, and all tuples in each chunk */
+	while (oldchunks != NULL)
+	{
+		HashMemoryChunk nextchunk = oldchunks->next.unshared;
+
+		/* position within the buffer (up to oldchunks->used) */
+		size_t		idx = 0;
+
+		/* process all tuples stored in this chunk (and then free it) */
+		while (idx < oldchunks->used)
+		{
+			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+			int			bucketno;
+			int			batchno;
+
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+									  &bucketno, &batchno);
+
+			/* everything belongs to current batch, we're not adding any */
+			Assert(batchno == curbatch);
+
+			/* dump it out */
+			ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
+								  hashTuple->hashvalue,
+								  &hashtable->innerBatchFile[batchno],
+								  hashtable);
+
+			hashtable->spaceUsed -= hashTupleSize;
+
+			/* next tuple in this chunk */
+			idx += MAXALIGN(hashTupleSize);
+
+			/* allow this loop to be cancellable */
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/* we're done with this chunk - free it and proceed to the next one */
+		pfree(oldchunks);
+		oldchunks = nextchunk;
+	}
+}
+
+/*
+ * ExecHashHandleTooManyBatches
+ *		Handle the case when we hit the number of batches while building the
+ *		hash table.
+ *
+ * We can hit the situation again (e.g. we need 1M batches, but the limit is
+ * 128, so we go 128 -> 16384 -> 1M).
+ */
+static void
+ExecHashHandleTooManyBatches(HashJoinTable hashtable, TupleTableSlot *slot)
+{
+	/* didn't run into the issue, nothing to do */
+	if (!hashtable->tooManyBatches)
+		return;
+
+	while (hashtable->tooManyBatches)
+	{
+		int			nbatch;
+		int			oldnbatch;
+
+		BufFile *file = hashtable->innerBatchFile[hashtable->curbatch];
+
+		Assert(file != NULL);
+
+		/* XXX stupid way to increase number of batches */
+		oldnbatch = hashtable->nbatch;
+		nbatch = Max(hashtable->nbatch, hashtable->nbatch * 2);
+
+		hashtable->nbatch = nbatch;
+		hashtable->tooManyBatches = false;
+
+		/* enlarge arrays and zero out added entries */
+		hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch);
+		hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
+
+		/* set to NULL, so that we create a new / empty file for this batch */
+		hashtable->innerBatchFile[hashtable->curbatch] = NULL;
+
+		/* FIXME close/flush all current files (release the buffer) */
+		for (int i = 0; i < oldnbatch; i++)
+		{
+			if (hashtable->innerBatchFile[i] != NULL)
+				BufFileFreeBuffer(hashtable->innerBatchFile[i]);
+		}
+
+		BufFileSeek(file, 0, 0, SEEK_SET);
+
+		/* read the current tuples from the batch file, decide what
+		 * to do with them */
+		while (true)
+		{
+			uint32		hashvalue;
+			uint32		header[2];
+			size_t		nread;
+			MinimalTuple tuple;
+
+			/*
+			 * We check for interrupts here because this is typically taken as an
+			 * alternative code path to an ExecProcNode() call, which would include
+			 * such a check.
+			 */
+			CHECK_FOR_INTERRUPTS();
+
+			/*
+			 * Since both the hash value and the MinimalTuple length word are uint32,
+			 * we can read them both in one BufFileRead() call without any type
+			 * cheating.
+			 */
+			nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
+			if (nread == 0)				/* end of file */
+			{
+				ExecClearTuple(slot);
+				break;
+			}
+
+			hashvalue = header[0];
+			tuple = (MinimalTuple) palloc(header[1]);
+			tuple->t_len = header[1];
+
+			BufFileReadExact(file,
+							 (char *) tuple + sizeof(uint32),
+							 header[1] - sizeof(uint32));
+
+			ExecForceStoreMinimalTuple(tuple, slot, true);
+
+			ExecHashTableInsert(hashtable, slot, hashvalue);
+		}
+
+		/* close the old batch file */
+		BufFileClose(file);
+	}
+}
+
 /*
  * ExecParallelHashIncreaseNumBatches
  *		Every participant attached to grow_batches_barrier must run this
@@ -1625,8 +1807,11 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 	/*
 	 * decide whether to put the tuple in the hash table or a temp file
+	 *
+	 * XXX If we are already in "too many batches" situation, just write the
+	 * data directly to the temp file, even for current batch.
 	 */
-	if (batchno == hashtable->curbatch)
+	if (batchno == hashtable->curbatch && !hashtable->tooManyBatches)
 	{
 		/*
 		 * put the tuple in hash table
@@ -1678,14 +1863,29 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		if (hashtable->spaceUsed +
 			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
 			> hashtable->spaceAllowed)
-			ExecHashIncreaseNumBatches(hashtable);
+		{
+			/*
+			 * If we already hit the maximum for this phase, we can't increase
+			 * the number of batches further, so we stop and flush everything
+			 * into the temp file.
+			 */
+			if (hashtable->nbatch == hashtable->nbatch_maximum)
+			{
+				/* write everything from this batch to the temp file */
+				ExecHashDumpBatchToFile(hashtable);
+				hashtable->tooManyBatches = true;
+			}
+			else
+				ExecHashIncreaseNumBatches(hashtable);
+		}
 	}
 	else
 	{
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
-		Assert(batchno > hashtable->curbatch);
+		Assert(batchno > hashtable->curbatch ||
+			   (hashtable->tooManyBatches && batchno == hashtable->curbatch));
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
 							  &hashtable->innerBatchFile[batchno],
@@ -2494,6 +2694,7 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
+	/* XXX maybe should check nbatch_maximum? */
 	/* Check we are not over the total spaceAllowed, either */
 	if (hashtable->spaceUsed > hashtable->spaceAllowed)
 		ExecHashIncreaseNumBatches(hashtable);
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5661ad76830..a7514ca5137 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -963,6 +963,107 @@ ExecEndHashJoin(HashJoinState *node)
 	ExecEndNode(innerPlanState(node));
 }
 
+/*
+ * ExecHashJoinRepartitionBatches
+ *		Build the hashes for the outer side, recursively to not need more than
+ *		HASHJOIN_BATCHES_PER_PHASE batches.
+ *
+ * XXX There's no recursion right now, we just do the first phase and leave
+ * the rest to the regular re-batching. This won't be enough if we need more
+ * than a single repartition cycle.
+ *
+ * XXX Does this need to worry about the skew buckets? Probably yes.
+ */
+static void
+ExecHashJoinRepartitionBatches(PlanState *outerNode,
+							   HashJoinState *hjstate)
+{
+	TupleTableSlot *slot;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int		nbatch = hashtable->nbatch;
+	int64	ntuples = 0;
+
+	/* fake the value so that we calculate the first phase batch */
+	hashtable->nbatch = HASHJOIN_BATCHES_PER_PHASE;
+
+	while (true)
+	{
+		/*
+		 * Check to see if first outer tuple was already fetched by
+		 * ExecHashJoin() and not used yet.
+		 */
+		slot = hjstate->hj_FirstOuterTupleSlot;
+		if (!TupIsNull(slot))
+			hjstate->hj_FirstOuterTupleSlot = NULL;
+		else
+			slot = ExecProcNode(outerNode);
+
+		while (!TupIsNull(slot))
+		{
+			uint32		hashvalue;
+			bool		isnull;
+
+			/*
+			 * We have to compute the tuple's hash value.
+			 */
+			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+
+			econtext->ecxt_outertuple = slot;
+
+			ResetExprContext(econtext);
+
+			hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
+																  econtext,
+																  &isnull));
+
+			if (!isnull)
+			{
+				int	bucketno;
+				int	batchno;
+				bool		shouldFree;
+				MinimalTuple mintuple;
+
+				ntuples++;
+
+				/* remember outer relation is not empty for possible rescan */
+				hjstate->hj_OuterNotEmpty = true;
+
+				ExecHashGetBucketAndBatch(hashtable, hashvalue,
+										  &bucketno, &batchno);
+
+				/*
+				 * The tuple might not belong to the current batch (where
+				 * "current batch" includes the skew buckets if any).
+				 */
+				mintuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+				/*
+				 * Need to postpone this outer tuple to a later batch.
+				 * Save it in the corresponding outer-batch file.
+				 */
+				Assert(batchno >= hashtable->curbatch);
+				ExecHashJoinSaveTuple(mintuple, hashvalue,
+									  &hashtable->outerBatchFile[batchno],
+									  hashtable);
+
+				if (shouldFree)
+					heap_free_minimal_tuple(mintuple);
+			}
+
+			/*
+			 * That tuple couldn't match because of a NULL, so discard it and
+			 * continue with the next one.
+			 */
+			slot = ExecProcNode(outerNode);
+		}
+
+		break;
+	}
+
+	/* restore the correct value */
+	hashtable->nbatch = nbatch;
+}
+
 /*
  * ExecHashJoinOuterGetTuple
  *
@@ -983,8 +1084,36 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 	HashJoinTable hashtable = hjstate->hj_HashTable;
 	int			curbatch = hashtable->curbatch;
 	TupleTableSlot *slot;
+	bool		repartitioned = false;
 
-	if (curbatch == 0)			/* if it is the first pass */
+	/*
+	 * If this is the first batch, consider performing the repartitioning. This
+	 * will build HASHJOIN_BATCHES_PER_PHASE batches, which then need to be
+	 * rebatched using the regular logic.
+	 *
+	 * This includes the first batch, which means we then need to read the
+	 * tuples from the file, just like for every other batch.
+	 *
+	 * But do that only once, which we detect by (file == NULL).
+	 */
+	if ((curbatch == 0) &&
+		(hashtable->nbatch > HASHJOIN_BATCHES_PER_PHASE) &&
+		(hashtable->outerBatchFile[0] == NULL))
+	{
+		ExecHashJoinRepartitionBatches(outerNode, hjstate);
+
+		/* make sure to reset the first batch to start reading */
+		if (hashtable->outerBatchFile[0])
+			BufFileSeek(hashtable->outerBatchFile[0], 0, 0, SEEK_SET);
+	}
+
+	/*
+	 * We need to know if this is repartitioned even for later tuples in
+	 * the first batch. We detect that by having the batch file.
+	 */
+	repartitioned = (curbatch == 0) && (hashtable->outerBatchFile[0] != NULL);
+
+	if (curbatch == 0 && !repartitioned)			/* if it is the first pass */
 	{
 		/*
 		 * Check to see if first outer tuple was already fetched by
@@ -1120,6 +1249,23 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
 	return NULL;
 }
 
+/* XXX this is very expensive, we have to walk long arrays often */
+static void
+ExecHashJoinFreeBuffers(HashJoinTable hashtable)
+{
+	if ((!hashtable->innerBatchFile) && (!hashtable->outerBatchFile))
+		return;
+
+	for (int i = hashtable->curbatch; i < hashtable->nbatch; i++)
+	{
+		if (hashtable->innerBatchFile[i])
+			BufFileFreeBuffer(hashtable->innerBatchFile[i]);
+
+		if (hashtable->outerBatchFile[i])
+			BufFileFreeBuffer(hashtable->outerBatchFile[i]);
+	}
+}
+
 /*
  * ExecHashJoinNewBatch
  *		switch to a new hashjoin batch
@@ -1139,6 +1285,8 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
 
+	ExecHashJoinFreeBuffers(hashtable);
+
 	if (curbatch > 0)
 	{
 		/*
@@ -1398,6 +1546,20 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 	return false;
 }
 
+static int
+ExecHashCountBuffers(BufFile **files, int nfiles)
+{
+	int cnt = 0;
+
+	for (int i = 0; i < nfiles; i++)
+	{
+		if (files[i] && BufFileHasBuffer(files[i]))
+			cnt++;
+	}
+
+	return cnt;
+}
+
 /*
  * ExecHashJoinSaveTuple
  *		save a tuple to a batch file.
@@ -1438,6 +1600,13 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
+
+		if (false)
+		{
+			int	ninner = ExecHashCountBuffers(hashtable->innerBatchFile, hashtable->nbatch);
+			int	nouter = ExecHashCountBuffers(hashtable->outerBatchFile, hashtable->nbatch);
+			elog(WARNING, "curbatch %d  inner %d  outer %d", hashtable->curbatch, ninner, nouter);
+		}
 	}
 
 	BufFileWrite(file, &hashvalue, sizeof(uint32));
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 6449f82a72b..e155976cc23 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -100,7 +100,8 @@ struct BufFile
 	 * XXX Should ideally us PGIOAlignedBlock, but might need a way to avoid
 	 * wasting per-file alignment padding when some users create many files.
 	 */
-	PGAlignedBlock buffer;
+	MemoryContext	bufctx;
+	PGAlignedBlock *buffer;
 };
 
 static BufFile *makeBufFileCommon(int nfiles);
@@ -128,6 +129,9 @@ makeBufFileCommon(int nfiles)
 	file->pos = 0;
 	file->nbytes = 0;
 
+	file->bufctx = CurrentMemoryContext;
+	file->buffer = NULL;
+
 	return file;
 }
 
@@ -423,6 +427,33 @@ BufFileClose(BufFile *file)
 	pfree(file);
 }
 
+static void
+BufFileAllocBuffer(BufFile *file)
+{
+	if (file->buffer != NULL)
+		return;
+
+	file->buffer = MemoryContextAlloc(file->bufctx, sizeof(PGAlignedBlock));
+}
+
+void
+BufFileFreeBuffer(BufFile *file)
+{
+	if (file->buffer == NULL)
+		return;
+
+	BufFileFlush(file);
+
+	pfree(file->buffer);
+	file->buffer = NULL;
+}
+
+bool
+BufFileHasBuffer(BufFile *file)
+{
+	return (file->buffer != NULL);
+}
+
 /*
  * BufFileLoadBuffer
  *
@@ -454,12 +485,14 @@ BufFileLoadBuffer(BufFile *file)
 	else
 		INSTR_TIME_SET_ZERO(io_start);
 
+	BufFileAllocBuffer(file);
+
 	/*
 	 * Read whatever we can get, up to a full bufferload.
 	 */
 	file->nbytes = FileRead(thisfile,
-							file->buffer.data,
-							sizeof(file->buffer),
+							file->buffer->data,
+							sizeof(file->buffer->data),
 							file->curOffset,
 							WAIT_EVENT_BUFFILE_READ);
 	if (file->nbytes < 0)
@@ -535,7 +568,7 @@ BufFileDumpBuffer(BufFile *file)
 			INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 file->buffer->data + wpos,
 								 bytestowrite,
 								 file->curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
@@ -616,7 +649,7 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 			nthistime = size;
 		Assert(nthistime > 0);
 
-		memcpy(ptr, file->buffer.data + file->pos, nthistime);
+		memcpy(ptr, file->buffer->data + file->pos, nthistime);
 
 		file->pos += nthistime;
 		ptr = (char *) ptr + nthistime;
@@ -679,6 +712,8 @@ BufFileWrite(BufFile *file, const void *ptr, size_t size)
 
 	Assert(!file->readOnly);
 
+	BufFileAllocBuffer(file);
+
 	while (size > 0)
 	{
 		if (file->pos >= BLCKSZ)
@@ -700,7 +735,7 @@ BufFileWrite(BufFile *file, const void *ptr, size_t size)
 			nthistime = size;
 		Assert(nthistime > 0);
 
-		memcpy(file->buffer.data + file->pos, ptr, nthistime);
+		memcpy(file->buffer->data + file->pos, ptr, nthistime);
 
 		file->dirty = true;
 		file->pos += nthistime;
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 67ae89c8257..1ba7a6d1b88 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -322,9 +322,11 @@ typedef struct HashJoinTableData
 	int			nbatch;			/* number of batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
 
+	int			nbatch_maximum;		/* maximum number of batches to create */
 	int			nbatch_original;	/* nbatch when we started inner scan */
 	int			nbatch_outstart;	/* nbatch when we started outer scan */
 
+	bool		tooManyBatches;	/* did we generate too many batches? */
 	bool		growEnabled;	/* flag to shut off nbatch increases */
 
 	double		totalTuples;	/* # tuples obtained from inner plan */
@@ -362,4 +364,7 @@ typedef struct HashJoinTableData
 	dsa_pointer current_chunk_shared;
 } HashJoinTableData;
 
+
+#define HASHJOIN_BATCHES_PER_PHASE	128
+
 #endif							/* HASHJOIN_H */
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 6b92668d90f..7dd2f394c0d 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -55,5 +55,7 @@ extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
 extern void BufFileDeleteFileSet(FileSet *fileset, const char *name,
 								 bool missing_ok);
 extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
+extern void BufFileFreeBuffer(BufFile *file);
+extern bool BufFileHasBuffer(BufFile *file);
 
 #endif							/* BUFFILE_H */
