I addressed all your comments one way or another; a new patch is attached. Comments on some specific points below:

On 09/12/2016 01:13 AM, Peter Geoghegan wrote:
Other things I noticed:

* You should probably point out that typically, access to batch memory
will still be sequential, despite your block-based scheme. The
preloading will now more or less make that the normal case. Any
fragmentation will now be essentially in memory, not on disk, which is
a big win.

That's not true; the "buffers" in batch memory are not accessed sequentially. When we pull the next tuple from a tape, we store it in the next free buffer. Usually, that buffer is the one that held the previous tuple returned from gettuple(), which was just added back to the free list.

It's still quite cache-friendly, though, because we only need a small number of slots (one for each tape).
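To illustrate the reuse pattern, here is a minimal standalone sketch of a LIFO free list of fixed-size slots. It is not the code from the patch: the names Slot, slots_init(), slot_get() and slot_put() are made up for this example, and only the union layout is borrowed from MergeTupleBuffer.

```c
#include <assert.h>
#include <stddef.h>

#define SLOT_SIZE 1024			/* same value as MERGETUPLEBUFFER_SIZE */

/* A slot is either on the free list (nextfree valid) or holds a tuple. */
typedef union Slot
{
	union Slot *nextfree;
	char		data[SLOT_SIZE];
} Slot;

/* Hypothetical helper: chain all slots into a free list, return its head. */
static Slot *
slots_init(Slot *slots, int nslots)
{
	int			i;

	assert(nslots > 0);
	for (i = 0; i < nslots - 1; i++)
		slots[i].nextfree = &slots[i + 1];
	slots[nslots - 1].nextfree = NULL;
	return &slots[0];
}

/* Pop the slot at the head of the list, or return NULL if it is empty. */
static Slot *
slot_get(Slot **head)
{
	Slot	   *s = *head;

	if (s != NULL)
		*head = s->nextfree;
	return s;
}

/* Push a slot back; it becomes the next one to be handed out. */
static void
slot_put(Slot **head, Slot *s)
{
	s->nextfree = *head;
	*head = s;
}
```

Because the list is LIFO, the slot released for the previously returned tuple is the first one handed out again, so the working set stays at a handful of slots even though they are not visited in address order.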

* I think you should move "bool   *mergeactive; /* active input run
source? */" within Tuplesortstate to be next to the other batch memory
stuff. No point in having separate merge and batch "sections" there
anymore.

Hmm. I think I prefer to keep the memory management stuff in a separate section. While it's true that it's currently only used during merging, it's not hard to imagine using it when building the initial runs, for example. Except for replacement selection, the pattern for building the runs is: add a bunch of tuples, sort, flush them out. It would be straightforward to use one large chunk of memory to hold all the tuples. I'm not going to do that now, but I think keeping the memory management stuff separate from merge-related state makes sense.

* I think that you need to comment on why state->tuplecontext is not
used for batch memory now. It is still useful, for multiple merge
passes, but the situation has notably changed for it.

Hmm. We don't actually use it after the initial runs at all anymore. I added a call to destroy it in mergeruns().

Now that we use the batch memory buffers for allocations < 1 kB (I pulled that number out of a hat, BTW), and we only need one allocation per tape (plus one), there's not much risk of fragmentation.
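Roughly, the allocation rule works like the sketch below. This is a simplified standalone model using malloc() instead of palloc(), and Arena, arena_init(), tuple_alloc() and tuple_release() are names invented for the example; see readtup_alloc() and RELEASE_MERGETUPLE_BUFFER in the patch for the real thing.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define CHUNK_SIZE 1024			/* the "pulled out of a hat" threshold */

typedef struct Arena
{
	char	   *begin;			/* start of the one big allocation */
	char	   *end;			/* one past its last byte */
	void	   *freehead;		/* free chunks, linked through their first word */
} Arena;

/* Allocate the arena and put every chunk on the free list. */
static int
arena_init(Arena *a, int nchunks)
{
	int			i;

	assert(nchunks > 0);
	a->begin = malloc((size_t) nchunks * CHUNK_SIZE);
	if (a->begin == NULL)
		return -1;
	a->end = a->begin + (size_t) nchunks * CHUNK_SIZE;
	a->freehead = NULL;
	for (i = nchunks - 1; i >= 0; i--)
	{
		void	   *chunk = a->begin + (size_t) i * CHUNK_SIZE;

		*(void **) chunk = a->freehead;
		a->freehead = chunk;
	}
	return 0;
}

/* Does the pointer fall inside the arena? */
static int
arena_contains(const Arena *a, const void *p)
{
	uintptr_t	u = (uintptr_t) p;

	return u >= (uintptr_t) a->begin && u < (uintptr_t) a->end;
}

/* Small tuples come from the arena; large ones (or overflow) from malloc. */
static void *
tuple_alloc(Arena *a, size_t len)
{
	if (len <= CHUNK_SIZE && a->freehead != NULL)
	{
		void	   *chunk = a->freehead;

		a->freehead = *(void **) chunk;
		return chunk;
	}
	return malloc(len);
}

/* Return an arena chunk to the free list, or free a separate allocation. */
static void
tuple_release(Arena *a, void *p)
{
	if (arena_contains(a, p))
	{
		*(void **) p = a->freehead;
		a->freehead = p;
	}
	else
		free(p);
}
```

The point is that only oversized tuples ever hit the general-purpose allocator, so with one chunk per tape (plus one) there is little left that could fragment.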

On Sun, Sep 11, 2016 at 3:13 PM, Peter Geoghegan <p...@heroku.com> wrote:
* Doesn't this code need to call MemoryContextAllocHuge() rather than palloc()?:

@@ -709,18 +765,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
            Assert(lt->frozen);
            datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
        }
+
+       /* Allocate a read buffer */
+       if (lt->buffer)
+           pfree(lt->buffer);
+       lt->buffer = palloc(lt->read_buffer_size);
+       lt->buffer_size = lt->read_buffer_size;

Of course, when you do that you're going to have to make the new
"buffer_size" and "read_buffer_size" fields of type "Size" (or,
possibly, "int64", to match tuplesort.c's own buffer sizing variables
ever since Noah added MaxAllocSizeHuge). Ditto for the existing "pos"
and "nbytes" fields next to "buffer_size" within the struct
LogicalTape, I think. ISTM that logtape.c blocknums can remain of type
"long", though, since that reflects an existing hardly-relevant
limitation that you're not making any worse.

True. I fixed that by putting a MaxAllocSize cap on the buffer size instead. I doubt that doing > 1 GB of read-ahead of a single tape will do any good.

I wonder if we should actually have a smaller cap there. Even 1 GB seems excessive. Might be better to start the merging sooner, rather than wait for the read of 1 GB to complete. The point of the OS readahead is that the OS will do that for us, in the background. And other processes might have better use for the memory anyway.
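For reference, the clamping boils down to something like the following standalone sketch. BLCKSZ is assumed to be the default 8192 here, MAX_ALLOC_SIZE carries the same value as MaxAllocSize (0x3fffffff), and clamp_read_buffer_size() is a name made up for the example; the real logic is in LogicalTapeAssignReadBufferSize() in the attached patch.

```c
#include <assert.h>
#include <stddef.h>

#define BLCKSZ 8192				/* assumed default block size */
#define MAX_ALLOC_SIZE ((size_t) 0x3fffffff)	/* same value as MaxAllocSize */

/*
 * Clamp a requested read buffer size to [BLCKSZ, MAX_ALLOC_SIZE] and round
 * it down to a whole number of blocks.
 */
static size_t
clamp_read_buffer_size(size_t avail_mem)
{
	if (avail_mem < BLCKSZ)
		avail_mem = BLCKSZ;
	if (avail_mem > MAX_ALLOC_SIZE)
		avail_mem = MAX_ALLOC_SIZE;
	avail_mem -= avail_mem % BLCKSZ;

	assert(avail_mem >= BLCKSZ && avail_mem % BLCKSZ == 0);
	return avail_mem;
}
```

A smaller cap would only mean replacing MAX_ALLOC_SIZE with a different constant; I don't have numbers to suggest what a good value would be.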

* It couldn't hurt to make this code paranoid about LACKMEM() becoming
true, which will cause havoc (we saw this recently in 9.6; a patch of
mine to fix that just went in):

+   /*
+    * Use all the spare memory we have available for read buffers. Divide it
+    * memory evenly among all the tapes.
+    */
+   avail_blocks = state->availMem / BLCKSZ;
+   per_tape = avail_blocks / maxTapes;
+   cutoff = avail_blocks % maxTapes;
+   if (per_tape == 0)
+   {
+       per_tape = 1;
+       cutoff = 0;
+   }
+   for (tapenum = 0; tapenum < maxTapes; tapenum++)
+   {
+       LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
+                                       (per_tape + (tapenum < cutoff ? 1 : 0)) * BLCKSZ);
+   }

In other words, we really don't want availMem to become < 0, since
it's int64, but a derived value is passed to
LogicalTapeAssignReadBufferSize() as an argument of type "Size". Now,
if LACKMEM() did happen it would be a bug anyway, but I recommend
defensive code also be added. Per grow_memtuples(), "We need to be
sure that we do not cause LACKMEM to become true, else the space
management algorithm will go nuts". Let's be sure that we get that
right, as we saw recently, especially since grow_memtuples()
will not actually have the chance to save us here (if there is a bug
along these lines, let's at least make the right "can't happen" error
complaint to the user when it pops up).

Hmm. We don't really need the availMem accounting at all, after we have started merging. There is nothing we can do to free memory if we run out, and we use fairly little memory anyway. But yes, better safe than sorry. I tried to clarify the comments on that.
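As an illustration of the defensive version, here is a standalone sketch of the per-tape split that treats a negative availMem as zero before dividing. The function name tape_read_buffer_bytes() is hypothetical and BLCKSZ is assumed to be 8192; real code would probably want to raise a "can't happen" error rather than clamp silently.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define BLCKSZ 8192				/* assumed default block size */

/*
 * Divide avail_mem evenly among ntapes, in whole blocks. The first
 * (avail_blocks % ntapes) tapes get one extra block. Every tape gets at
 * least one block, even if avail_mem is zero or negative.
 */
static size_t
tape_read_buffer_bytes(int64_t avail_mem, int ntapes, int tapenum)
{
	int64_t		avail_blocks;
	int64_t		per_tape;
	int64_t		cutoff;

	assert(ntapes > 0 && tapenum >= 0 && tapenum < ntapes);

	/* Guard: never let a negative value reach the unsigned size. */
	if (avail_mem < 0)
		avail_mem = 0;

	avail_blocks = avail_mem / BLCKSZ;
	per_tape = avail_blocks / ntapes;
	cutoff = avail_blocks % ntapes;
	if (per_tape == 0)
	{
		per_tape = 1;
		cutoff = 0;
	}
	return (size_t) (per_tape + (tapenum < cutoff ? 1 : 0)) * BLCKSZ;
}
```

Without the guard, a negative availMem gives a negative per_tape (C division truncates toward zero, so it is not caught by the "per_tape == 0" check), and the conversion to Size turns that into a huge value.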

* It looks like your patch makes us less eager about freeing per-tape
batch memory, now held as preload buffer space within logtape.c.

ISTM that there should be some way to have the "tape exhausted" code
path within tuplesort_gettuple_common() (as well as the similar spot
within mergeonerun()) instruct logtape.c that we're done with that
tape. In other words, when mergeprereadone() (now renamed to
mergereadnext()) detects the tape is exhausted, it should have
logtape.c free its huge tape buffer immediately. Think about cases
where input is presorted, and earlier tapes can be freed quite early
on. It's worth keeping that around, (you removed the old way that this
happened, through mergebatchfreetape()).

OK. I solved that by calling LogicalTapeRewind() when we're done reading a tape. Rewinding a tape has the side effect of freeing the buffer. I was going to put that into mergereadnext(), but it turns out to be tricky to figure out whether there are any more runs on the same tape, because we have the "actual" tape number there, but tp_runs is indexed by "logical" tape number. So I put the rewind calls at the end of mergeruns(), and in TSS_FINALMERGE processing, instead. It means that we don't free the buffers quite as early as we could, but I think this is good enough.

On Sun, Sep 11, 2016 at 3:13 PM, Peter Geoghegan <p...@heroku.com> wrote:
+   for (tapenum = 0; tapenum < maxTapes; tapenum++)
+   {
+       LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
+                                       (per_tape + (tapenum < cutoff ? 1 : 0)) * BLCKSZ);
+   }

Spotted another issue with this code just now. Shouldn't it be based
on state->tapeRange? You don't want the destTape to get memory, since
you don't use batch memory for tapes that are written to (and final
on-the-fly merges don't use their destTape at all).

logtape.c will only actually allocate the memory when reading.

(Looks again...)

Wait, you're using a local variable maxTapes here, which potentially
differs from state->maxTapes:

+   /*
+    * If we had fewer runs than tapes, refund buffers for tapes that were never
+    * allocated.
+    */
+   maxTapes = state->maxTapes;
+   if (state->currentRun < maxTapes)
+   {
+       FREEMEM(state, (maxTapes - state->currentRun) * TAPE_BUFFER_OVERHEAD);
+       maxTapes = state->currentRun;
+   }

I find this confusing, and think it's probably buggy. I don't think
you should have a local variable called maxTapes that you modify at
all, since state->maxTapes is supposed to not change once established.

I changed that so that it does actually change state->maxTapes. I considered having a separate numTapes field, that can be smaller than maxTapes, but we don't need the original maxTapes value after that point anymore, so it would've been just pro forma to track them separately. I hope the comment now explains that better.

You can't use state->currentRun like that, either, I think, because
it's the high watermark number of runs (quicksorted runs), not runs
after any particular merge phase, where we end up with fewer runs as
they're merged (we must also consider dummy runs to get this) -- we
want something like activeTapes. cf. the code you removed for the
beginmerge() finalMergeBatch case. Of course, activeTapes will vary if
there are multiple merge passes, which suggests all this code really
has no business being in mergeruns() (it should instead be in
beginmerge(), or code that beginmerge() reliably calls).

Hmm, yes, using currentRun here is wrong. It needs to be "currentRun + 1", because we need one more tape than there are runs, to hold the output.
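In other words, the arithmetic is along these lines. This is a standalone sketch that assumes currentRun equals the number of runs at that point; tapes_needed() and tape_overhead_refund() are names invented for the example, and the per-tape overhead is passed in as a parameter rather than taken from TAPE_BUFFER_OVERHEAD.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Number of tapes actually needed: one input tape per run, plus one tape
 * for the output, but never more than were originally planned.
 */
static int
tapes_needed(int max_tapes, int num_runs)
{
	int			needed = num_runs + 1;

	assert(max_tapes > 0 && num_runs >= 0);
	return (needed < max_tapes) ? needed : max_tapes;
}

/* Memory to give back for the tapes that will never be used. */
static int64_t
tape_overhead_refund(int max_tapes, int num_runs, int64_t per_tape_overhead)
{
	int			unused = max_tapes - tapes_needed(max_tapes, num_runs);

	return (int64_t) unused * per_tape_overhead;
}
```

With 7 tapes planned and only 3 runs, for example, 4 tapes are kept and the overhead for the other 3 is refunded.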

Note that I'm not re-allocating the read buffers depending on which tapes are used in the current merge pass. I'm just dividing up the memory among all tapes. Now that the pre-reading is done in logtape.c, when we reach the end of a run on a tape, we will already have data from the next run on the same tape in the read buffer.

Immediately afterwards, you do this:

+   /* Initialize the merge tuple buffer arena.  */
+   state->batchMemoryBegin = palloc((maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
+   state->batchMemoryEnd = state->batchMemoryBegin + (maxTapes + 1) * MERGETUPLEBUFFER_SIZE;
+   state->freeBufferHead = (MergeTupleBuffer *) state->batchMemoryBegin;
+   USEMEM(state, (maxTapes + 1) * MERGETUPLEBUFFER_SIZE);

The fact that you size the buffer based on "maxTapes + 1" also
suggests a problem. I think that the code looks like this because it
must instruct logtape.c that the destTape tape requires some buffer
(iff there is to be a non-final merge). Is that right? I hope that you
don't give the typically unused destTape tape a full share of batch
memory all the time (the same applies to any other
inactive-at-final-merge tapes).

Ah, no, the "+ 1" comes from the need to hold the tuple that we last returned to the caller in tuplesort_gettuple, until the next call. See lastReturnedTuple. I tried to clarify the comments on that.
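To show where the extra buffer comes from, here is a toy standalone simulation of the merge loop that only counts buffers. It is nothing like the real tuplesort code: MergeSim, sim_begin() and sim_gettuple() are invented for the example, the "tapes" are hard-coded arrays, and a linear scan stands in for the heap.

```c
#include <assert.h>
#include <stddef.h>

#define NTAPES 3
#define RUNLEN 4

/* Toy input: one sorted run per tape. */
static const int tapes[NTAPES][RUNLEN] = {
	{1, 4, 7, 10},
	{2, 5, 8, 11},
	{3, 6, 9, 12}
};

typedef struct MergeSim
{
	int			next[NTAPES];	/* next unread position on each tape */
	int			cur[NTAPES];	/* tuple currently held for each tape */
	int			has[NTAPES];	/* does the tape have a tuple loaded? */
	int			have_last;		/* last returned tuple still holds a buffer? */
	int			in_use;			/* buffers currently in use */
	int			peak;			/* highest in_use seen so far */
} MergeSim;

/* Read the next tuple from tape t into a fresh buffer, if any remain. */
static void
sim_load(MergeSim *sim, int t)
{
	sim->has[t] = 0;
	if (sim->next[t] < RUNLEN)
	{
		sim->cur[t] = tapes[t][sim->next[t]++];
		sim->has[t] = 1;
		sim->in_use++;
		if (sim->in_use > sim->peak)
			sim->peak = sim->in_use;
	}
}

/* Load the first tuple from every tape. */
static void
sim_begin(MergeSim *sim)
{
	int			t;

	sim->have_last = 0;
	sim->in_use = 0;
	sim->peak = 0;
	for (t = 0; t < NTAPES; t++)
	{
		sim->next[t] = 0;
		sim_load(sim, t);
	}
}

/* Return the next tuple in sorted order; 0 when the merge is finished. */
static int
sim_gettuple(MergeSim *sim, int *out)
{
	int			t;
	int			best = -1;

	/* Only now can the buffer of the previously returned tuple be recycled. */
	if (sim->have_last)
	{
		sim->in_use--;
		sim->have_last = 0;
	}
	for (t = 0; t < NTAPES; t++)
	{
		if (sim->has[t] && (best < 0 || sim->cur[t] < sim->cur[best]))
			best = t;
	}
	if (best < 0)
		return 0;

	*out = sim->cur[best];
	sim->have_last = 1;			/* caller still looks at this tuple */
	sim_load(sim, best);		/* replacement needs a second buffer */
	return 1;
}
```

The returned tuple's buffer cannot be recycled until the next call, while its replacement from the same tape is loaded right away, so the peak is one buffer per tape plus one.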

Thanks for the thorough review! Let me know how this looks now.

- Heikki
From 19c88b74bd0302185bd24ce1e0abcef796db5afd Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Wed, 14 Sep 2016 17:29:11 +0300
Subject: [PATCH 1/1] Change the way pre-reading in external sort's merge phase
 works.

Don't pre-read tuples into SortTuple slots during merge. Instead, use the
memory for larger read buffers in logtape.c. We're doing the same number
of READTUP() calls either way, but managing the pre-read SortTuple slots
is much more complicated. Also, the on-tape representation is more compact
than SortTuples, so we can fit more pre-read tuples into the same amount
of memory this way. And we have better cache-locality, when we use just a
small number of SortTuple slots.

Now that we only hold one tuple from each tape in the SortTuple slots, we
can greatly simplify the "batch memory" management. We now maintain a
small set of fixed-sized buffers, to hold the tuples, and fall back to
palloc() for larger tuples. We use this method during all merge phases,
not just the final merge, and also when randomAccess is requested, and
also in the TSS_SORTEDONTAPE. In other words, it's used whenever we do
an external sort.

Reviewed by Peter Geoghegan.
---
 src/backend/utils/sort/logtape.c   |  153 ++++-
 src/backend/utils/sort/tuplesort.c | 1130 ++++++++++++------------------------
 src/include/utils/logtape.h        |    1 +
 3 files changed, 492 insertions(+), 792 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 7745207..4152da1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -52,12 +52,17 @@
  * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
  * policy for free blocks would be better?)
  *
+ * To further make the I/Os more sequential, we can use a larger buffer
+ * when reading, and read multiple blocks from the same tape in one go,
+ * whenever the buffer becomes empty. LogicalTapeAssignReadBufferSize()
+ * can be used to set the size of the read buffer.
+ *
  * To support the above policy of writing to the lowest free block,
  * ltsGetFreeBlock sorts the list of free block numbers into decreasing
  * order each time it is asked for a block and the list isn't currently
  * sorted.  This is an efficient way to handle it because we expect cycles
  * of releasing many blocks followed by re-using many blocks, due to
- * tuplesort.c's "preread" behavior.
+ * the larger read buffer.
  *
  * Since all the bookkeeping and buffer memory is allocated with palloc(),
  * and the underlying file(s) are made with OpenTemporaryFile, all resources
@@ -79,6 +84,7 @@
 
 #include "storage/buffile.h"
 #include "utils/logtape.h"
+#include "utils/memutils.h"
 
 /*
  * Block indexes are "long"s, so we can fit this many per indirect block.
@@ -131,9 +137,18 @@ typedef struct LogicalTape
 	 * reading.
 	 */
 	char	   *buffer;			/* physical buffer (separately palloc'd) */
+	int			buffer_size;	/* allocated size of the buffer */
 	long		curBlockNumber; /* this block's logical blk# within tape */
 	int			pos;			/* next read/write position in buffer */
 	int			nbytes;			/* total # of valid bytes in buffer */
+
+	/*
+	 * Desired buffer size to use when reading.  To keep things simple, we
+	 * use a single-block buffer when writing, or when reading a frozen
+	 * tape.  But when we are reading and will only read forwards, we
+	 * allocate a larger buffer, determined by read_buffer_size.
+	 */
+	int			read_buffer_size;
 } LogicalTape;
 
 /*
@@ -228,6 +243,53 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
 }
 
 /*
+ * Read as many blocks as we can into the per-tape buffer.
+ *
+ * The caller can specify the next physical block number to read, in
+ * datablocknum, or -1 to fetch the next block number from the internal block.
+ * If datablocknum == -1, the caller must've already set curBlockNumber.
+ *
+ * Returns true if anything was read, 'false' on EOF.
+ */
+static bool
+ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum)
+{
+	lt->pos = 0;
+	lt->nbytes = 0;
+
+	do
+	{
+		/* Fetch next block number (unless provided by caller) */
+		if (datablocknum == -1)
+		{
+			datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen);
+			if (datablocknum == -1L)
+				break;			/* EOF */
+			lt->curBlockNumber++;
+		}
+
+		/* Read the block */
+		ltsReadBlock(lts, datablocknum, (void *) (lt->buffer + lt->nbytes));
+		if (!lt->frozen)
+			ltsReleaseBlock(lts, datablocknum);
+
+		if (lt->curBlockNumber < lt->numFullBlocks)
+			lt->nbytes += BLCKSZ;
+		else
+		{
+			/* EOF */
+			lt->nbytes += lt->lastBlockBytes;
+			break;
+		}
+
+		/* Advance to next block, if we have buffer space left */
+		datablocknum = -1;
+	} while (lt->nbytes < lt->buffer_size);
+
+	return (lt->nbytes > 0);
+}
+
+/*
  * qsort comparator for sorting freeBlocks[] into decreasing order.
  */
 static int
@@ -546,6 +608,8 @@ LogicalTapeSetCreate(int ntapes)
 		lt->numFullBlocks = 0L;
 		lt->lastBlockBytes = 0;
 		lt->buffer = NULL;
+		lt->buffer_size = 0;
+		lt->read_buffer_size = BLCKSZ;
 		lt->curBlockNumber = 0L;
 		lt->pos = 0;
 		lt->nbytes = 0;
@@ -628,7 +692,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 
 	/* Allocate data buffer and first indirect block on first write */
 	if (lt->buffer == NULL)
+	{
 		lt->buffer = (char *) palloc(BLCKSZ);
+		lt->buffer_size = BLCKSZ;
+	}
 	if (lt->indirect == NULL)
 	{
 		lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
@@ -636,6 +703,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 		lt->indirect->nextup = NULL;
 	}
 
+	Assert(lt->buffer_size == BLCKSZ);
 	while (size > 0)
 	{
 		if (lt->pos >= BLCKSZ)
@@ -709,18 +777,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 			Assert(lt->frozen);
 			datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
 		}
+
+		/* Allocate a read buffer */
+		if (lt->buffer)
+			pfree(lt->buffer);
+		lt->buffer = palloc(lt->read_buffer_size);
+		lt->buffer_size = lt->read_buffer_size;
+
 		/* Read the first block, or reset if tape is empty */
 		lt->curBlockNumber = 0L;
 		lt->pos = 0;
 		lt->nbytes = 0;
 		if (datablocknum != -1L)
-		{
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
-			if (!lt->frozen)
-				ltsReleaseBlock(lts, datablocknum);
-			lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
-				BLCKSZ : lt->lastBlockBytes;
-		}
+			ltsReadFillBuffer(lts, lt, datablocknum);
 	}
 	else
 	{
@@ -754,6 +823,13 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 		lt->curBlockNumber = 0L;
 		lt->pos = 0;
 		lt->nbytes = 0;
+
+		if (lt->buffer)
+		{
+			pfree(lt->buffer);
+			lt->buffer = NULL;
+			lt->buffer_size = 0;
+		}
 	}
 }
 
@@ -779,20 +855,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 		if (lt->pos >= lt->nbytes)
 		{
 			/* Try to load more data into buffer. */
-			long		datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
-															 lt->frozen);
-
-			if (datablocknum == -1L)
+			if (!ltsReadFillBuffer(lts, lt, -1))
 				break;			/* EOF */
-			lt->curBlockNumber++;
-			lt->pos = 0;
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
-			if (!lt->frozen)
-				ltsReleaseBlock(lts, datablocknum);
-			lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
-				BLCKSZ : lt->lastBlockBytes;
-			if (lt->nbytes <= 0)
-				break;			/* EOF (possible here?) */
 		}
 
 		nthistime = lt->nbytes - lt->pos;
@@ -842,6 +906,22 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
 	lt->writing = false;
 	lt->frozen = true;
 	datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
+
+	/*
+	 * The seek and backspace functions assume a single block read buffer.
+	 * That's OK with current usage. A larger buffer is helpful to make the
+	 * read pattern of the backing file look more sequential to the OS, when
+	 * we're reading from multiple tapes. But at the end of a sort, when a
+	 * tape is frozen, we only read from a single tape anyway.
+	 */
+	if (!lt->buffer || lt->buffer_size != BLCKSZ)
+	{
+		if (lt->buffer)
+			pfree(lt->buffer);
+		lt->buffer = palloc(BLCKSZ);
+		lt->buffer_size = BLCKSZ;
+	}
+
 	/* Read the first block, or reset if tape is empty */
 	lt->curBlockNumber = 0L;
 	lt->pos = 0;
@@ -875,6 +955,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
 	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
+	Assert(lt->buffer_size == BLCKSZ);
 
 	/*
 	 * Easy case for seek within current block.
@@ -941,6 +1022,7 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
 	Assert(offset >= 0 && offset <= BLCKSZ);
+	Assert(lt->buffer_size == BLCKSZ);
 
 	/*
 	 * Easy case for seek within current block.
@@ -1002,6 +1084,10 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 
 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
 	lt = &lts->tapes[tapenum];
+
+	/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
+	Assert(lt->buffer_size == BLCKSZ);
+
 	*blocknum = lt->curBlockNumber;
 	*offset = lt->pos;
 }
@@ -1014,3 +1100,28 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
 	return lts->nFileBlocks;
 }
+
+/*
+ * Set buffer size to use, when reading from given tape.
+ */
+void
+LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
+{
+	LogicalTape *lt;
+
+	Assert(tapenum >= 0 && tapenum < lts->nTapes);
+	lt = &lts->tapes[tapenum];
+
+	/*
+	 * The buffer size must be a multiple of BLCKSZ in size, so round the
+	 * given value down to nearest BLCKSZ. Make sure we have at least one page.
+	 * Also, don't go above MaxAllocSize, to avoid erroring out. A multi-gigabyte
+	 * buffer is unlikely to be helpful, anyway.
+	 */
+	if (avail_mem < BLCKSZ)
+		avail_mem = BLCKSZ;
+	if (avail_mem > MaxAllocSize)
+		avail_mem = MaxAllocSize;
+	avail_mem -= avail_mem % BLCKSZ;
+	lt->read_buffer_size = avail_mem;
+}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index d600670..131dbef 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -14,13 +14,13 @@
  * sorting algorithm.  Historically, we divided the input into sorted runs
  * using replacement selection, in the form of a priority tree implemented
  * as a heap (essentially his Algorithm 5.2.3H -- although that strategy is
- * often avoided altogether), but that can now only happen first the first
+ * often avoided altogether), but now we only do that for the first
  * run.  We merge the runs using polyphase merge, Knuth's Algorithm
  * 5.4.2D.  The logical "tapes" used by Algorithm D are implemented by
  * logtape.c, which avoids space wastage by recycling disk space as soon
  * as each block is read from its "tape".
  *
- * We never form the initial runs using Knuth's recommended replacement
+ * We do not form the initial runs using Knuth's recommended replacement
  * selection data structure (Algorithm 5.4.1R), because it uses a fixed
  * number of records in memory at all times.  Since we are dealing with
  * tuples that may vary considerably in size, we want to be able to vary
@@ -36,11 +36,11 @@
  *
  * In PostgreSQL 9.6, a heap (based on Knuth's Algorithm H, with some small
  * customizations) is only used with the aim of producing just one run,
- * thereby avoiding all merging.  Only the first run can use replacement
+ * thereby avoiding all merging.  Only the first run uses replacement
  * selection, which is why there are now only two possible valid run
  * numbers, and why heapification is customized to not distinguish between
  * tuples in the second run (those will be quicksorted).  We generally
- * prefer a simple hybrid sort-merge strategy, where runs are sorted in much
+ * prefer a simple hybrid sort-merge strategy, where runs are sorted in
  * the same way as the entire input of an internal sort is sorted (using
  * qsort()).  The replacement_sort_tuples GUC controls the limited remaining
  * use of replacement selection for the first run.
@@ -74,7 +74,7 @@
  * the merge is complete.  The basic merge algorithm thus needs very little
  * memory --- only M tuples for an M-way merge, and M is constrained to a
  * small number.  However, we can still make good use of our full workMem
- * allocation by pre-reading additional tuples from each source tape.  Without
+ * allocation, to pre-read blocks from each source tape.  Without
  * prereading, our access pattern to the temporary file would be very erratic;
  * on average we'd read one block from each of M source tapes during the same
  * time that we're writing M blocks to the output tape, so there is no
@@ -84,10 +84,10 @@
  * worse when it comes time to read that tape.  A straightforward merge pass
  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
  * by prereading from each source tape sequentially, loading about workMem/M
- * bytes from each tape in turn.  Then we run the merge algorithm, writing but
- * not reading until one of the preloaded tuple series runs out.  Then we
- * switch back to preread mode, fill memory again, and repeat.  This approach
- * helps to localize both read and write accesses.
+ * bytes from each tape in turn, and making the sequential blocks immediately
+ * available for reuse.  This approach helps to localize both read and write
+ * accesses. The pre-reading is handled by logtape.c; we just tell it how
+ * much memory to use for the buffers.
  *
  * When the caller requests random access to the sort result, we form
  * the final sorted run on a logical tape which is then "frozen", so
@@ -162,7 +162,7 @@ bool		optimize_bounded_sort = true;
  * The objects we actually sort are SortTuple structs.  These contain
  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
  * which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree() (except during final on-the-fly merge,
+ * can be freed by a simple pfree() (except during merge,
  * when memory is used in batch).  SortTuples also contain the tuple's
  * first key column in Datum/nullflag format, and an index integer.
  *
@@ -191,9 +191,8 @@ bool		optimize_bounded_sort = true;
  * it now only distinguishes RUN_FIRST and HEAP_RUN_NEXT, since replacement
  * selection is always abandoned after the first run; no other run number
  * should be represented here.  During merge passes, we re-use it to hold the
- * input tape number that each tuple in the heap was read from, or to hold the
- * index of the next tuple pre-read from the same tape in the case of pre-read
- * entries.  tupindex goes unused if the sort occurs entirely in memory.
+ * input tape number that each tuple in the heap was read from.  tupindex goes
+ * unused if the sort occurs entirely in memory.
  */
 typedef struct
 {
@@ -203,6 +202,20 @@ typedef struct
 	int			tupindex;		/* see notes above */
 } SortTuple;
 
+/*
+ * During merge, we use a pre-allocated set of fixed-size buffers to store
+ * tuples in, to avoid palloc/pfree overhead.
+ *
+ * 'nextfree' is valid when this chunk is in the free list. When in use, the
+ * buffer holds a tuple.
+ */
+#define MERGETUPLEBUFFER_SIZE 1024
+
+typedef union MergeTupleBuffer
+{
+	union MergeTupleBuffer *nextfree;
+	char		buffer[MERGETUPLEBUFFER_SIZE];
+} MergeTupleBuffer;
 
 /*
  * Possible states of a Tuplesort object.  These denote the states that
@@ -288,41 +301,28 @@ struct Tuplesortstate
 	/*
 	 * Function to write a stored tuple onto tape.  The representation of the
 	 * tuple on tape need not be the same as it is in memory; requirements on
-	 * the tape representation are given below.  After writing the tuple,
-	 * pfree() the out-of-line data (not the SortTuple struct!), and increase
-	 * state->availMem by the amount of memory space thereby released.
+	 * the tape representation are given below.  If !batchUsed, after writing
+	 * the tuple, pfree() the out-of-line data (not the SortTuple struct!),
+	 * and increase state->availMem by the amount of memory space thereby
+	 * released.
 	 */
 	void		(*writetup) (Tuplesortstate *state, int tapenum,
 										 SortTuple *stup);
 
 	/*
 	 * Function to read a stored tuple from tape back into memory. 'len' is
-	 * the already-read length of the stored tuple.  Create a palloc'd copy,
-	 * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
-	 * decrease state->availMem by the amount of memory space consumed. (See
-	 * batchUsed notes for details on how memory is handled when incremental
-	 * accounting is abandoned.)
+	 * the already-read length of the stored tuple.  The tuple is stored in
+	 * the batch memory arena, or is palloc'd, see readtup_alloc().
 	 */
 	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
 										int tapenum, unsigned int len);
 
 	/*
-	 * Function to move a caller tuple.  This is usually implemented as a
-	 * memmove() shim, but function may also perform additional fix-up of
-	 * caller tuple where needed.  Batch memory support requires the movement
-	 * of caller tuples from one location in memory to another.
-	 */
-	void		(*movetup) (void *dest, void *src, unsigned int len);
-
-	/*
 	 * This array holds the tuples now in sort memory.  If we are in state
 	 * INITIAL, the tuples are in no particular order; if we are in state
 	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
 	 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
-	 * H.  (Note that memtupcount only counts the tuples that are part of the
-	 * heap --- during merge passes, memtuples[] entries beyond tapeRange are
-	 * never in the heap and are used to hold pre-read tuples.)  In state
-	 * SORTEDONTAPE, the array is not used.
+	 * H. In state SORTEDONTAPE, the array is not used.
 	 */
 	SortTuple  *memtuples;		/* array of SortTuple structs */
 	int			memtupcount;	/* number of tuples currently present */
@@ -332,12 +332,40 @@ struct Tuplesortstate
 	/*
 	 * Memory for tuples is sometimes allocated in batch, rather than
 	 * incrementally.  This implies that incremental memory accounting has
-	 * been abandoned.  Currently, this only happens for the final on-the-fly
-	 * merge step.  Large batch allocations can store tuples (e.g.
-	 * IndexTuples) without palloc() fragmentation and other overhead.
+	 * been abandoned.  Currently, this happens when we start merging.
+	 * Large batch allocations can store tuples (e.g. IndexTuples) without
+	 * palloc() fragmentation and other overhead.
+	 *
+	 * For the batch memory, we use one large allocation, divided into
+	 * MERGETUPLEBUFFER_SIZE chunks. The allocation is sized to hold
+	 * one chunk per tape, plus one additional chunk. We need that many
+	 * chunks to hold all the tuples kept in the heap during merge, plus
+	 * the one we have last returned from the sort.
+	 *
+	 * Initially, all the chunks are kept in a linked list, in freeBufferHead.
+	 * When a tuple is read from a tape, it is put to the next available
+	 * chunk, if it fits. If the tuple is larger than MERGETUPLEBUFFER_SIZE,
+	 * it is palloc'd instead.
+	 *
+	 * When we're done processing a tuple, we return the chunk back to the
+	 * free list, or pfree() if it was palloc'd. We know that a tuple was
+	 * allocated from the batch memory arena, if its pointer value is between
+	 * batchMemoryBegin and -End.
 	 */
 	bool		batchUsed;
 
+	char	   *batchMemoryBegin;	/* beginning of batch memory arena */
+	char	   *batchMemoryEnd;		/* end of batch memory arena */
+	MergeTupleBuffer *freeBufferHead;	/* head of free list */
+
+	/*
+	 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
+	 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE modes),
+	 * we remember the tuple in 'lastReturnedTuple', so that we can recycle the
+	 * memory on next gettuple call.
+	 */
+	void	   *lastReturnedTuple;
+
 	/*
 	 * While building initial runs, this indicates if the replacement
 	 * selection strategy is in use.  When it isn't, then a simple hybrid
@@ -358,42 +386,11 @@ struct Tuplesortstate
 	 */
 
 	/*
-	 * These variables are only used during merge passes.  mergeactive[i] is
+	 * This variable is only used during merge passes.  mergeactive[i] is
 	 * true if we are reading an input run from (actual) tape number i and
-	 * have not yet exhausted that run.  mergenext[i] is the memtuples index
-	 * of the next pre-read tuple (next to be loaded into the heap) for tape
-	 * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
-	 * points to the last pre-read tuple from each tape.  mergeavailslots[i]
-	 * is the number of unused memtuples[] slots reserved for tape i, and
-	 * mergeavailmem[i] is the amount of unused space allocated for tape i.
-	 * mergefreelist and mergefirstfree keep track of unused locations in the
-	 * memtuples[] array.  The memtuples[].tupindex fields link together
-	 * pre-read tuples for each tape as well as recycled locations in
-	 * mergefreelist. It is OK to use 0 as a null link in these lists, because
-	 * memtuples[0] is part of the merge heap and is never a pre-read tuple.
+	 * have not yet exhausted that run.
 	 */
 	bool	   *mergeactive;	/* active input run source? */
-	int		   *mergenext;		/* first preread tuple for each source */
-	int		   *mergelast;		/* last preread tuple for each source */
-	int		   *mergeavailslots;	/* slots left for prereading each tape */
-	int64	   *mergeavailmem;	/* availMem for prereading each tape */
-	int			mergefreelist;	/* head of freelist of recycled slots */
-	int			mergefirstfree; /* first slot never used in this merge */
-
-	/*
-	 * Per-tape batch state, when final on-the-fly merge consumes memory from
-	 * just a few large allocations.
-	 *
-	 * Aside from the general benefits of performing fewer individual retail
-	 * palloc() calls, this also helps make merging more cache efficient,
-	 * since each tape's tuples must naturally be accessed sequentially (in
-	 * sorted order).
-	 */
-	int64		spacePerTape;	/* Space (memory) for tuples (not slots) */
-	char	  **mergetuples;	/* Each tape's memory allocation */
-	char	  **mergecurrent;	/* Current offset into each tape's memory */
-	char	  **mergetail;		/* Last item's start point for each tape */
-	char	  **mergeoverflow;	/* Retail palloc() "overflow" for each tape */
 
 	/*
 	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
@@ -481,11 +478,33 @@ struct Tuplesortstate
 #endif
 };
 
+/*
+ * Is the given tuple allocated from the batch memory arena?
+ */
+#define IS_MERGETUPLE_BUFFER(state, tuple) \
+	((char *) (tuple) >= (state)->batchMemoryBegin && \
+	 (char *) (tuple) < (state)->batchMemoryEnd)
+
+/*
+ * Return the given tuple to the batch memory free list, or free it
+ * if it was palloc'd.
+ */
+#define RELEASE_MERGETUPLE_BUFFER(state, tuple) \
+	do { \
+		MergeTupleBuffer *buf = (MergeTupleBuffer *) (tuple); \
+		\
+		if (IS_MERGETUPLE_BUFFER(state, buf)) \
+		{ \
+			buf->nextfree = (state)->freeBufferHead; \
+			(state)->freeBufferHead = buf; \
+		} else \
+			pfree(buf); \
+	} while(0)
+
 #define COMPARETUP(state,a,b)	((*(state)->comparetup) (a, b, state))
 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
 #define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define MOVETUP(dest,src,len) ((*(state)->movetup) (dest, src, len))
 #define LACKMEM(state)		((state)->availMem < 0 && !(state)->batchUsed)
 #define USEMEM(state,amt)	((state)->availMem -= (amt))
 #define FREEMEM(state,amt)	((state)->availMem += (amt))
@@ -553,16 +572,8 @@ static void inittapes(Tuplesortstate *state);
 static void selectnewtape(Tuplesortstate *state);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state, bool finalMergeBatch);
-static void batchmemtuples(Tuplesortstate *state);
-static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
-static void mergebatchone(Tuplesortstate *state, int srcTape,
-			  SortTuple *stup, bool *should_free);
-static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
-				   SortTuple *rtup, bool *should_free);
-static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
-static void mergepreread(Tuplesortstate *state);
-static void mergeprereadone(Tuplesortstate *state, int srcTape);
+static void beginmerge(Tuplesortstate *state);
+static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void dumpbatch(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
@@ -576,7 +587,7 @@ static void tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex);
 static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
-static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
+static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
 				Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -584,7 +595,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
 			  SortTuple *stup);
 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
 			 int tapenum, unsigned int len);
-static void movetup_heap(void *dest, void *src, unsigned int len);
 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
 				   Tuplesortstate *state);
 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -592,7 +602,6 @@ static void writetup_cluster(Tuplesortstate *state, int tapenum,
 				 SortTuple *stup);
 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 				int tapenum, unsigned int len);
-static void movetup_cluster(void *dest, void *src, unsigned int len);
 static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
 					   Tuplesortstate *state);
 static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
@@ -602,7 +611,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum,
 			   SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
 			  int tapenum, unsigned int len);
-static void movetup_index(void *dest, void *src, unsigned int len);
 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
 				 Tuplesortstate *state);
 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -610,7 +618,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
 			   SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
 			  int tapenum, unsigned int len);
-static void movetup_datum(void *dest, void *src, unsigned int len);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
 
 /*
@@ -662,10 +669,10 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	 * Caller tuple (e.g. IndexTuple) memory context.
 	 *
 	 * A dedicated child context used exclusively for caller passed tuples
-	 * eases memory management.  Resetting at key points reduces
-	 * fragmentation. Note that the memtuples array of SortTuples is allocated
-	 * in the parent context, not this context, because there is no need to
-	 * free memtuples early.
+	 * eases memory management.  Destroying it once we're done building
+	 * the initial runs reduces fragmentation.  Note that the memtuples array
+	 * of SortTuples is allocated in the parent context, not this context,
+	 * because there is no need to free memtuples early.
 	 */
 	tuplecontext = AllocSetContextCreate(sortcontext,
 										 "Caller tuples",
@@ -762,7 +769,6 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 	state->copytup = copytup_heap;
 	state->writetup = writetup_heap;
 	state->readtup = readtup_heap;
-	state->movetup = movetup_heap;
 
 	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
 	state->abbrevNext = 10;
@@ -835,7 +841,6 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 	state->copytup = copytup_cluster;
 	state->writetup = writetup_cluster;
 	state->readtup = readtup_cluster;
-	state->movetup = movetup_cluster;
 	state->abbrevNext = 10;
 
 	state->indexInfo = BuildIndexInfo(indexRel);
@@ -927,7 +932,6 @@ tuplesort_begin_index_btree(Relation heapRel,
 	state->copytup = copytup_index;
 	state->writetup = writetup_index;
 	state->readtup = readtup_index;
-	state->movetup = movetup_index;
 	state->abbrevNext = 10;
 
 	state->heapRel = heapRel;
@@ -995,7 +999,6 @@ tuplesort_begin_index_hash(Relation heapRel,
 	state->copytup = copytup_index;
 	state->writetup = writetup_index;
 	state->readtup = readtup_index;
-	state->movetup = movetup_index;
 
 	state->heapRel = heapRel;
 	state->indexRel = indexRel;
@@ -1038,7 +1041,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	state->copytup = copytup_datum;
 	state->writetup = writetup_datum;
 	state->readtup = readtup_datum;
-	state->movetup = movetup_datum;
 	state->abbrevNext = 10;
 
 	state->datumType = datumType;
@@ -1884,14 +1886,33 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 		case TSS_SORTEDONTAPE:
 			Assert(forward || state->randomAccess);
 			Assert(!state->batchUsed);
-			*should_free = true;
+
+			/*
+			 * The buffer holding the tuple that we returned in the previous
+			 * gettuple call can now be reused.
+			 */
+			if (state->lastReturnedTuple)
+			{
+				RELEASE_MERGETUPLE_BUFFER(state, state->lastReturnedTuple);
+				state->lastReturnedTuple = NULL;
+			}
+
 			if (forward)
 			{
 				if (state->eof_reached)
 					return false;
+
 				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
 				{
 					READTUP(state, stup, state->result_tape, tuplen);
+
+					/*
+					 * Remember the tuple we return, so that we can recycle its
+					 * memory on next call. (This can be NULL, in the Datum case).
+					 */
+					state->lastReturnedTuple = stup->tuple;
+
+					*should_free = false;
 					return true;
 				}
 				else
@@ -1965,74 +1986,70 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 									  tuplen))
 				elog(ERROR, "bogus tuple length in backward scan");
 			READTUP(state, stup, state->result_tape, tuplen);
+
+			/*
+			 * Remember the tuple we return, so that we can recycle its
+			 * memory on next call. (This can be NULL, in the Datum case).
+			 */
+			state->lastReturnedTuple = stup->tuple;
+
+			*should_free = false;
 			return true;
 
 		case TSS_FINALMERGE:
 			Assert(forward);
 			Assert(state->batchUsed || !state->tuples);
-			/* For now, assume tuple is stored in tape's batch memory */
+			/* We are managing memory ourselves, with the batch memory arena. */
 			*should_free = false;
 
 			/*
+			 * The buffer holding the tuple that we returned in the previous
+			 * gettuple call can now be reused.
+			 */
+			if (state->lastReturnedTuple)
+			{
+				RELEASE_MERGETUPLE_BUFFER(state, state->lastReturnedTuple);
+				state->lastReturnedTuple = NULL;
+			}
+
+			/*
 			 * This code should match the inner loop of mergeonerun().
 			 */
 			if (state->memtupcount > 0)
 			{
 				int			srcTape = state->memtuples[0].tupindex;
-				int			tupIndex;
-				SortTuple  *newtup;
+				SortTuple	newtup;
+
+				*stup = state->memtuples[0];
 
 				/*
-				 * Returned tuple is still counted in our memory space most of
-				 * the time.  See mergebatchone() for discussion of why caller
-				 * may occasionally be required to free returned tuple, and
-				 * how preread memory is managed with regard to edge cases
-				 * more generally.
+				 * Remember the tuple we return, so that we can recycle its
+				 * memory on next call. (This can be NULL, in the Datum case).
 				 */
-				*stup = state->memtuples[0];
-				if ((tupIndex = state->mergenext[srcTape]) == 0)
+				state->lastReturnedTuple = stup->tuple;
+
+				/*
+				 * Pull next tuple from tape, and replace the returned tuple
+				 * at top of the heap with it.
+				 */
+				if (!mergereadnext(state, srcTape, &newtup))
 				{
 					/*
-					 * out of preloaded data on this tape, try to read more
-					 *
-					 * Unlike mergeonerun(), we only preload from the single
-					 * tape that's run dry, though not before preparing its
-					 * batch memory for a new round of sequential consumption.
-					 * See mergepreread() comments.
+					 * If no more data, we've reached end of run on this tape.
+					 * Remove the top node from the heap.
 					 */
-					if (state->batchUsed)
-						mergebatchone(state, srcTape, stup, should_free);
-
-					mergeprereadone(state, srcTape);
+					tuplesort_heap_delete_top(state, false);
 
 					/*
-					 * if still no data, we've reached end of run on this tape
+					 * Rewind to free the read buffer.  It'd go away at the
+					 * end of the sort anyway, but better to release the
+					 * memory early.
 					 */
-					if ((tupIndex = state->mergenext[srcTape]) == 0)
-					{
-						/* Remove the top node from the heap */
-						tuplesort_heap_delete_top(state, false);
-						/* Free tape's buffer, avoiding dangling pointer */
-						if (state->batchUsed)
-							mergebatchfreetape(state, srcTape, stup, should_free);
-						return true;
-					}
+					LogicalTapeRewind(state->tapeset, srcTape, true);
+					return true;
 				}
-
-				/*
-				 * pull next preread tuple from list, and replace the returned
-				 * tuple at top of the heap with it.
-				 */
-				newtup = &state->memtuples[tupIndex];
-				state->mergenext[srcTape] = newtup->tupindex;
-				if (state->mergenext[srcTape] == 0)
-					state->mergelast[srcTape] = 0;
-				newtup->tupindex = srcTape;
-				tuplesort_heap_replace_top(state, newtup, false);
-				/* put the now-unused memtuples entry on the freelist */
-				newtup->tupindex = state->mergefreelist;
-				state->mergefreelist = tupIndex;
-				state->mergeavailslots[srcTape]++;
+				newtup.tupindex = srcTape;
+				tuplesort_heap_replace_top(state, &newtup, false);
 				return true;
 			}
 			return false;
@@ -2317,13 +2334,6 @@ inittapes(Tuplesortstate *state)
 	/* Compute number of tapes to use: merge order plus 1 */
 	maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
 
-	/*
-	 * We must have at least 2*maxTapes slots in the memtuples[] array, else
-	 * we'd not have room for merge heap plus preread.  It seems unlikely that
-	 * this case would ever occur, but be safe.
-	 */
-	maxTapes = Min(maxTapes, state->memtupsize / 2);
-
 	state->maxTapes = maxTapes;
 	state->tapeRange = maxTapes - 1;
 
@@ -2334,13 +2344,13 @@ inittapes(Tuplesortstate *state)
 #endif
 
 	/*
-	 * Decrease availMem to reflect the space needed for tape buffers; but
-	 * don't decrease it to the point that we have no room for tuples. (That
-	 * case is only likely to occur if sorting pass-by-value Datums; in all
-	 * other scenarios the memtuples[] array is unlikely to occupy more than
-	 * half of allowedMem.  In the pass-by-value case it's not important to
-	 * account for tuple space, so we don't care if LACKMEM becomes
-	 * inaccurate.)
+	 * Decrease availMem to reflect the space needed for tape buffers, when
+	 * writing the initial runs; but don't decrease it to the point that we
+	 * have no room for tuples. (That case is only likely to occur if sorting
+	 * pass-by-value Datums; in all other scenarios the memtuples[] array is
+	 * unlikely to occupy more than half of allowedMem.  In the pass-by-value
+	 * case it's not important to account for tuple space, so we don't care
+	 * if LACKMEM becomes inaccurate.)
 	 */
 	tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD;
 
@@ -2359,14 +2369,6 @@ inittapes(Tuplesortstate *state)
 	state->tapeset = LogicalTapeSetCreate(maxTapes);
 
 	state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
-	state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
-	state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
-	state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
-	state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
-	state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
-	state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
-	state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
-	state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
 	state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
 	state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
 	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
@@ -2478,6 +2480,12 @@ mergeruns(Tuplesortstate *state)
 				svTape,
 				svRuns,
 				svDummy;
+	char	   *p;
+	int			i;
+	int64		availBlocks;
+	int64		usedBlocks;
+	int64		blocksPerTape;
+	int			remainder;
 
 	Assert(state->status == TSS_BUILDRUNS);
 	Assert(state->memtupcount == 0);
@@ -2499,6 +2507,67 @@ mergeruns(Tuplesortstate *state)
 	}
 
 	/*
+	 * Destroy the tuple memory context.  We've freed all the tuples that we
+	 * previously allocated.  We will use the batch memory arena from now on.
+	 */
+	MemoryContextDelete(state->tuplecontext);
+	state->tuplecontext = NULL;
+
+	/*
+	 * We no longer need a large memtuples array; the merge heap only needs
+	 * one slot per tape.  Free the old array here, to make the memory
+	 * available for other use; a smaller one is allocated below.
+	 */
+	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+	pfree(state->memtuples);
+
+	/*
+	 * If we had fewer runs than tapes, forget about the unused tapes.  We
+	 * decrease maxTapes and tapeRange to reflect the actual number of tapes
+	 * used, and refund buffers for tapes that were never allocated.  (We don't
+	 * try to shrink the various arrays that were allocated according to old
+	 * maxTapes).
+	 */
+	if (state->Level == 1)
+	{
+		int			numTapes;
+
+		numTapes = state->currentRun + 1;
+		FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
+
+		state->maxTapes = numTapes;
+		state->tapeRange = numTapes - 1;
+	}
+
+	/*
+	 * Allocate a new 'memtuples' array, for the heap.
+	 */
+	state->memtupsize = state->maxTapes;
+	state->memtuples = (SortTuple *) palloc(state->maxTapes * sizeof(SortTuple));
+	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
+	/*
+	 * Initialize the batch memory arena.  We need one tuple buffer per tape,
+	 * for the tuples in the heap, plus one to hold the tuple last returned
+	 * from tuplesort_gettuple.
+	 */
+	state->batchMemoryBegin = palloc((state->maxTapes + 1) *
+									 MERGETUPLEBUFFER_SIZE);
+	state->batchMemoryEnd = state->batchMemoryBegin +
+		(state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE;
+	state->freeBufferHead = (MergeTupleBuffer *) state->batchMemoryBegin;
+	USEMEM(state, (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
+
+	p = state->batchMemoryBegin;
+	for (i = 0; i < state->maxTapes; i++)
+	{
+		((MergeTupleBuffer *) p)->nextfree =
+			(MergeTupleBuffer *) (p + MERGETUPLEBUFFER_SIZE);
+		p += MERGETUPLEBUFFER_SIZE;
+	}
+	((MergeTupleBuffer *) p)->nextfree = NULL;
+
+	/*
 	 * If we produced only one initial run (quite likely if the total data
 	 * volume is between 1X and 2X workMem when replacement selection is used,
 	 * but something we particular count on when input is presorted), we can
@@ -2514,6 +2583,56 @@ mergeruns(Tuplesortstate *state)
 		return;
 	}
 
+	/*
+	 * Use all the spare memory we have available for read buffers. Divide it
+	 * evenly among all the tapes.
+	 *
+	 * We use the number of *input* tapes (tapeRange) here, rather than maxTapes,
+	 * for the calculation.  At all times, we'll be reading from at most tapeRange
+	 * tapes, and one tape is used for output.  But we call
+	 * LogicalTapeAssignReadBufferSize() for every tape, so that when the tape
+	 * is read from, it will use a properly-sized buffer.
+	 */
+	availBlocks = state->availMem / BLCKSZ;
+	blocksPerTape = availBlocks / state->tapeRange;
+	remainder = availBlocks % state->tapeRange;
+
+	/*
+	 * Use one page per tape, even if we are out of memory. tuplesort_merge_order()
+	 * should've chosen the number of tapes so that this can't happen, but better
+	 * safe than sorry.  (This also protects from a negative availMem.)
+	 */
+	if (blocksPerTape < 1)
+	{
+		blocksPerTape = 1;
+		remainder = 0;
+	}
+
+	usedBlocks = 0;
+	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+	{
+		int64		numBlocks = blocksPerTape + (tapenum < remainder ? 1 : 0);
+
+		if (numBlocks > MaxAllocSize / BLCKSZ)
+			numBlocks = MaxAllocSize / BLCKSZ;
+		LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
+										numBlocks * BLCKSZ);
+		usedBlocks += numBlocks;
+	}
+	USEMEM(state, usedBlocks * BLCKSZ);
+
+#ifdef TRACE_SORT
+	if (trace_sort)
+		elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d tapes",
+			 (int64) (usedBlocks * BLCKSZ) / 1024, state->maxTapes);
+#endif
+
+	/*
+	 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
+	 * track memory usage of individual tuples.
+	 */
+	state->batchUsed = true;
+
 	/* End of step D2: rewind all output tapes to prepare for merging */
 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
 		LogicalTapeRewind(state->tapeset, tapenum, false);
@@ -2544,7 +2663,7 @@ mergeruns(Tuplesortstate *state)
 				/* Tell logtape.c we won't be writing anymore */
 				LogicalTapeSetForgetFreeSpace(state->tapeset);
 				/* Initialize for the final merge pass */
-				beginmerge(state, state->tuples);
+				beginmerge(state);
 				state->status = TSS_FINALMERGE;
 				return;
 			}
@@ -2614,6 +2733,14 @@ mergeruns(Tuplesortstate *state)
 	state->result_tape = state->tp_tapenum[state->tapeRange];
 	LogicalTapeFreeze(state->tapeset, state->result_tape);
 	state->status = TSS_SORTEDONTAPE;
+
+	/* Release the read buffers on all the other tapes, by rewinding them. */
+	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+	{
+		if (tapenum == state->result_tape)
+			continue;
+		LogicalTapeRewind(state->tapeset, tapenum, true);
+	}
 }
 
 /*
@@ -2627,16 +2754,12 @@ mergeonerun(Tuplesortstate *state)
 {
 	int			destTape = state->tp_tapenum[state->tapeRange];
 	int			srcTape;
-	int			tupIndex;
-	SortTuple  *tup;
-	int64		priorAvail,
-				spaceFreed;
 
 	/*
 	 * Start the merge by loading one tuple from each active source tape into
 	 * the heap.  We can also decrease the input run/dummy run counts.
 	 */
-	beginmerge(state, false);
+	beginmerge(state);
 
 	/*
 	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
@@ -2645,52 +2768,31 @@ mergeonerun(Tuplesortstate *state)
 	 */
 	while (state->memtupcount > 0)
 	{
+		SortTuple	stup;
+
 		/* write the tuple to destTape */
-		priorAvail = state->availMem;
 		srcTape = state->memtuples[0].tupindex;
 		WRITETUP(state, destTape, &state->memtuples[0]);
-		/* writetup adjusted total free space, now fix per-tape space */
-		spaceFreed = state->availMem - priorAvail;
-		state->mergeavailmem[srcTape] += spaceFreed;
-		if ((tupIndex = state->mergenext[srcTape]) == 0)
-		{
-			/* out of preloaded data on this tape, try to read more */
-			mergepreread(state);
-			/* if still no data, we've reached end of run on this tape */
-			if ((tupIndex = state->mergenext[srcTape]) == 0)
-			{
-				/* remove the written-out tuple from the heap */
-				tuplesort_heap_delete_top(state, false);
-				continue;
-			}
-		}
+
+		/* recycle the buffer of the tuple we just wrote out, for the next read */
+		RELEASE_MERGETUPLE_BUFFER(state, state->memtuples[0].tuple);
 
 		/*
 		 * pull next preread tuple from list, and replace the written-out
 		 * tuple in the heap with it.
 		 */
-		tup = &state->memtuples[tupIndex];
-		state->mergenext[srcTape] = tup->tupindex;
-		if (state->mergenext[srcTape] == 0)
-			state->mergelast[srcTape] = 0;
-		tup->tupindex = srcTape;
-		tuplesort_heap_replace_top(state, tup, false);
-		/* put the now-unused memtuples entry on the freelist */
-		tup->tupindex = state->mergefreelist;
-		state->mergefreelist = tupIndex;
-		state->mergeavailslots[srcTape]++;
+		if (!mergereadnext(state, srcTape, &stup))
+		{
+			/* we've reached end of run on this tape */
+			/* remove the written-out tuple from the heap */
+			tuplesort_heap_delete_top(state, false);
+			continue;
+		}
+		stup.tupindex = srcTape;
+		tuplesort_heap_replace_top(state, &stup, false);
 	}
 
 	/*
-	 * Reset tuple memory.  We've freed all of the tuples that we previously
-	 * allocated, but AllocSetFree will have put those chunks of memory on
-	 * particular free lists, bucketed by size class.  Thus, although all of
-	 * that memory is free, it is effectively fragmented.  Resetting the
-	 * context gets us out from under that problem.
-	 */
-	MemoryContextReset(state->tuplecontext);
-
-	/*
 	 * When the heap empties, we're done.  Write an end-of-run marker on the
 	 * output tape, and increment its count of real runs.
 	 */
@@ -2711,18 +2813,13 @@ mergeonerun(Tuplesortstate *state)
  * which tapes contain active input runs in mergeactive[].  Then, load
  * as many tuples as we can from each active input tape, and finally
  * fill the merge heap with the first tuple from each active tape.
- *
- * finalMergeBatch indicates if this is the beginning of a final on-the-fly
- * merge where a batched allocation of tuple memory is required.
  */
 static void
-beginmerge(Tuplesortstate *state, bool finalMergeBatch)
+beginmerge(Tuplesortstate *state)
 {
 	int			activeTapes;
 	int			tapenum;
 	int			srcTape;
-	int			slotsPerTape;
-	int64		spacePerTape;
 
 	/* Heap should be empty here */
 	Assert(state->memtupcount == 0);
@@ -2746,517 +2843,47 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
 	}
 	state->activeTapes = activeTapes;
 
-	/* Clear merge-pass state variables */
-	memset(state->mergenext, 0,
-		   state->maxTapes * sizeof(*state->mergenext));
-	memset(state->mergelast, 0,
-		   state->maxTapes * sizeof(*state->mergelast));
-	state->mergefreelist = 0;	/* nothing in the freelist */
-	state->mergefirstfree = activeTapes;		/* 1st slot avail for preread */
-
-	if (finalMergeBatch)
-	{
-		/* Free outright buffers for tape never actually allocated */
-		FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
-
-		/*
-		 * Grow memtuples one last time, since the palloc() overhead no longer
-		 * incurred can make a big difference
-		 */
-		batchmemtuples(state);
-	}
-
 	/*
 	 * Initialize space allocation to let each active input tape have an equal
 	 * share of preread space.
 	 */
 	Assert(activeTapes > 0);
-	slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
-	Assert(slotsPerTape > 0);
-	spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
-	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-	{
-		if (state->mergeactive[srcTape])
-		{
-			state->mergeavailslots[srcTape] = slotsPerTape;
-			state->mergeavailmem[srcTape] = spacePerTape;
-		}
-	}
-
-	/*
-	 * Preallocate tuple batch memory for each tape.  This is the memory used
-	 * for tuples themselves (not SortTuples), so it's never used by
-	 * pass-by-value datum sorts.  Memory allocation is performed here at most
-	 * once per sort, just in advance of the final on-the-fly merge step.
-	 */
-	if (finalMergeBatch)
-		mergebatch(state, spacePerTape);
-
-	/*
-	 * Preread as many tuples as possible (and at least one) from each active
-	 * tape
-	 */
-	mergepreread(state);
 
 	/* Load the merge heap with the first tuple from each input tape */
 	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
 	{
-		int			tupIndex = state->mergenext[srcTape];
-		SortTuple  *tup;
-
-		if (tupIndex)
-		{
-			tup = &state->memtuples[tupIndex];
-			state->mergenext[srcTape] = tup->tupindex;
-			if (state->mergenext[srcTape] == 0)
-				state->mergelast[srcTape] = 0;
-			tup->tupindex = srcTape;
-			tuplesort_heap_insert(state, tup, false);
-			/* put the now-unused memtuples entry on the freelist */
-			tup->tupindex = state->mergefreelist;
-			state->mergefreelist = tupIndex;
-			state->mergeavailslots[srcTape]++;
-
-#ifdef TRACE_SORT
-			if (trace_sort && finalMergeBatch)
-			{
-				int64		perTapeKB = (spacePerTape + 1023) / 1024;
-				int64		usedSpaceKB;
-				int			usedSlots;
-
-				/*
-				 * Report how effective batchmemtuples() was in balancing the
-				 * number of slots against the need for memory for the
-				 * underlying tuples (e.g. IndexTuples).  The big preread of
-				 * all tapes when switching to FINALMERGE state should be
-				 * fairly representative of memory utilization during the
-				 * final merge step, and in any case is the only point at
-				 * which all tapes are guaranteed to have depleted either
-				 * their batch memory allowance or slot allowance.  Ideally,
-				 * both will be completely depleted for every tape by now.
-				 */
-				usedSpaceKB = (state->mergecurrent[srcTape] -
-							   state->mergetuples[srcTape] + 1023) / 1024;
-				usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
-
-				elog(LOG, "tape %d initially used " INT64_FORMAT " KB of "
-					 INT64_FORMAT " KB batch (%2.3f) and %d out of %d slots "
-					 "(%2.3f)", srcTape,
-					 usedSpaceKB, perTapeKB,
-					 (double) usedSpaceKB / (double) perTapeKB,
-					 usedSlots, slotsPerTape,
-					 (double) usedSlots / (double) slotsPerTape);
-			}
-#endif
-		}
-	}
-}
-
-/*
- * batchmemtuples - grow memtuples without palloc overhead
- *
- * When called, availMem should be approximately the amount of memory we'd
- * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
- * that were allocated with palloc() overhead, and in doing so use up all
- * allocated slots.  However, though slots and tuple memory is in balance
- * following the last grow_memtuples() call, that's predicated on the observed
- * average tuple size for the "final" grow_memtuples() call, which includes
- * palloc overhead.  During the final merge pass, where we will arrange to
- * squeeze out the palloc overhead, we might need more slots in the memtuples
- * array.
- *
- * To make that happen, arrange for the amount of remaining memory to be
- * exactly equal to the palloc overhead multiplied by the current size of
- * the memtuples array, force the grow_memtuples flag back to true (it's
- * probably but not necessarily false on entry to this routine), and then
- * call grow_memtuples.  This simulates loading enough tuples to fill the
- * whole memtuples array and then having some space left over because of the
- * elided palloc overhead.  We expect that grow_memtuples() will conclude that
- * it can't double the size of the memtuples array but that it can increase
- * it by some percentage; but if it does decide to double it, that just means
- * that we've never managed to use many slots in the memtuples array, in which
- * case doubling it shouldn't hurt anything anyway.
- */
-static void
-batchmemtuples(Tuplesortstate *state)
-{
-	int64		refund;
-	int64		availMemLessRefund;
-	int			memtupsize = state->memtupsize;
-
-	/* Caller error if we have no tapes */
-	Assert(state->activeTapes > 0);
-
-	/* For simplicity, assume no memtuples are actually currently counted */
-	Assert(state->memtupcount == 0);
-
-	/*
-	 * Refund STANDARDCHUNKHEADERSIZE per tuple.
-	 *
-	 * This sometimes fails to make memory use perfectly balanced, but it
-	 * should never make the situation worse.  Note that Assert-enabled builds
-	 * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
-	 */
-	refund = memtupsize * STANDARDCHUNKHEADERSIZE;
-	availMemLessRefund = state->availMem - refund;
-
-	/*
-	 * We need to be sure that we do not cause LACKMEM to become true, else
-	 * the batch allocation size could be calculated as negative, causing
-	 * havoc.  Hence, if availMemLessRefund is negative at this point, we must
-	 * do nothing.  Moreover, if it's positive but rather small, there's
-	 * little point in proceeding because we could only increase memtuples by
-	 * a small amount, not worth the cost of the repalloc's.  We somewhat
-	 * arbitrarily set the threshold at ALLOCSET_DEFAULT_INITSIZE per tape.
-	 * (Note that this does not represent any assumption about tuple sizes.)
-	 */
-	if (availMemLessRefund <=
-		(int64) state->activeTapes * ALLOCSET_DEFAULT_INITSIZE)
-		return;
-
-	/*
-	 * To establish balanced memory use after refunding palloc overhead,
-	 * temporarily have our accounting indicate that we've allocated all
-	 * memory we're allowed to less that refund, and call grow_memtuples() to
-	 * have it increase the number of slots.
-	 */
-	state->growmemtuples = true;
-	USEMEM(state, availMemLessRefund);
-	(void) grow_memtuples(state);
-	state->growmemtuples = false;
-	/* availMem must stay accurate for spacePerTape calculation */
-	FREEMEM(state, availMemLessRefund);
-	if (LACKMEM(state))
-		elog(ERROR, "unexpected out-of-memory situation in tuplesort");
-
-#ifdef TRACE_SORT
-	if (trace_sort)
-	{
-		Size		OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
-		Size		NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
-
-		elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
-			 (double) NewKb / (double) OldKb,
-			 memtupsize, OldKb,
-			 state->memtupsize, NewKb);
-	}
-#endif
-}
-
-/*
- * mergebatch - initialize tuple memory in batch
- *
- * This allows sequential access to sorted tuples buffered in memory from
- * tapes/runs on disk during a final on-the-fly merge step.  Note that the
- * memory is not used for SortTuples, but for the underlying tuples (e.g.
- * MinimalTuples).
- *
- * Note that when batch memory is used, there is a simple division of space
- * into large buffers (one per active tape).  The conventional incremental
- * memory accounting (calling USEMEM() and FREEMEM()) is abandoned.  Instead,
- * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
- * performed, which is then immediately detected in a way that is analogous to
- * LACKMEM().  This keeps each tape's use of memory fair, which is always a
- * goal.
- */
-static void
-mergebatch(Tuplesortstate *state, int64 spacePerTape)
-{
-	int			srcTape;
-
-	Assert(state->activeTapes > 0);
-	Assert(state->tuples);
-
-	/*
-	 * For the purposes of tuplesort's memory accounting, the batch allocation
-	 * is special, and regular memory accounting through USEMEM() calls is
-	 * abandoned (see mergeprereadone()).
-	 */
-	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-	{
-		char	   *mergetuples;
-
-		if (!state->mergeactive[srcTape])
-			continue;
-
-		/* Allocate buffer for each active tape */
-		mergetuples = MemoryContextAllocHuge(state->tuplecontext,
-											 spacePerTape);
-
-		/* Initialize state for tape */
-		state->mergetuples[srcTape] = mergetuples;
-		state->mergecurrent[srcTape] = mergetuples;
-		state->mergetail[srcTape] = mergetuples;
-		state->mergeoverflow[srcTape] = NULL;
-	}
+		SortTuple	tup;
 
-	state->batchUsed = true;
-	state->spacePerTape = spacePerTape;
-}
-
-/*
- * mergebatchone - prepare batch memory for one merge input tape
- *
- * This is called following the exhaustion of preread tuples for one input
- * tape.  All that actually occurs is that the state for the source tape is
- * reset to indicate that all memory may be reused.
- *
- * This routine must deal with fixing up the tuple that is about to be returned
- * to the client, due to "overflow" allocations.
- */
-static void
-mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
-			  bool *should_free)
-{
-	Assert(state->batchUsed);
-
-	/*
-	 * Tuple about to be returned to caller ("stup") is final preread tuple
-	 * from tape, just removed from the top of the heap.  Special steps around
-	 * memory management must be performed for that tuple, to make sure it
-	 * isn't overwritten early.
-	 */
-	if (!state->mergeoverflow[srcTape])
-	{
-		Size		tupLen;
-
-		/*
-		 * Mark tuple buffer range for reuse, but be careful to move final,
-		 * tail tuple to start of space for next run so that it's available to
-		 * caller when stup is returned, and remains available at least until
-		 * the next tuple is requested.
-		 */
-		tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
-		state->mergecurrent[srcTape] = state->mergetuples[srcTape];
-		MOVETUP(state->mergecurrent[srcTape], state->mergetail[srcTape],
-				tupLen);
-
-		/* Make SortTuple at top of the merge heap point to new tuple */
-		rtup->tuple = (void *) state->mergecurrent[srcTape];
-
-		state->mergetail[srcTape] = state->mergecurrent[srcTape];
-		state->mergecurrent[srcTape] += tupLen;
-	}
-	else
-	{
-		/*
-		 * Handle an "overflow" retail palloc.
-		 *
-		 * This is needed when we run out of tuple memory for the tape.
-		 */
-		state->mergecurrent[srcTape] = state->mergetuples[srcTape];
-		state->mergetail[srcTape] = state->mergetuples[srcTape];
-
-		if (rtup->tuple)
+		if (mergereadnext(state, srcTape, &tup))
 		{
-			Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
-			/* Caller should free palloc'd tuple */
-			*should_free = true;
+			tup.tupindex = srcTape;
+			tuplesort_heap_insert(state, &tup, false);
 		}
-		state->mergeoverflow[srcTape] = NULL;
-	}
-}
-
-/*
- * mergebatchfreetape - handle final clean-up for batch memory once tape is
- * about to become exhausted
- *
- * All tuples are returned from tape, but a single final tuple, *rtup, is to be
- * passed back to caller.  Free tape's batch allocation buffer while ensuring
- * that the final tuple is managed appropriately.
- */
-static void
-mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
-				   bool *should_free)
-{
-	Assert(state->batchUsed);
-	Assert(state->status == TSS_FINALMERGE);
-
-	/*
-	 * Tuple may or may not already be an overflow allocation from
-	 * mergebatchone()
-	 */
-	if (!*should_free && rtup->tuple)
-	{
-		/*
-		 * Final tuple still in tape's batch allocation.
-		 *
-		 * Return palloc()'d copy to caller, and have it freed in a similar
-		 * manner to overflow allocation.  Otherwise, we'd free batch memory
-		 * and pass back a pointer to garbage.  Note that we deliberately
-		 * allocate this in the parent tuplesort context, to be on the safe
-		 * side.
-		 */
-		Size		tuplen;
-		void	   *oldTuple = rtup->tuple;
-
-		tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
-		rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
-		MOVETUP(rtup->tuple, oldTuple, tuplen);
-		*should_free = true;
-	}
-
-	/* Free spacePerTape-sized buffer */
-	pfree(state->mergetuples[srcTape]);
-}
-
-/*
- * mergebatchalloc - allocate memory for one tuple using a batch memory
- * "logical allocation".
- *
- * This is used for the final on-the-fly merge phase only.  READTUP() routines
- * receive memory from here in place of palloc() and USEMEM() calls.
- *
- * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
- * contiguous order (while allowing safe reuse of memory made available to
- * each tape).  This maximizes locality of access as tuples are returned by
- * final merge.
- *
- * Caller must not subsequently attempt to free memory returned here.  In
- * general, only mergebatch* functions know about how memory returned from
- * here should be freed, and this function's caller must ensure that batch
- * memory management code will definitely have the opportunity to do the right
- * thing during the final on-the-fly merge.
- */
-static void *
-mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
-{
-	Size		reserve_tuplen = MAXALIGN(tuplen);
-	char	   *ret;
-
-	/* Should overflow at most once before mergebatchone() call: */
-	Assert(state->mergeoverflow[tapenum] == NULL);
-	Assert(state->batchUsed);
-
-	/* It should be possible to use precisely spacePerTape memory at once */
-	if (state->mergecurrent[tapenum] + reserve_tuplen <=
-		state->mergetuples[tapenum] + state->spacePerTape)
-	{
-		/*
-		 * Usual case -- caller is returned pointer into its tape's buffer,
-		 * and an offset from that point is recorded as where tape has
-		 * consumed up to for current round of preloading.
-		 */
-		ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
-		state->mergecurrent[tapenum] += reserve_tuplen;
 	}
-	else
-	{
-		/*
-		 * Allocate memory, and record as tape's overflow allocation.  This
-		 * will be detected quickly, in a similar fashion to a LACKMEM()
-		 * condition, and should not happen again before a new round of
-		 * preloading for caller's tape.  Note that we deliberately allocate
-		 * this in the parent tuplesort context, to be on the safe side.
-		 *
-		 * Sometimes, this does not happen because merging runs out of slots
-		 * before running out of memory.
-		 */
-		ret = state->mergeoverflow[tapenum] =
-			MemoryContextAlloc(state->sortcontext, tuplen);
-	}
-
-	return ret;
 }
 
 /*
- * mergepreread - load tuples from merge input tapes
- *
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file.  Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory.  In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out.  Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
+ * mergereadnext - read next tuple from one merge input tape
  *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry.  There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes.  In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.  Plus, only
- * FINALMERGE state has to consider memory management for a batch
- * allocation.)
+ * Returns false on EOF.
  */
-static void
-mergepreread(Tuplesortstate *state)
-{
-	int			srcTape;
-
-	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-		mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
- *
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
- */
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
 {
 	unsigned int tuplen;
-	SortTuple	stup;
-	int			tupIndex;
-	int64		priorAvail,
-				spaceUsed;
 
 	if (!state->mergeactive[srcTape])
-		return;					/* tape's run is already exhausted */
+		return false;					/* tape's run is already exhausted */
 
-	/*
-	 * Manage per-tape availMem.  Only actually matters when batch memory not
-	 * in use.
-	 */
-	priorAvail = state->availMem;
-	state->availMem = state->mergeavailmem[srcTape];
-
-	/*
-	 * When batch memory is used if final on-the-fly merge, only mergeoverflow
-	 * test is relevant; otherwise, only LACKMEM() test is relevant.
-	 */
-	while ((state->mergeavailslots[srcTape] > 0 &&
-			state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
-		   state->mergenext[srcTape] == 0)
+	/* read next tuple, if any */
+	if ((tuplen = getlen(state, srcTape, true)) == 0)
 	{
-		/* read next tuple, if any */
-		if ((tuplen = getlen(state, srcTape, true)) == 0)
-		{
-			state->mergeactive[srcTape] = false;
-			break;
-		}
-		READTUP(state, &stup, srcTape, tuplen);
-		/* find a free slot in memtuples[] for it */
-		tupIndex = state->mergefreelist;
-		if (tupIndex)
-			state->mergefreelist = state->memtuples[tupIndex].tupindex;
-		else
-		{
-			tupIndex = state->mergefirstfree++;
-			Assert(tupIndex < state->memtupsize);
-		}
-		state->mergeavailslots[srcTape]--;
-		/* store tuple, append to list for its tape */
-		stup.tupindex = 0;
-		state->memtuples[tupIndex] = stup;
-		if (state->mergelast[srcTape])
-			state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
-		else
-			state->mergenext[srcTape] = tupIndex;
-		state->mergelast[srcTape] = tupIndex;
+		state->mergeactive[srcTape] = false;
+		return false;
 	}
-	/* update per-tape and global availmem counts */
-	spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
-	state->mergeavailmem[srcTape] = state->availMem;
-	state->availMem = priorAvail - spaceUsed;
+	READTUP(state, stup, srcTape, tuplen);
+
+	return true;
 }
 
 /*
@@ -3438,15 +3065,6 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
 		state->memtupcount--;
 	}
 
-	/*
-	 * Reset tuple memory.  We've freed all of the tuples that we previously
-	 * allocated.  It's important to avoid fragmentation when there is a stark
-	 * change in allocation patterns due to the use of batch memory.
-	 * Fragmentation due to AllocSetFree's bucketing by size class might be
-	 * particularly bad if this step wasn't taken.
-	 */
-	MemoryContextReset(state->tuplecontext);
-
 	markrunend(state, state->tp_tapenum[state->destTape]);
 	state->tp_runs[state->destTape]++;
 	state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
@@ -3901,38 +3519,31 @@ markrunend(Tuplesortstate *state, int tapenum)
 }
 
 /*
- * Get memory for tuple from within READTUP() routine.  Allocate
- * memory and account for that, or consume from tape's batch
- * allocation.
+ * Get memory for tuple from within READTUP() routine.
  *
- * Memory returned here in the final on-the-fly merge case is recycled
- * from tape's batch allocation.  Otherwise, callers must pfree() or
- * reset tuple child memory context, and account for that with a
- * FREEMEM().  Currently, this only ever needs to happen in WRITETUP()
- * routines.
+ * Use the next free buffer from the batch memory arena, or fall back to
+ * palloc() if the tuple is too large to fit in one.
  */
 static void *
-readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+readtup_alloc(Tuplesortstate *state, Size tuplen)
 {
-	if (state->batchUsed)
-	{
-		/*
-		 * No USEMEM() call, because during final on-the-fly merge accounting
-		 * is based on tape-private state. ("Overflow" allocations are
-		 * detected as an indication that a new round or preloading is
-		 * required. Preloading marks existing contents of tape's batch buffer
-		 * for reuse.)
-		 */
-		return mergebatchalloc(state, tapenum, tuplen);
-	}
+	MergeTupleBuffer *buf;
+
+	/*
+	 * We pre-allocate enough buffers in the arena that we should never run
+	 * out.
+	 */
+	Assert(state->freeBufferHead);
+
+	if (tuplen > MERGETUPLEBUFFER_SIZE || !state->freeBufferHead)
+		return MemoryContextAlloc(state->sortcontext, tuplen);
 	else
 	{
-		char	   *ret;
+		buf = state->freeBufferHead;
+		/* Reuse this buffer */
+		state->freeBufferHead = buf->nextfree;
 
-		/* Batch allocation yet to be performed */
-		ret = MemoryContextAlloc(state->tuplecontext, tuplen);
-		USEMEM(state, GetMemoryChunkSpace(ret));
-		return ret;
+		return buf;
 	}
 }
 
@@ -4101,8 +3712,11 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
 		LogicalTapeWrite(state->tapeset, tapenum,
 						 (void *) &tuplen, sizeof(tuplen));
 
-	FREEMEM(state, GetMemoryChunkSpace(tuple));
-	heap_free_minimal_tuple(tuple);
+	if (!state->batchUsed)
+	{
+		FREEMEM(state, GetMemoryChunkSpace(tuple));
+		heap_free_minimal_tuple(tuple);
+	}
 }
 
 static void
@@ -4111,7 +3725,7 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 {
 	unsigned int tupbodylen = len - sizeof(int);
 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-	MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
+	MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
 	HeapTupleData htup;
 
@@ -4132,12 +3746,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 								&stup->isnull1);
 }
 
-static void
-movetup_heap(void *dest, void *src, unsigned int len)
-{
-	memmove(dest, src, len);
-}
-
 /*
  * Routines specialized for the CLUSTER case (HeapTuple data, with
  * comparisons per a btree index definition)
@@ -4344,8 +3952,11 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
 		LogicalTapeWrite(state->tapeset, tapenum,
 						 &tuplen, sizeof(tuplen));
 
-	FREEMEM(state, GetMemoryChunkSpace(tuple));
-	heap_freetuple(tuple);
+	if (!state->batchUsed)
+	{
+		FREEMEM(state, GetMemoryChunkSpace(tuple));
+		heap_freetuple(tuple);
+	}
 }
 
 static void
@@ -4354,7 +3965,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 {
 	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
 	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
-												  tapenum,
 												  t_len + HEAPTUPLESIZE);
 
 	/* Reconstruct the HeapTupleData header */
@@ -4379,19 +3989,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 									&stup->isnull1);
 }
 
-static void
-movetup_cluster(void *dest, void *src, unsigned int len)
-{
-	HeapTuple	tuple;
-
-	memmove(dest, src, len);
-
-	/* Repoint the HeapTupleData header */
-	tuple = (HeapTuple) dest;
-	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
-}
-
-
 /*
  * Routines specialized for IndexTuple case
  *
@@ -4659,8 +4256,11 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
 		LogicalTapeWrite(state->tapeset, tapenum,
 						 (void *) &tuplen, sizeof(tuplen));
 
-	FREEMEM(state, GetMemoryChunkSpace(tuple));
-	pfree(tuple);
+	if (!state->batchUsed)
+	{
+		FREEMEM(state, GetMemoryChunkSpace(tuple));
+		pfree(tuple);
+	}
 }
 
 static void
@@ -4668,7 +4268,7 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
 			  int tapenum, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
-	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
+	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tuplen);
 
 	LogicalTapeReadExact(state->tapeset, tapenum,
 						 tuple, tuplen);
@@ -4683,12 +4283,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
 								 &stup->isnull1);
 }
 
-static void
-movetup_index(void *dest, void *src, unsigned int len)
-{
-	memmove(dest, src, len);
-}
-
 /*
  * Routines specialized for DatumTuple case
  */
@@ -4755,7 +4349,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 		LogicalTapeWrite(state->tapeset, tapenum,
 						 (void *) &writtenlen, sizeof(writtenlen));
 
-	if (stup->tuple)
+	if (!state->batchUsed && stup->tuple)
 	{
 		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
 		pfree(stup->tuple);
@@ -4785,7 +4379,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	}
 	else
 	{
-		void	   *raddr = readtup_alloc(state, tapenum, tuplen);
+		void	   *raddr = readtup_alloc(state, tuplen);
 
 		LogicalTapeReadExact(state->tapeset, tapenum,
 							 raddr, tuplen);
@@ -4799,12 +4393,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 							 &tuplen, sizeof(tuplen));
 }
 
-static void
-movetup_datum(void *dest, void *src, unsigned int len)
-{
-	memmove(dest, src, len);
-}
-
 /*
  * Convenience routine to free a tuple previously loaded into sort memory
  */
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index fa1e992..03d0a6f 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -39,6 +39,7 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 				long blocknum, int offset);
 extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 				long *blocknum, int *offset);
+extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t bufsize);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
 
 #endif   /* LOGTAPE_H */
-- 
2.9.3
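
For anyone skimming the patch, the allocation scheme in the new readtup_alloc() can be sketched in isolation roughly as follows. This is only an illustration and not code from the patch: Arena, Slot, SLOT_SIZE and the arena_* functions are made-up names, malloc()/free() stand in for palloc()/pfree(), and the 1024-byte slot size simply mirrors the "< 1 kB" threshold mentioned above. The idea is a fixed set of equal-sized slots threaded on a free list, with a fallback to a retail allocation when a tuple is too large or no slot is free.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#define SLOT_SIZE 1024		/* hypothetical stand-in for MERGETUPLEBUFFER_SIZE */

/* A slot either sits on the free list or holds tuple bytes, never both. */
typedef union Slot
{
	union Slot *nextfree;
	char		data[SLOT_SIZE];
} Slot;

typedef struct Arena
{
	Slot	   *slots;			/* one contiguous allocation */
	size_t		nslots;
	Slot	   *freehead;		/* head of the free list, NULL if empty */
} Arena;

/* Allocate nslots slots and chain them all onto the free list. */
static int
arena_init(Arena *a, size_t nslots)
{
	size_t		i;

	assert(nslots > 0);
	a->slots = malloc(nslots * sizeof(Slot));
	if (a->slots == NULL)
		return -1;
	a->nslots = nslots;
	for (i = 0; i + 1 < nslots; i++)
		a->slots[i].nextfree = &a->slots[i + 1];
	a->slots[nslots - 1].nextfree = NULL;
	a->freehead = a->slots;
	return 0;
}

/* Does p point into the arena (as opposed to a retail allocation)? */
static int
arena_owns(const Arena *a, const void *p)
{
	uintptr_t	lo = (uintptr_t) a->slots;
	uintptr_t	hi = (uintptr_t) (a->slots + a->nslots);
	uintptr_t	v = (uintptr_t) p;

	return v >= lo && v < hi;
}

/* Hand out a free slot, or fall back to malloc() for oversized requests. */
static void *
arena_alloc(Arena *a, size_t len)
{
	Slot	   *s;

	if (len > SLOT_SIZE || a->freehead == NULL)
		return malloc(len);
	s = a->freehead;
	a->freehead = s->nextfree;
	return s;
}

/* Push a slot back onto the free list, or free() a retail allocation. */
static void
arena_release(Arena *a, void *p)
{
	if (arena_owns(a, p))
	{
		Slot	   *s = p;

		s->nextfree = a->freehead;
		a->freehead = s;
	}
	else
		free(p);
}
```

Because a released slot goes to the head of the list, the next allocation reuses it right away. That is the access pattern described earlier in this message: the buffer that held the previously returned tuple is usually the one that receives the next tuple read from tape.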

