On Wed, Apr 24, 2019 at 02:36:33AM +0200, Tomas Vondra wrote:

...

I still think the idea with an "overflow batch" is worth considering,
because it'd allow us to keep the memory usage within work_mem. And
after getting familiar with the hash join code again (haven't messed
with it since 9.5 or so) I think it should not be all that difficult.
I'll give it a try over the weekend if I get bored for a while.


OK, so I took a stab at this, and overall it seems to be workable. The
patches I have are nowhere near committable, but I think the approach
works fairly well - the memory is kept in check, and the performance is
comparable to the "balancing" approach tested before.

To explain it a bit, the idea is that we can compute how many BufFile
structures we can keep in memory - we can't use more than work_mem/2 for
that, because then we'd mostly eliminate space for the actual data. For
example with 4MB, we know we can keep 128 batches - each batch needs one
file for the outer and one for the inner side, so 256 BufFiles in total,
and 256 * 8kB = 2MB.

And then we just increase the number of batches, but instead of adding
more BufFile entries, we split the batches into slices that fit into
memory (say, 128 batches each). We keep BufFiles only for the current
slice, plus an "overflow file" for the other slices. After processing a slice,
we simply switch to the next one, and use the overflow file as a temp
file for the first batch - we redistribute it into the other batches in
the slice and another overflow file.

That's what the v3 patch (named 'single overflow file') does. It does
work, but unfortunately it significantly inflates the amount of data
written to temporary files. Assume we need e.g. 1024 batches, but only
128 fit into memory. That means we'll need 8 slices, and during the
first pass we'll handle 1/8 of the data and write 7/8 to the overflow
file.  Then after processing the slice and switching to the next one, we
repeat this dance - 1/8 gets processed, 6/8 written to another overflow
file. So essentially we "forward" about

   7/8 + 6/8 + 5/8 + ... + 1/8 = 28/8 = 3.5

of data between slices, and we need to re-shuffle data in each slice,
which amounts to additional 1x data. That's pretty significant overhead,
as will be clear from the measurements I'll present shortly.

But luckily, there's a simple solution to this - instead of writing the
data into a single overflow file, we can create one overflow file for
each slice. That will leave us with the ~1x of additional writes when
distributing data into batches in the current slice, but it eliminates
the main source of write amplification - avalanche-like forwarding of
data between slices.

This relaxes the memory limit a bit again, because we can't really keep
the number of overflow files constrained by work_mem, but we should only
need a few of them (far fewer than when adding one file per batch right
away). For example with 128 in-memory batches, this reduces the amount
of necessary memory 128x.

And this is what v4 (per-slice overflow file) does, pretty much.


Two more comments, regarding memory accounting in previous patches. It
was a bit broken, because we actually need 2x the number of BufFiles. We
needed nbatch files for outer side and nbatch files for inner side, but
we only considered one of those - both when deciding when to increase
the number of batches / increase spaceAllowed, and when reporting the
memory usage. So with large number of batches the reported amount of
used memory was roughly 1/2 of the actual value :-/

The memory accounting was a bit bogus for another reason - spaceUsed
simply tracks the amount of memory used by the hash table contents. But
at the end we simply added the current BufFile overhead to it, ignoring
that it's likely much larger than it was when the spacePeak value got
stored. For example, we might have captured spacePeak early, when
spaceUsed was almost work_mem, and then added the final large BufFile
allocation on top of it.

I've fixed both issues in the patches attached to this message. It does
not make a huge difference in practice, but it makes it easier to
compare values between patches.


Now, some test results - I've repeated the simple test with uniform data
set, which is pretty much ideal for hash joins (no unexpectedly large
batches that can't be split, etc.). I've done this with 1M, 5M, 10M, 25M
and 50M rows in the large table (which gets picked for the "hash" side),
and measured how much memory gets used, how many batches, how long it
takes and how much data gets written to temp files.

See the hashjoin-test.sh script for more details.

So, here are the results with work_mem = 4MB (so the number of in-memory
batches for the last two entries is 128). The columns are:

* nbatch - the final number of batches
* memory - memory usage, as reported by explain analyze
* time - duration of the query (without explain analyze) in seconds
* size - size of the large table
* temp - amount of data written to temp files
* amplif - write amplification (temp / size)


 1M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master             256    7681    3.3        730        899    1.23
 rebalance          256    7711    3.3        730        884    1.21
 single file       1024    4161    7.2        730       3168    4.34
 per-slice file    1024    4161    4.7        730       1653    2.26


 5M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master            2048   36353     22       3652       5276    1.44
 rebalance          512   16515     18       3652       4169    1.14
 single file       4096    4353    156       3652      53897   14.76
 per-slice file    4096    4353     28       3652       8106    2.21


 10M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master            4096   69121     61       7303      10556    1.45
 rebalance          512   24326     46       7303       7405    1.01
 single file       8192    4636    762       7303     211234   28.92
 per-slice file    8192    4636     65       7303      16278    2.23


 25M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master            8192  134657    190       7303      24279    1.33
 rebalance         1024   36611    158       7303      20024    1.10
 single file      16384    6011   4054       7303    1046174   57.32
 per-slice file   16384    6011    207       7303      39073    2.14


 50M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master           16384  265729    531      36500      48519   1.33
 rebalance         2048   53241    447      36500      48077   1.32
 single file          -       -      -      36500          -      -
 per-slice file   32768    8125    451      36500      78662   2.16


From those numbers it's pretty clear that per-slice overflow file does
by far the best job in enforcing work_mem and minimizing the amount of
data spilled to temp files. It does write a bit more data than both
master and the simple rebalancing, but that's the cost for enforcing
work_mem more strictly. It's generally a bit slower than those two
approaches, although on the largest scale it's actually a bit faster
than master. I think that's pretty acceptable, considering this is meant
to address extreme underestimates where we currently just eat memory.

The case with a single overflow file performs rather poorly - I haven't
even collected data for the largest scale, but considering it spilled
1TB of temp files with a dataset half that size, not much is lost.
(Note that this does not mean it needs 1TB of temp space, those writes
are spread over time and the files are created/closed as we go. The
system only has ~100GB of free disk space.)


Gunther, could you try the v2 and v4 patches on your data set? That
would be an interesting data point, I think.


regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4d5a6872cc 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *             ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
        if (hashtable->nbuckets != hashtable->nbuckets_optimal)
                ExecHashIncreaseNumBuckets(hashtable);
 
-       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-       hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
 
        hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -1647,12 +1646,56 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
                /* Account for space used, and back off if we've used too much */
                hashtable->spaceUsed += hashTupleSize;
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
+
+               /*
+                * Consider increasing number of batches.
+                *
+                * Each batch requires a non-trivial amount of memory, because BufFile
+                * includes a PGAlignedBlock (typically 8kB buffer). So when doubling
+                * the number of batches, we need to be careful and only allow that if
+                * it actually has a chance of reducing memory usage.
+                *
+                * In particular, doubling the number of batches is pointless when
+                *
+                *              (spaceUsed / 2) < (nbatches * sizeof(BufFile))
+                *
+                * because we expect to save roughly 1/2 of memory currently used for
+                * data (rows) at the price of doubling the memory used for BufFile.
+                *
+                * We can't stop adding batches entirely, because that would just mean
+                * the batches would need more and more memory. So we need to increase
+                * the number of batches, even if we can't enforce work_mem properly.
+                * The goal is to minimize the overall memory usage of the hash join.
+                *
+                * Note: This applies mostly to cases of significant underestimates,
+                * resulting in an explosion of the number of batches. The properly
+                * estimated cases should generally end up using merge join based on
+                * high cost of the batched hash join.
+                */
                if (hashtable->spaceUsed +
-                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+                       hashtable->nbatch * sizeof(PGAlignedBlock) * 2
                        > hashtable->spaceAllowed)
+               {
                        ExecHashIncreaseNumBatches(hashtable);
+
+                       /*
+                        * Consider increasing the resize threshold.
+                        *
+                        * For well estimated cases this does nothing, because batches are
+                        * expected to account only for a small fraction of work_mem. But if
+                        * we significantly underestimate the number of batches, we may end
+                        * up in a situation where BufFiles alone exceed work_mem. So move
+                        * the threshold a bit, until the next point where it'll make sense
+                        * to consider adding batches again.
+                        */
+                       hashtable->spaceAllowed
+                               = Max(hashtable->spaceAllowed,
+                                         hashtable->nbatch * sizeof(PGAlignedBlock) * 3);
+               }
        }
        else
        {
@@ -1893,6 +1936,21 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
        }
 }
 
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+       Size    spaceUsed = hashtable->spaceUsed;
+
+       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+       spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+       /* Account for memory used for batch files (inner + outer) */
+       spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+       if (spaceUsed > hashtable->spacePeak)
+               hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *             scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2330,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        + mcvsToUse * sizeof(int);
                hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
                        + mcvsToUse * sizeof(int);
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
 
                /*
                 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2381,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        hashtable->nSkewBuckets++;
                        hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
                        hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-                       if (hashtable->spaceUsed > hashtable->spacePeak)
-                               hashtable->spacePeak = hashtable->spaceUsed;
+
+                       /* refresh info about peak used memory */
+                       ExecHashUpdateSpacePeak(hashtable);
                }
 
                free_attstatsslot(&sslot);
@@ -2411,8 +2471,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        hashtable->spaceUsedSkew += hashTupleSize;
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
+
        while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
                ExecHashRemoveNextSkewBucket(hashtable);
 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                                                   
hinstrument.nbatch, es);
                        ExplainPropertyInteger("Original Hash Batches", NULL,
                                                                   
hinstrument.nbatch_original, es);
+                       ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+                                                                  
hinstrument.nbatch_original, es);
                        ExplainPropertyInteger("Peak Memory Usage", "kB",
                                                                   spacePeakKb, 
es);
                }
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                 hinstrument.nbuckets_original != hinstrument.nbuckets)
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
-                       appendStringInfo(es->str,
-                                                        "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                                                        hinstrument.nbuckets,
-                                                        hinstrument.nbuckets_original,
-                                                        hinstrument.nbatch,
-                                                        hinstrument.nbatch_original,
-                                                        spacePeakKb);
+                       if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets,
+                                                                hinstrument.nbuckets_original,
+                                                                hinstrument.nbatch,
+                                                                hinstrument.nbatch_original,
+                                                                hinstrument.nbatch_inmemory,
+                                                                spacePeakKb);
+                       else
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets,
+                                                                hinstrument.nbuckets_original,
+                                                                hinstrument.nbatch,
+                                                                hinstrument.nbatch_original,
+                                                                spacePeakKb);
                }
                else
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
-                       appendStringInfo(es->str,
-                                                        "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-                                                        hinstrument.nbuckets, hinstrument.nbatch,
-                                                        spacePeakKb);
+                       if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d  Batches: %d (in-memory: %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets, hinstrument.nbatch,
+                                                                hinstrument.nbatch_inmemory,
+                                                                spacePeakKb);
+                       else
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets, hinstrument.nbatch,
+                                                                spacePeakKb);
                }
        }
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..044d360fd4 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *             ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
        if (hashtable->nbuckets != hashtable->nbuckets_optimal)
                ExecHashIncreaseNumBuckets(hashtable);
 
-       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-       hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
 
        hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        size_t          space_allowed;
        int                     nbuckets;
        int                     nbatch;
+       int                     nbatch_inmemory;
        double          rows;
        int                     num_skew_mcvs;
        int                     log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
                                                        state->parallel_state != NULL ?
                                                        state->parallel_state->nparticipants - 1 : 0,
                                                        &space_allowed,
-                                                       &nbuckets, &nbatch, &num_skew_mcvs);
+                                                       &nbuckets, &nbatch, &nbatch_inmemory,
+                                                       &num_skew_mcvs);
 
        /* nbuckets must be a power of 2 */
        log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        hashtable->nSkewBuckets = 0;
        hashtable->skewBucketNums = NULL;
        hashtable->nbatch = nbatch;
+       hashtable->nbatch_inmemory = nbatch_inmemory;
        hashtable->curbatch = 0;
        hashtable->nbatch_original = nbatch;
        hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        hashtable->skewTuples = 0;
        hashtable->innerBatchFile = NULL;
        hashtable->outerBatchFile = NULL;
+       hashtable->innerOverflowFile = NULL;
+       hashtable->outerOverflowFile = NULL;
        hashtable->spaceUsed = 0;
        hashtable->spacePeak = 0;
        hashtable->spaceAllowed = space_allowed;
@@ -559,14 +563,16 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 
        if (nbatch > 1 && hashtable->parallel_state == NULL)
        {
+               int     cnt = Min(nbatch, nbatch_inmemory);
+
                /*
                 * allocate and initialize the file arrays in hashCxt (not needed for
                 * parallel case which uses shared tuplestores instead of raw files)
                 */
                hashtable->innerBatchFile = (BufFile **)
-                       palloc0(nbatch * sizeof(BufFile *));
+                       palloc0(cnt * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
-                       palloc0(nbatch * sizeof(BufFile *));
+                       palloc0(cnt * sizeof(BufFile *));
                /* The files will not be opened until needed... */
                /* ... but make sure we have temp tablespaces established for them */
                PrepareTempTablespaces();
@@ -665,6 +671,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                size_t *space_allowed,
                                                int *numbuckets,
                                                int *numbatches,
+                                               int *numbatches_inmemory,
                                                int *num_skew_mcvs)
 {
        int                     tupsize;
@@ -675,6 +682,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
        long            max_pointers;
        long            mppow2;
        int                     nbatch = 1;
+       int                     nbatch_inmemory = 1;
        int                     nbuckets;
        double          dbuckets;
 
@@ -795,6 +803,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                                        space_allowed,
                                                                        numbuckets,
                                                                        numbatches,
+                                                                       numbatches_inmemory,
                                                                        num_skew_mcvs);
                        return;
                }
@@ -831,11 +840,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                        nbatch <<= 1;
        }
 
+       /*
+        * See how many batches we can fit into memory (driven mostly by size
+        * of BufFile, with PGAlignedBlock being the largest part of that).
+        * We need one BufFile for inner and outer side, so we count it twice
+        * for each batch, and we stop once we exceed (work_mem/2).
+        */
+       while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+                       <= (work_mem * 1024L / 2))
+               nbatch_inmemory *= 2;
+
        Assert(nbuckets > 0);
        Assert(nbatch > 0);
 
        *numbuckets = nbuckets;
        *numbatches = nbatch;
+       *numbatches_inmemory = nbatch_inmemory;
 }
 
 
@@ -857,13 +877,21 @@ ExecHashTableDestroy(HashJoinTable hashtable)
         */
        if (hashtable->innerBatchFile != NULL)
        {
-               for (i = 1; i < hashtable->nbatch; i++)
+               int nbatch = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+               for (i = 1; i < nbatch; i++)
                {
                        if (hashtable->innerBatchFile[i])
                                BufFileClose(hashtable->innerBatchFile[i]);
                        if (hashtable->outerBatchFile[i])
                                BufFileClose(hashtable->outerBatchFile[i]);
                }
+
+               if (hashtable->innerOverflowFile)
+                       BufFileClose(hashtable->innerOverflowFile);
+
+               if (hashtable->outerOverflowFile)
+                       BufFileClose(hashtable->outerOverflowFile);
        }
 
        /* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +937,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
        if (hashtable->innerBatchFile == NULL)
        {
+               /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
                /* we had no file arrays before */
                hashtable->innerBatchFile = (BufFile **)
                        palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +949,23 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
        }
        else
        {
+               int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
                /* enlarge arrays and zero out added entries */
                hashtable->innerBatchFile = (BufFile **)
-                       repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
+                       repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
-                       repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
-               MemSet(hashtable->innerBatchFile + oldnbatch, 0,
-                          (nbatch - oldnbatch) * sizeof(BufFile *));
-               MemSet(hashtable->outerBatchFile + oldnbatch, 0,
-                          (nbatch - oldnbatch) * sizeof(BufFile *));
+                       repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *));
+
+               if (oldnbatch < nbatch_tmp)
+               {
+                       MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+                                  (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+                       MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+                                  (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+               }
+
+               /* no need to initialize the overflow files explicitly */
        }
 
        MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1037,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
                        }
                        else
                        {
+                               BufFile **batchFile;
+
                                /* dump it out */
                                Assert(batchno > curbatch);
+
+                               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                                hashtable->innerBatchFile,
+                                                                                                &hashtable->innerOverflowFile);
+
                                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                                          hashTuple->hashvalue,
-                                                                         &hashtable->innerBatchFile[batchno]);
+                                                                         batchFile);
 
                                hashtable->spaceUsed -= hashTupleSize;
                                nfreed++;
@@ -1647,22 +1692,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
                /* Account for space used, and back off if we've used too much */
                hashtable->spaceUsed += hashTupleSize;
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
+
+               /* Consider increasing number of batches if we filled work_mem. */
                if (hashtable->spaceUsed +
-                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+                       Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */
                        > hashtable->spaceAllowed)
                        ExecHashIncreaseNumBatches(hashtable);
        }
        else
        {
+               BufFile **batchFile;
+
                /*
                 * put the tuple into a temp file for later batches
                 */
                Assert(batchno > hashtable->curbatch);
+
+               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                hashtable->innerBatchFile,
+                                                                                &hashtable->innerOverflowFile);
+
                ExecHashJoinSaveTuple(tuple,
                                                          hashvalue,
-                                                         &hashtable->innerBatchFile[batchno]);
+                                                         batchFile);
        }
 }
 
@@ -1893,6 +1949,100 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
        }
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+       int     slice,
+               curslice;
+
+       if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+               return batchno;
+
+       slice = batchno / hashtable->nbatch_inmemory;
+       curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+       /* slices can't go backwards */
+       Assert(slice >= curslice);
+
+       /* overflow slice */
+       if (slice > curslice)
+               return -1;
+
+       /* current slice, compute index in the current array */
+       return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                                        BufFile **batchFiles, BufFile **overflowFile)
+{
+       int             idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+       if (idx == -1)
+               return overflowFile;
+
+       return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+       memset(hashtable->innerBatchFile, 0,
+                  hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+       hashtable->innerBatchFile[0] = hashtable->innerOverflowFile;
+       hashtable->innerOverflowFile = NULL;
+
+       memset(hashtable->outerBatchFile, 0,
+                  hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+       hashtable->outerBatchFile[0] = hashtable->outerOverflowFile;
+       hashtable->outerOverflowFile = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+       int             batchidx;
+
+       hashtable->curbatch++;
+
+       /* see if we skipped to the next batch slice */
+       batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+       /* Can't be -1, current batch is in the current slice by definition. */
+       Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+       /*
+        * If we skipped to the next slice of batches, reset the array of files
+        * and use the overflow file as the first batch.
+        */
+       if (batchidx == 0)
+               ExecHashSwitchToNextBatchSlice(hashtable);
+
+       return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+       Size    spaceUsed = hashtable->spaceUsed;
+
+       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+       spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+       /* Account for memory used for batch files (inner + outer) */
+       spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+                                sizeof(PGAlignedBlock) * 2;
+
+       /* Account for slice files (inner + outer) */
+       spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+                                sizeof(PGAlignedBlock) * 2;
+
+       if (spaceUsed > hashtable->spacePeak)
+               hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *             scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2422,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        + mcvsToUse * sizeof(int);
                hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
                        + mcvsToUse * sizeof(int);
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
 
                /*
                 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2473,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        hashtable->nSkewBuckets++;
                        hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
                        hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-                       if (hashtable->spaceUsed > hashtable->spacePeak)
-                               hashtable->spacePeak = hashtable->spaceUsed;
+
+                       /* refresh info about peak used memory */
+                       ExecHashUpdateSpacePeak(hashtable);
                }
 
                free_attstatsslot(&sslot);
@@ -2411,8 +2563,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        hashtable->spaceUsedSkew += hashTupleSize;
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
+
        while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
                ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2488,10 +2642,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
                }
                else
                {
+                       BufFile **batchFile;
+
                        /* Put the tuple into a temp file for later batches */
                        Assert(batchno > hashtable->curbatch);
+
+                       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                        hashtable->innerBatchFile,
+                                                                                        &hashtable->innerOverflowFile);
+
                        ExecHashJoinSaveTuple(tuple, hashvalue,
-                                                                 &hashtable->innerBatchFile[batchno]);
+                                                                 batchFile);
                        pfree(hashTuple);
                        hashtable->spaceUsed -= tupleSize;
                        hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2801,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
        instrument->nbuckets_original = hashtable->nbuckets_original;
        instrument->nbatch = hashtable->nbatch;
        instrument->nbatch_original = hashtable->nbatch_original;
+       instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
        instrument->space_peak = hashtable->spacePeak;
 }
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..e59d4d8003 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                if (batchno != hashtable->curbatch &&
                                        node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                                {
+                                       BufFile   **batchFile;
+
                                        /*
                                         * Need to postpone this outer tuple to a later batch.
                                         * Save it in the corresponding outer-batch file.
                                         */
                                        Assert(parallel_state == NULL);
                                        Assert(batchno > hashtable->curbatch);
+
+                                       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                                        hashtable->outerBatchFile,
+                                                                                                        &hashtable->outerOverflowFile);
+
                                        ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
                                                                                  hashvalue,
-                                                                                 &hashtable->outerBatchFile[batchno]);
+                                                                                 batchFile);
 
                                        /* Loop around, staying in HJ_NEED_NEW_OUTER state */
                                        continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
        }
        else if (curbatch < hashtable->nbatch)
        {
-               BufFile    *file = hashtable->outerBatchFile[curbatch];
+               BufFile    **file = ExecHashGetBatchFile(hashtable, curbatch,
+                                                                                                hashtable->outerBatchFile,
+                                                                                                &hashtable->outerOverflowFile);
 
                /*
                 * In outer-join cases, we could get here even though the batch file
                 * is empty.
                 */
-               if (file == NULL)
+               if (*file == NULL)
                        return NULL;
 
                slot = ExecHashJoinGetSavedTuple(hjstate,
-                                                                                file,
+                                                                                *file,
                                                                                 hashvalue,
                                                                                 hjstate->hj_OuterTupleSlot);
                if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
        BufFile    *innerFile;
        TupleTableSlot *slot;
        uint32          hashvalue;
+       int                     batchidx;
+       int                     curbatch_old;
 
        nbatch = hashtable->nbatch;
        curbatch = hashtable->curbatch;
+       curbatch_old = curbatch;
+
+       /* index of the old batch */
+       batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+       /* has to be in the current slice of batches */
+       Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
        if (curbatch > 0)
        {
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
                 * We no longer need the previous outer batch file; close it right
                 * away to free disk space.
                 */
-               if (hashtable->outerBatchFile[curbatch])
-                       BufFileClose(hashtable->outerBatchFile[curbatch]);
-               hashtable->outerBatchFile[curbatch] = NULL;
+               if (hashtable->outerBatchFile[batchidx])
+                       BufFileClose(hashtable->outerBatchFile[batchidx]);
+               hashtable->outerBatchFile[batchidx] = NULL;
        }
        else                                            /* we just finished the first batch */
        {
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
         * scan, we have to rescan outer batches in case they contain tuples that
         * need to be reassigned.
         */
-       curbatch++;
+       curbatch = ExecHashSwitchToNextBatch(hashtable);
+       batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
        while (curbatch < nbatch &&
-                  (hashtable->outerBatchFile[curbatch] == NULL ||
-                       hashtable->innerBatchFile[curbatch] == NULL))
+                  (hashtable->outerBatchFile[batchidx] == NULL ||
+                       hashtable->innerBatchFile[batchidx] == NULL))
        {
-               if (hashtable->outerBatchFile[curbatch] &&
+               if (hashtable->outerBatchFile[batchidx] &&
                        HJ_FILL_OUTER(hjstate))
                        break;                          /* must process due to rule 1 */
-               if (hashtable->innerBatchFile[curbatch] &&
+               if (hashtable->innerBatchFile[batchidx] &&
                        HJ_FILL_INNER(hjstate))
                        break;                          /* must process due to rule 1 */
-               if (hashtable->innerBatchFile[curbatch] &&
+               if (hashtable->innerBatchFile[batchidx] &&
                        nbatch != hashtable->nbatch_original)
                        break;                          /* must process due to rule 2 */
-               if (hashtable->outerBatchFile[curbatch] &&
+               if (hashtable->outerBatchFile[batchidx] &&
                        nbatch != hashtable->nbatch_outstart)
                        break;                          /* must process due to rule 3 */
                /* We can ignore this batch. */
                /* Release associated temp files right away. */
-               if (hashtable->innerBatchFile[curbatch])
-                       BufFileClose(hashtable->innerBatchFile[curbatch]);
-               hashtable->innerBatchFile[curbatch] = NULL;
-               if (hashtable->outerBatchFile[curbatch])
-                       BufFileClose(hashtable->outerBatchFile[curbatch]);
-               hashtable->outerBatchFile[curbatch] = NULL;
-               curbatch++;
+               if (hashtable->innerBatchFile[batchidx])
+                       BufFileClose(hashtable->innerBatchFile[batchidx]);
+               hashtable->innerBatchFile[batchidx] = NULL;
+               if (hashtable->outerBatchFile[batchidx])
+                       BufFileClose(hashtable->outerBatchFile[batchidx]);
+               hashtable->outerBatchFile[batchidx] = NULL;
+
+               curbatch = ExecHashSwitchToNextBatch(hashtable);
+               batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
        }
 
        if (curbatch >= nbatch)
+       {
+               hashtable->curbatch = curbatch_old;
                return false;                   /* no more batches */
-
-       hashtable->curbatch = curbatch;
+       }
 
        /*
         * Reload the hash table with the new inner batch (which could be empty)
         */
        ExecHashTableReset(hashtable);
 
-       innerFile = hashtable->innerBatchFile[curbatch];
+       innerFile = hashtable->innerBatchFile[batchidx];
 
        if (innerFile != NULL)
        {
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
                 * needed
                 */
                BufFileClose(innerFile);
-               hashtable->innerBatchFile[curbatch] = NULL;
+               hashtable->innerBatchFile[batchidx] = NULL;
        }
 
        /*
         * Rewind outer batch file (if present), so that we can start reading it.
         */
-       if (hashtable->outerBatchFile[curbatch] != NULL)
+       if (hashtable->outerBatchFile[batchidx] != NULL)
        {
-               if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+               if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET))
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not rewind hash-join temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
        int                     num_hashclauses = list_length(hashclauses);
        int                     numbuckets;
        int                     numbatches;
+       int                     numbatches_inmemory;
        int                     num_skew_mcvs;
        size_t          space_allowed;  /* unused */
 
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
                                                        &space_allowed,
                                                        &numbuckets,
                                                        &numbatches,
+                                                       &numbatches_inmemory,
                                                        &num_skew_mcvs);
 
        /*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..ef60df5024 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
        int                *skewBucketNums; /* array indexes of active skew buckets */
 
        int                     nbatch;                 /* number of batches */
+       int                     nbatch_inmemory;        /* max number of in-memory batches */
        int                     curbatch;               /* current batch #; 0 during 1st pass */

        int                     nbatch_original;        /* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
        BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
        BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+       BufFile    *innerOverflowFile;  /* temp file for overflow batches */
+       BufFile    *outerOverflowFile;  /* temp file for overflow batches */
+
        /*
         * Info about the datatype-specific hash functions for the datatypes being
         * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..78389bc0cf 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                                                  uint32 hashvalue,
                                                  int *bucketno,
                                                  int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                                        BufFile **files, BufFile **overflow);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                size_t *space_allowed,
                                                int *numbuckets,
                                                int *numbatches,
+                                               int *numbatches_inmemory,
                                                int *num_skew_mcvs);
extern int     ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
        int                     nbuckets_original;      /* planned number of buckets */
        int                     nbatch;                 /* number of batches at end of execution */
        int                     nbatch_original;        /* planned number of batches */
+       int                     nbatch_inmemory;        /* number of batches kept in memory */
        size_t          space_peak;             /* peak memory usage in bytes */
 } HashInstrumentation;
 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                                                   
hinstrument.nbatch, es);
                        ExplainPropertyInteger("Original Hash Batches", NULL,
                                                                   
hinstrument.nbatch_original, es);
+                       ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+                                                                  
hinstrument.nbatch_original, es);
                        ExplainPropertyInteger("Peak Memory Usage", "kB",
                                                                   spacePeakKb, 
es);
                }
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                 hinstrument.nbuckets_original != hinstrument.nbuckets)
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
-                       appendStringInfo(es->str,
-                                                        "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                                                        hinstrument.nbuckets,
-                                                        hinstrument.nbuckets_original,
-                                                        hinstrument.nbatch,
-                                                        hinstrument.nbatch_original,
-                                                        spacePeakKb);
+                       if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets,
+                                                                hinstrument.nbuckets_original,
+                                                                hinstrument.nbatch,
+                                                                hinstrument.nbatch_original,
+                                                                hinstrument.nbatch_inmemory,
+                                                                spacePeakKb);
+                       else
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets,
+                                                                hinstrument.nbuckets_original,
+                                                                hinstrument.nbatch,
+                                                                hinstrument.nbatch_original,
+                                                                spacePeakKb);
                }
                else
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
-                       appendStringInfo(es->str,
-                                                        "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-                                                        hinstrument.nbuckets, hinstrument.nbatch,
-                                                        spacePeakKb);
+                       if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d  Batches: %d (in-memory: %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets, hinstrument.nbatch,
+                                                                hinstrument.nbatch_inmemory,
+                                                                spacePeakKb);
+                       else
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets, hinstrument.nbatch,
+                                                                spacePeakKb);
                }
        }
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4364eb7cdd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *             ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
        if (hashtable->nbuckets != hashtable->nbuckets_optimal)
                ExecHashIncreaseNumBuckets(hashtable);
 
-       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-       hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
 
        hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        size_t          space_allowed;
        int                     nbuckets;
        int                     nbatch;
+       int                     nbatch_inmemory;
        double          rows;
        int                     num_skew_mcvs;
        int                     log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
                                                        state->parallel_state != NULL ?
                                                        state->parallel_state->nparticipants - 1 : 0,
                                                        &space_allowed,
-                                                       &nbuckets, &nbatch, &num_skew_mcvs);
+                                                       &nbuckets, &nbatch, &nbatch_inmemory,
+                                                       &num_skew_mcvs);
 
        /* nbuckets must be a power of 2 */
        log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        hashtable->nSkewBuckets = 0;
        hashtable->skewBucketNums = NULL;
        hashtable->nbatch = nbatch;
+       hashtable->nbatch_inmemory = nbatch_inmemory;
        hashtable->curbatch = 0;
        hashtable->nbatch_original = nbatch;
        hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        hashtable->skewTuples = 0;
        hashtable->innerBatchFile = NULL;
        hashtable->outerBatchFile = NULL;
+       hashtable->innerOverflowFile = NULL;
+       hashtable->outerOverflowFile = NULL;
        hashtable->spaceUsed = 0;
        hashtable->spacePeak = 0;
        hashtable->spaceAllowed = space_allowed;
@@ -559,16 +563,30 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 
        if (nbatch > 1 && hashtable->parallel_state == NULL)
        {
+               int     cnt = Min(nbatch, nbatch_inmemory);
+
                /*
                 * allocate and initialize the file arrays in hashCxt (not needed for
                 * parallel case which uses shared tuplestores instead of raw files)
                 */
                hashtable->innerBatchFile = (BufFile **)
-                       palloc0(nbatch * sizeof(BufFile *));
+                       palloc0(cnt * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
-                       palloc0(nbatch * sizeof(BufFile *));
+                       palloc0(cnt * sizeof(BufFile *));
                /* The files will not be opened until needed... */
                /* ... but make sure we have temp tablespaces established for them */
+
+               /* the overflow files (one per side) are opened lazily when needed */
+
                PrepareTempTablespaces();
        }
 
@@ -665,6 +683,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                size_t *space_allowed,
                                                int *numbuckets,
                                                int *numbatches,
+                                               int *numbatches_inmemory,
                                                int *num_skew_mcvs)
 {
        int                     tupsize;
@@ -675,6 +694,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
        long            max_pointers;
        long            mppow2;
        int                     nbatch = 1;
+       int                     nbatch_inmemory = 1;
        int                     nbuckets;
        double          dbuckets;
 
@@ -795,6 +815,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                                        space_allowed,
                                                                        numbuckets,
                                                                        numbatches,
+                                                                       numbatches_inmemory,
                                                                        num_skew_mcvs);
                        return;
                }
@@ -831,11 +852,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                        nbatch <<= 1;
        }
 
+       /*
+        * See how many batches we can fit into memory (driven mostly by size
+        * of BufFile, with PGAlignedBlock being the largest part of that).
+        * We need one BufFile for the inner side and one for the outer side, so
+        * we count two per batch, and we stop once we would exceed (work_mem/2).
+        */
+       while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+                       <= (work_mem * 1024L / 2))
+               nbatch_inmemory *= 2;
+
        Assert(nbuckets > 0);
        Assert(nbatch > 0);
 
        *numbuckets = nbuckets;
        *numbatches = nbatch;
+       *numbatches_inmemory = nbatch_inmemory;
 }
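For illustration, the sizing loop above can be pulled out into a standalone sketch (not part of the patch; `BLOCK_SIZE` stands in for `sizeof(PGAlignedBlock)`, i.e. 8kB, and `work_mem_kb` is in kB as the GUC is):

```c
#include <assert.h>

#define BLOCK_SIZE 8192L                /* stand-in for sizeof(PGAlignedBlock) */

/*
 * Double nbatch_inmemory while the BufFile buffers for the next doubling
 * (times 2 for the inner and outer side) still fit into work_mem/2.
 */
static int
choose_nbatch_inmemory(long work_mem_kb)
{
	int		nbatch_inmemory = 1;

	while ((nbatch_inmemory * 2) * BLOCK_SIZE * 2 <= (work_mem_kb * 1024L / 2))
		nbatch_inmemory *= 2;

	return nbatch_inmemory;
}
```

With work_mem = 4MB this yields 128 in-memory batches, matching the arithmetic above: 128 batches times two BufFiles times 8kB is exactly 2MB, i.e. work_mem/2.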
 
 
@@ -857,13 +889,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
         */
        if (hashtable->innerBatchFile != NULL)
        {
-               for (i = 1; i < hashtable->nbatch; i++)
+               int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+               for (i = 1; i < n; i++)
                {
                        if (hashtable->innerBatchFile[i])
                                BufFileClose(hashtable->innerBatchFile[i]);
                        if (hashtable->outerBatchFile[i])
                                BufFileClose(hashtable->outerBatchFile[i]);
                }
+
+               /* number of batch slices */
+               n = hashtable->nbatch / hashtable->nbatch_inmemory;
+
+               for (i = 1; i < n; i++)
+               {
+                       if (hashtable->innerOverflowFiles[i])
+                               BufFileClose(hashtable->innerOverflowFiles[i]);
+
+                       if (hashtable->outerOverflowFiles[i])
+                               BufFileClose(hashtable->outerOverflowFiles[i]);
+               }
        }
 
        /* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +955,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
        if (hashtable->innerBatchFile == NULL)
        {
+               /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
                /* we had no file arrays before */
                hashtable->innerBatchFile = (BufFile **)
                        palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +967,50 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
        }
        else
        {
+               int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
                /* enlarge arrays and zero out added entries */
                hashtable->innerBatchFile = (BufFile **)
-                       repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
+                       repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
-                       repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
-               MemSet(hashtable->innerBatchFile + oldnbatch, 0,
-                          (nbatch - oldnbatch) * sizeof(BufFile *));
-               MemSet(hashtable->outerBatchFile + oldnbatch, 0,
-                          (nbatch - oldnbatch) * sizeof(BufFile *));
+                       repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *));
+
+               if (oldnbatch < nbatch_tmp)
+               {
+                       MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+                                  (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+                       MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+                                  (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+               }
+
+               if (nbatch > hashtable->nbatch_inmemory)
+               {
+                       int nslices = (nbatch / hashtable->nbatch_inmemory);
+
+                       if (hashtable->innerOverflowFiles == NULL)
+                       {
+                               hashtable->innerOverflowFiles = (BufFile **)
+                                       palloc0(nslices * sizeof(BufFile *));
+                               hashtable->outerOverflowFiles = (BufFile **)
+                                       palloc0(nslices * sizeof(BufFile *));
+                       }
+                       else
+                       {
+                               hashtable->innerOverflowFiles = (BufFile **)
+                                       repalloc(hashtable->innerOverflowFiles,
+                                                        nslices * sizeof(BufFile *));
+                               hashtable->outerOverflowFiles = (BufFile **)
+                                       repalloc(hashtable->outerOverflowFiles,
+                                                        nslices * sizeof(BufFile *));
+
+                               /*
+                                * We doubled the number of batches, so the old
+                                * number of slices was exactly nslices/2.
+                                */
+                               memset(hashtable->innerOverflowFiles + nslices/2, 0,
+                                          (nslices/2) * sizeof(BufFile *));
+                               memset(hashtable->outerOverflowFiles + nslices/2, 0,
+                                          (nslices/2) * sizeof(BufFile *));
+                       }
+               }
        }
 
        MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1082,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
                        }
                        else
                        {
+                               BufFile **batchFile;
+
                                /* dump it out */
                                Assert(batchno > curbatch);
+
+                               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                hashtable->innerBatchFile,
+                                                                                hashtable->innerOverflowFiles);
+
                                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                                          hashTuple->hashvalue,
-                                                                         &hashtable->innerBatchFile[batchno]);
+                                                                         batchFile);
 
                                hashtable->spaceUsed -= hashTupleSize;
                                nfreed++;
@@ -1647,22 +1737,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
                /* Account for space used, and back off if we've used too much */
                hashtable->spaceUsed += hashTupleSize;
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
+
+               /* Consider increasing number of batches if we filled work_mem. */
                if (hashtable->spaceUsed +
-                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+                       Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */
                        > hashtable->spaceAllowed)
                        ExecHashIncreaseNumBatches(hashtable);
        }
        else
        {
+               BufFile **batchFile;
+
                /*
                 * put the tuple into a temp file for later batches
                 */
                Assert(batchno > hashtable->curbatch);
+
+               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                hashtable->innerBatchFile,
+                                                                                hashtable->innerOverflowFiles);
+
                ExecHashJoinSaveTuple(tuple,
                                                          hashvalue,
-                                                         &hashtable->innerBatchFile[batchno]);
+                                                         batchFile);
        }
 }
 
@@ -1893,6 +1994,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
        }
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+       int     slice,
+               curslice;
+
+       if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+               return batchno;
+
+       slice = batchno / hashtable->nbatch_inmemory;
+       curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+       /* slices can't go backwards */
+       Assert(slice >= curslice);
+
+       /* overflow slice */
+       if (slice > curslice)
+               return -1;
+
+       /* current slice, compute index in the current array */
+       return (batchno % hashtable->nbatch_inmemory);
+}
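The slice arithmetic in ExecHashGetBatchIndex can be sanity-checked with a standalone sketch (hypothetical helper name, not part of the patch): with 1024 batches and 128 kept in memory there are 8 slices, and any batch beyond the current slice maps to -1, i.e. that slice's overflow file.

```c
#include <assert.h>

/*
 * Standalone sketch of ExecHashGetBatchIndex: map a global batch number to
 * a slot in the in-memory batch array, or -1 when the batch belongs to a
 * later slice and therefore goes to that slice's overflow file.
 */
static int
batch_index(int nbatch, int nbatch_inmemory, int curbatch, int batchno)
{
	int		slice,
			curslice;

	/* everything fits in memory, batch numbers map directly */
	if (nbatch <= nbatch_inmemory)
		return batchno;

	slice = batchno / nbatch_inmemory;
	curslice = curbatch / nbatch_inmemory;

	/* batches in a later slice share that slice's overflow file */
	if (slice > curslice)
		return -1;

	/* current slice: index into the in-memory array */
	return batchno % nbatch_inmemory;
}
```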
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                                        BufFile **batchFiles, BufFile **overflowFiles)
+{
+       int             idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+       /* get the right overflow file */
+       if (idx == -1)
+       {
+               int slice = (batchno / hashtable->nbatch_inmemory);
+
+               return &overflowFiles[slice];
+       }
+
+       /* batch file in the current slice */
+       return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+       int     slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+       memset(hashtable->innerBatchFile, 0,
+                  hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+       hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+       hashtable->innerOverflowFiles[slice] = NULL;
+
+       memset(hashtable->outerBatchFile, 0,
+                  hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+       hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+       hashtable->outerOverflowFiles[slice] = NULL;
+}
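The handoff in ExecHashSwitchToNextBatchSlice can be modelled in isolation: entering a new slice resets the in-memory batch array and installs the slice's overflow file as batch 0, which is then redistributed. This toy version (my own sketch, using int handles where the patch uses BufFile pointers) shows the move:

```c
#include <assert.h>
#include <string.h>

#define NBATCH_INMEMORY 4               /* toy value; a power of two in the patch */

/*
 * Toy model of ExecHashSwitchToNextBatchSlice: clear the in-memory batch
 * slots and make the slice's overflow "file" the first batch of the new
 * slice, detaching it from the overflow array.
 */
static void
switch_slice(int *batchFiles, int *overflowFiles, int curbatch)
{
	int		slice = curbatch / NBATCH_INMEMORY;

	memset(batchFiles, 0, NBATCH_INMEMORY * sizeof(int));
	batchFiles[0] = overflowFiles[slice];
	overflowFiles[slice] = 0;
}
```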
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+       int             batchidx;
+
+       hashtable->curbatch++;
+
+       /* see if we skipped to the next batch slice */
+       batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+       /* Can't be -1, current batch is in the current slice by definition. */
+       Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+       /*
+        * If we skipped to the next slice of batches, reset the array of files
+        * and use the overflow file as the first batch.
+        */
+       if (batchidx == 0)
+               ExecHashSwitchToNextBatchSlice(hashtable);
+
+       return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+       Size    spaceUsed = hashtable->spaceUsed;
+
+       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+       spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+       /* Account for memory used for batch files (inner + outer) */
+       spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+                                sizeof(PGAlignedBlock) * 2;
+
+       /* Account for slice files (inner + outer) */
+       spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+                                sizeof(PGAlignedBlock) * 2;
+
+       if (spaceUsed > hashtable->spacePeak)
+               hashtable->spacePeak = spaceUsed;
+}
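The accounting in ExecHashUpdateSpacePeak works out as in this standalone sketch (not the patch itself; `BLOCK_SIZE` stands in for `sizeof(PGAlignedBlock)` and `BUCKET_PTR` assumes 64-bit pointers for `sizeof(HashJoinTuple)`):

```c
#include <assert.h>

#define BLOCK_SIZE 8192L                /* stand-in for sizeof(PGAlignedBlock) */
#define BUCKET_PTR 8L                   /* sizeof(HashJoinTuple), 64-bit build */

/*
 * Peak-memory estimate mirroring ExecHashUpdateSpacePeak: tuple space plus
 * the bucket array plus BufFile buffers for the in-memory batches and for
 * the per-slice overflow files (inner + outer in both cases).
 */
static long
peak_estimate(long space_used, int nbuckets, int nbatch, int nbatch_inmemory)
{
	long	space = space_used;
	int		inmem = (nbatch < nbatch_inmemory) ? nbatch : nbatch_inmemory;

	space += (long) nbuckets * BUCKET_PTR;
	space += (long) inmem * BLOCK_SIZE * 2;
	space += (long) (nbatch / nbatch_inmemory) * BLOCK_SIZE * 2;
	return space;
}
```

For example, 1024 buckets with 256 batches and 128 in memory charge 2MB for the in-memory BufFile buffers plus two slices' overflow buffers, so the BufFile overhead stays bounded even as nbatch grows.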
+
 /*
  * ExecScanHashBucket
  *             scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2475,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        + mcvsToUse * sizeof(int);
                hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
                        + mcvsToUse * sizeof(int);
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
 
                /*
                 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2526,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        hashtable->nSkewBuckets++;
                        hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
                        hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-                       if (hashtable->spaceUsed > hashtable->spacePeak)
-                               hashtable->spacePeak = hashtable->spaceUsed;
+
+                       /* refresh info about peak used memory */
+                       ExecHashUpdateSpacePeak(hashtable);
                }
 
                free_attstatsslot(&sslot);
@@ -2411,8 +2616,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        hashtable->spaceUsedSkew += hashTupleSize;
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
+
        while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
                ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2488,10 +2695,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
                }
                else
                {
+                       BufFile **batchFile;
+
                        /* Put the tuple into a temp file for later batches */
                        Assert(batchno > hashtable->curbatch);
+
+                       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                hashtable->innerBatchFile,
+                                                                                hashtable->innerOverflowFiles);
+
                        ExecHashJoinSaveTuple(tuple, hashvalue,
-                                                                 &hashtable->innerBatchFile[batchno]);
+                                                                 batchFile);
                        pfree(hashTuple);
                        hashtable->spaceUsed -= tupleSize;
                        hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2854,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
        instrument->nbuckets_original = hashtable->nbuckets_original;
        instrument->nbatch = hashtable->nbatch;
        instrument->nbatch_original = hashtable->nbatch_original;
+       instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
        instrument->space_peak = hashtable->spacePeak;
 }
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..a8db71925b 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                if (batchno != hashtable->curbatch &&
                                        node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                                {
+                                       BufFile   **batchFile;
+
                                        /*
                                         * Need to postpone this outer tuple to a later batch.
                                         * Save it in the corresponding outer-batch file.
                                         */
                                        Assert(parallel_state == NULL);
                                        Assert(batchno > hashtable->curbatch);
+
+                                       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                                        hashtable->outerBatchFile,
+                                                                                                        hashtable->outerOverflowFiles);
+
                                        ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
                                                                                  hashvalue,
-                                                                                 &hashtable->outerBatchFile[batchno]);
+                                                                                 batchFile);
 
                                        /* Loop around, staying in HJ_NEED_NEW_OUTER state */
                                        continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
        }
        else if (curbatch < hashtable->nbatch)
        {
-               BufFile    *file = hashtable->outerBatchFile[curbatch];
+               BufFile    **file = ExecHashGetBatchFile(hashtable, curbatch,
+                                                                                hashtable->outerBatchFile,
+                                                                                hashtable->outerOverflowFiles);
 
                /*
                 * In outer-join cases, we could get here even though the batch file
                 * is empty.
                 */
-               if (file == NULL)
+               if (*file == NULL)
                        return NULL;
 
                slot = ExecHashJoinGetSavedTuple(hjstate,
-                                                                                file,
+                                                                                *file,
                                                                                 hashvalue,
                                                                                 hjstate->hj_OuterTupleSlot);
                if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
        BufFile    *innerFile;
        TupleTableSlot *slot;
        uint32          hashvalue;
+       int                     batchidx;
+       int                     curbatch_old;
 
        nbatch = hashtable->nbatch;
        curbatch = hashtable->curbatch;
+       curbatch_old = curbatch;
+
+       /* index of the old batch */
+       batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+       /* has to be in the current slice of batches */
+       Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
        if (curbatch > 0)
        {
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
                 * We no longer need the previous outer batch file; close it right
                 * away to free disk space.
                 */
-               if (hashtable->outerBatchFile[curbatch])
-                       BufFileClose(hashtable->outerBatchFile[curbatch]);
-               hashtable->outerBatchFile[curbatch] = NULL;
+               if (hashtable->outerBatchFile[batchidx])
+                       BufFileClose(hashtable->outerBatchFile[batchidx]);
+               hashtable->outerBatchFile[batchidx] = NULL;
        }
        else                                            /* we just finished the first batch */
        {
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
         * scan, we have to rescan outer batches in case they contain tuples that
         * need to be reassigned.
         */
-       curbatch++;
+       curbatch = ExecHashSwitchToNextBatch(hashtable);
+       batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
        while (curbatch < nbatch &&
-                  (hashtable->outerBatchFile[curbatch] == NULL ||
-                       hashtable->innerBatchFile[curbatch] == NULL))
+                  (hashtable->outerBatchFile[batchidx] == NULL ||
+                       hashtable->innerBatchFile[batchidx] == NULL))
        {
-               if (hashtable->outerBatchFile[curbatch] &&
+               if (hashtable->outerBatchFile[batchidx] &&
                        HJ_FILL_OUTER(hjstate))
                        break;                          /* must process due to rule 1 */
-               if (hashtable->innerBatchFile[curbatch] &&
+               if (hashtable->innerBatchFile[batchidx] &&
                        HJ_FILL_INNER(hjstate))
                        break;                          /* must process due to rule 1 */
-               if (hashtable->innerBatchFile[curbatch] &&
+               if (hashtable->innerBatchFile[batchidx] &&
                        nbatch != hashtable->nbatch_original)
                        break;                          /* must process due to rule 2 */
-               if (hashtable->outerBatchFile[curbatch] &&
+               if (hashtable->outerBatchFile[batchidx] &&
                        nbatch != hashtable->nbatch_outstart)
                        break;                          /* must process due to rule 3 */
                /* We can ignore this batch. */
                /* Release associated temp files right away. */
-               if (hashtable->innerBatchFile[curbatch])
-                       BufFileClose(hashtable->innerBatchFile[curbatch]);
-               hashtable->innerBatchFile[curbatch] = NULL;
-               if (hashtable->outerBatchFile[curbatch])
-                       BufFileClose(hashtable->outerBatchFile[curbatch]);
-               hashtable->outerBatchFile[curbatch] = NULL;
-               curbatch++;
+               if (hashtable->innerBatchFile[batchidx])
+                       BufFileClose(hashtable->innerBatchFile[batchidx]);
+               hashtable->innerBatchFile[batchidx] = NULL;
+               if (hashtable->outerBatchFile[batchidx])
+                       BufFileClose(hashtable->outerBatchFile[batchidx]);
+               hashtable->outerBatchFile[batchidx] = NULL;
+
+               curbatch = ExecHashSwitchToNextBatch(hashtable);
+               batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
        }
 
        if (curbatch >= nbatch)
+       {
+               hashtable->curbatch = curbatch_old;
                return false;                   /* no more batches */
-
-       hashtable->curbatch = curbatch;
+       }
 
        /*
         * Reload the hash table with the new inner batch (which could be empty)
         */
        ExecHashTableReset(hashtable);
 
-       innerFile = hashtable->innerBatchFile[curbatch];
+       innerFile = hashtable->innerBatchFile[batchidx];
 
        if (innerFile != NULL)
        {
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
                 * needed
                 */
                BufFileClose(innerFile);
-               hashtable->innerBatchFile[curbatch] = NULL;
+               hashtable->innerBatchFile[batchidx] = NULL;
        }
 
        /*
         * Rewind outer batch file (if present), so that we can start reading it.
         */
-       if (hashtable->outerBatchFile[curbatch] != NULL)
+       if (hashtable->outerBatchFile[batchidx] != NULL)
        {
-               if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+               if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET))
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not rewind hash-join temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
        int                     num_hashclauses = list_length(hashclauses);
        int                     numbuckets;
        int                     numbatches;
+       int                     numbatches_inmemory;
        int                     num_skew_mcvs;
        size_t          space_allowed;  /* unused */
 
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
                                                        &space_allowed,
                                                        &numbuckets,
                                                        &numbatches,
+                                                       &numbatches_inmemory,
                                                        &num_skew_mcvs);
 
        /*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..311a0980ee 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
        int                *skewBucketNums; /* array indexes of active skew buckets */
 
        int                     nbatch;                 /* number of batches */
+       int                     nbatch_inmemory;        /* max number of in-memory batches */
        int                     curbatch;               /* current batch #; 0 during 1st pass */
 
        int                     nbatch_original;        /* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
        BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
        BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+       BufFile   **innerOverflowFiles; /* temp file for inner overflow batches */
+       BufFile   **outerOverflowFiles; /* temp file for outer overflow batches */
+
        /*
         * Info about the datatype-specific hash functions for the datatypes being
         * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..bb6b24a1b4 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                                                  uint32 hashvalue,
                                                  int *bucketno,
                                                  int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                                        BufFile **batchFiles, BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                size_t *space_allowed,
                                                int *numbuckets,
                                                int *numbatches,
+                                               int *numbatches_inmemory,
                                                int *num_skew_mcvs);
extern int     ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
        int                     nbuckets_original;      /* planned number of buckets */
        int                     nbatch;                 /* number of batches at end of execution */
        int                     nbatch_original;        /* planned number of batches */
+       int                     nbatch_inmemory;        /* number of batches kept in memory */
        size_t          space_peak;             /* peak memory usage in bytes */
 } HashInstrumentation;
 

Attachment: hashjoin-test.sh
Description: Bourne shell script
