One of the patches in Peter Geoghegan's Parallel tuplesort patch set [1]
is to put a cap on the number of tapes to use for the sort. The reason
is that each tape consumes 3 * BLCKSZ worth of memory, for the buffers.
We decide the max number of tapes upfront, so if we decide that e.g. the
max number of tapes is 1000, we reserve 1000 * 3 * BLCKSZ bytes of
memory for the buffers, and that memory cannot then be used for the
quicksorts to build the initial runs, even if it turns out later that we
needed only a few tapes.
That amounts to about 8% of the available memory. That's quite wasteful.
Peter's approach of putting an arbitrary cap on the max number of tapes
works, but I find it a bit hackish. And you still waste that 8% with
smaller work_mem settings.
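To make the arithmetic above concrete, here is a minimal sketch. It assumes the default 8 kB BLCKSZ, and tape_buffer_reservation() is a made-up helper for illustration, not a function in tuplesort.c.

```c
#include <assert.h>
#include <stddef.h>

#define BLCKSZ 8192				/* assumption: PostgreSQL's default block size */

/*
 * Hypothetical helper: memory set aside for tape buffers up front, before
 * any run has been built.
 */
static size_t
tape_buffer_reservation(size_t max_tapes, size_t blocks_per_tape)
{
	assert(blocks_per_tape > 0);
	return max_tapes * blocks_per_tape * BLCKSZ;
}
```

With 1000 tapes that comes to 24,576,000 bytes at 3 blocks per tape, versus 8,192,000 bytes at 1 block per tape.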
When we finish writing an initial run to a tape, we keep the tape
buffers around. That's the reason we need to reserve that memory. But
there's no fundamental reason we have to do that. As I suggested in [2],
we could flush the buffers to disk, and only keep in memory the location
of the last block we wrote. If we need to write another run to the tape,
we can reload the last incomplete block from the disk, and continue writing.
Reloading the last block requires an extra I/O. That's OK. It's quite
rare to have a multi-pass sort these days, so it's a good bet that you
don't need to do it. And if you have a very small work_mem, so that you
need to do a multi-pass sort, having some more memory available for
building the initial runs is probably worth it, and there's also a good
chance that the block is still in the OS cache.
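The flush-and-reload idea can be sketched roughly as follows. This is a toy model, not the patch: the "disk" is an in-memory array, blocks are numbered consecutively instead of being chained, and DemoTape, demo_tape_pause() and demo_tape_write() are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BLKSZ 16				/* tiny block size, for illustration only */
#define NBLOCKS 8

/* Hypothetical stand-in for the temporary file: an array of blocks. */
static char disk[NBLOCKS][BLKSZ];

typedef struct DemoTape
{
	char	   *buffer;			/* NULL while the tape is paused */
	long		curBlock;		/* block number the buffer belongs to */
	int			pos;			/* bytes used in the current block */
} DemoTape;

/* Flush the incomplete block to "disk" and release the buffer. */
static void
demo_tape_pause(DemoTape *t)
{
	if (t->buffer == NULL)
		return;
	memcpy(disk[t->curBlock], t->buffer, BLKSZ);
	free(t->buffer);
	t->buffer = NULL;
}

/* Append data, reloading the last block first if there is no buffer. */
static void
demo_tape_write(DemoTape *t, const char *data, int len)
{
	if (t->buffer == NULL)
	{
		t->buffer = malloc(BLKSZ);
		assert(t->buffer != NULL);
		/* this is the extra read; harmless on a never-written tape */
		memcpy(t->buffer, disk[t->curBlock], BLKSZ);
	}
	while (len > 0)
	{
		int			n;

		if (t->pos == BLKSZ)
		{
			/* block is full: write it out and move on to the next one */
			memcpy(disk[t->curBlock], t->buffer, BLKSZ);
			t->curBlock++;
			assert(t->curBlock < NBLOCKS);
			t->pos = 0;
		}
		n = BLKSZ - t->pos;
		if (n > len)
			n = len;
		memcpy(t->buffer + t->pos, data, n);
		t->pos += n;
		data += n;
		len -= n;
	}
}
```

Only curBlock and pos survive a pause, which is the point: a paused tape costs a few bytes of bookkeeping rather than a whole block buffer.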
So, here are two patches to that end:
1. The first patch changes the way we store the logical tapes on disk.
Instead of using indirect blocks, HFS style, the blocks form a linked
list. Each block has a trailer, with the block numbers of the previous
and next block of the tape. That eliminates the indirect blocks, which
simplifies the code quite a bit, and makes it simpler to implement the
pause/resume functionality in the second patch. It also immediately
reduces the memory needed for the buffers, from 3 to 1 block per tape,
as we don't need to hold the indirect blocks in memory.
2. The second patch adds a new function LogicalTapePause(). A paused
tape can still be written to, but it doesn't have an in-memory buffer. The
last, incomplete block is written to disk, but if you call
LogicalTapeWrite(), a new buffer is allocated and the last block is read
back into memory. If you don't write to the tape anymore, and call
LogicalTapeRewind() after LogicalTapePause(), nothing needs to be done,
as the last block is already on disk.
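The linked-list block format from the first patch can be illustrated with a small standalone sketch. It is a simplification under stated assumptions: the "file" is an in-memory array, the block size is tiny, the trailer is copied with memcpy() rather than accessed through a cast, and all names (DemoTrailer, alloc_block(), write_chain(), read_chain()) are hypothetical. What it does mirror from the patch is that the last block stores (valid bytes - block size) in 'next', so a negative 'next' marks the end of the tape.

```c
#include <assert.h>
#include <string.h>

#define BLKSZ 32				/* tiny block size, for illustration only */
#define NBLOCKS 8

typedef struct DemoTrailer
{
	long		prev;			/* previous block, or -1 on the first block */
	long		next;			/* next block, or (valid bytes - BLKSZ) on
								 * the last block */
} DemoTrailer;

#define PAYLOAD ((int) (BLKSZ - sizeof(DemoTrailer)))

static char disk[NBLOCKS][BLKSZ];	/* stand-in for the temporary file */
static long next_free_block = 0;

static long
alloc_block(void)
{
	assert(next_free_block < NBLOCKS);
	return next_free_block++;
}

static DemoTrailer
get_trailer(const char *block)
{
	DemoTrailer tr;

	memcpy(&tr, block + PAYLOAD, sizeof(tr));
	return tr;
}

static void
set_trailer(char *block, long prev, long next)
{
	DemoTrailer tr;

	tr.prev = prev;
	tr.next = next;
	memcpy(block + PAYLOAD, &tr, sizeof(tr));
}

/* Valid payload bytes in a block, decoded from its trailer. */
static int
block_nbytes(const char *block)
{
	long		next = get_trailer(block).next;

	return next < 0 ? (int) (next + BLKSZ) : PAYLOAD;
}

/* Write 'len' bytes as a chain of blocks; returns the first block number. */
static long
write_chain(const char *data, int len)
{
	long		first = alloc_block();
	long		cur = first;
	long		prev = -1;

	for (;;)
	{
		int			n = len < PAYLOAD ? len : PAYLOAD;
		long		next;

		memcpy(disk[cur], data, n);
		data += n;
		len -= n;
		if (len == 0)
		{
			/* last block: encode the byte count as a negative 'next' */
			set_trailer(disk[cur], prev, (long) n - BLKSZ);
			return first;
		}
		/* allocate the next block first, so this block can point to it */
		next = alloc_block();
		set_trailer(disk[cur], prev, next);
		prev = cur;
		cur = next;
	}
}

/* Follow the 'next' pointers from 'first'; returns the bytes copied. */
static int
read_chain(long first, char *out)
{
	long		cur = first;
	int			total = 0;

	while (cur != -1)
	{
		int			n = block_nbytes(disk[cur]);
		long		next = get_trailer(disk[cur]).next;

		memcpy(out + total, disk[cur], n);
		total += n;
		cur = next < 0 ? -1 : next;
	}
	return total;
}
```

Since every block names its own successor, a reader needs only the first block number to walk the tape, and a paused writer needs only the current block number and position.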
In addition to saving a little bit of memory, I'd like to do this
refactoring because it simplifies the code. It's code that has stayed
largely unchanged for the past 15 years, so I'm not too eager to touch
it, but looking at the changes coming with Peter's parallel tuplesort
patch set, I think this makes sense. The parallel tuplesort patch set
adds code for "serializing" and "deserializing" a tape, which means
writing the top indirect block to disk, so that it can be read back
in the leader process. That's awfully similar to the "pause/resume"
functionality here, so by adding it now, we won't have to do it in the
parallel tuplesort patch. (Admittedly, it's not a whole lot of code, but
still.)
[1] CAM3SWZQKM=Pzc=cahzrixkjp2eo5q0jg1sofqqexfq647ji...@mail.gmail.com
[2] e8f44b63-4745-b855-7772-e8201906a...@iki.fi
- Heikki
From 5fd57472432ed0641aaa5552f0486e459d03021e Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Wed, 21 Sep 2016 17:20:57 +0300
Subject: [PATCH 1/2] Simplify tape block format.
No more indirect blocks. The blocks form a linked list instead.
This saves some memory, because we don't need to have a buffer in memory to
hold the indirect block (or blocks). To reflect that, TAPE_BUFFER_OVERHEAD
is reduced from 3 to 1 buffer, which allows using more memory for building
the initial runs.
---
src/backend/utils/sort/logtape.c | 552 +++++++++----------------------------
src/backend/utils/sort/tuplesort.c | 17 +-
src/include/utils/logtape.h | 2 +-
3 files changed, 139 insertions(+), 432 deletions(-)
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index caa6960..cc58408 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -31,15 +31,8 @@
* in BLCKSZ-size blocks. Space allocation boils down to keeping track
* of which blocks in the underlying file belong to which logical tape,
* plus any blocks that are free (recycled and not yet reused).
- * The blocks in each logical tape are remembered using a method borrowed
- * from the Unix HFS filesystem: we store data block numbers in an
- * "indirect block". If an indirect block fills up, we write it out to
- * the underlying file and remember its location in a second-level indirect
- * block. In the same way second-level blocks are remembered in third-
- * level blocks, and so on if necessary (of course we're talking huge
- * amounts of data here). The topmost indirect block of a given logical
- * tape is never actually written out to the physical file, but all lower-
- * level indirect blocks will be.
+ * The blocks in each logical tape form a chain, with a prev- and next-
+ * pointer in each block.
*
* The initial write pass is guaranteed to fill the underlying file
* perfectly sequentially, no matter how data is divided into logical tapes.
@@ -87,47 +80,40 @@
#include "utils/memutils.h"
/*
- * Block indexes are "long"s, so we can fit this many per indirect block.
- * NB: we assume this is an exact fit!
- */
-#define BLOCKS_PER_INDIR_BLOCK ((int) (BLCKSZ / sizeof(long)))
-
-/*
- * We use a struct like this for each active indirection level of each
- * logical tape. If the indirect block is not the highest level of its
- * tape, the "nextup" link points to the next higher level. Only the
- * "ptrs" array is written out if we have to dump the indirect block to
- * disk. If "ptrs" is not completely full, we store -1L in the first
- * unused slot at completion of the write phase for the logical tape.
+ * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
+ *
+ * The first block of a tape has prev == -1. The last block of a tape
+ * stores the number of valid bytes on the block, minus BLCKSZ, in 'next'.
+ * That's always negative, so next < 0 indicates the last block.
*/
-typedef struct IndirectBlock
+typedef struct TapeBlockTrailer
{
- int nextSlot; /* next pointer slot to write or read */
- struct IndirectBlock *nextup; /* parent indirect level, or NULL if
- * top */
- long ptrs[BLOCKS_PER_INDIR_BLOCK]; /* indexes of contained blocks */
-} IndirectBlock;
+ long prev; /* previous block on this tape, or -1 on first block */
+ long next; /* next block on this tape, or (# of valid bytes - BLCKSZ) on last block */
+} TapeBlockTrailer;
+
+#define TapeBlockGetTrailer(buf) ((TapeBlockTrailer *) ((char *) buf + BLCKSZ - sizeof(TapeBlockTrailer)))
+#define TapeBlockPayloadSize (BLCKSZ - sizeof(TapeBlockTrailer))
+
+#define TapeBlockGetNBytes(buf) (TapeBlockGetTrailer(buf)->next < 0 ? (TapeBlockGetTrailer(buf)->next + BLCKSZ) : TapeBlockPayloadSize)
+
+#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
/*
* This data structure represents a single "logical tape" within the set
* of logical tapes stored in the same file. We must keep track of the
- * current partially-read-or-written data block as well as the active
- * indirect block level(s).
+ * current partially-read-or-written data block, as well as the first
+ * block, so that we can rewind.
*/
typedef struct LogicalTape
{
- IndirectBlock *indirect; /* bottom of my indirect-block hierarchy */
bool writing; /* T while in write phase */
bool frozen; /* T if blocks should not be freed when read */
bool dirty; /* does buffer need to be written? */
- /*
- * The total data volume in the logical tape is numFullBlocks * BLCKSZ +
- * lastBlockBytes. BUT: we do not update lastBlockBytes during writing,
- * only at completion of a write phase.
- */
- long numFullBlocks; /* number of complete blocks in log tape */
- int lastBlockBytes; /* valid bytes in last (incomplete) block */
+ long firstBlockNumber; /* first block's physical blk# in the file */
+ long curBlockNumber; /* this block's physical blk# in the file */
+ long nextBlockNumber; /* next block of the tape (physical blk#) */
/*
* Buffer for current data block. Note we don't bother to store the
@@ -138,7 +124,6 @@ typedef struct LogicalTape
*/
char *buffer; /* physical buffer (separately palloc'd) */
int buffer_size; /* allocated size of the buffer */
- long curBlockNumber; /* this block's logical blk# within tape */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
@@ -190,19 +175,6 @@ static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
-static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
- long blocknum);
-static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
- IndirectBlock *indirect,
- bool freezing);
-static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
- IndirectBlock *indirect);
-static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
- IndirectBlock *indirect,
- bool frozen);
-static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
- IndirectBlock *indirect);
-static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
/*
@@ -252,39 +224,36 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
* Returns true if anything was read, 'false' on EOF.
*/
static bool
-ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum)
+ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
{
lt->pos = 0;
lt->nbytes = 0;
do
{
- /* Fetch next block number (unless provided by caller) */
- if (datablocknum == -1)
- {
- datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen);
- if (datablocknum == -1L)
- break; /* EOF */
- lt->curBlockNumber++;
- }
+ char *thisbuf = lt->buffer + lt->nbytes;
+
+ /* Fetch next block number */
+ if (lt->nextBlockNumber == -1L)
+ break; /* EOF */
/* Read the block */
- ltsReadBlock(lts, datablocknum, (void *) (lt->buffer + lt->nbytes));
+ ltsReadBlock(lts, lt->nextBlockNumber, (void *) thisbuf);
if (!lt->frozen)
- ltsReleaseBlock(lts, datablocknum);
+ ltsReleaseBlock(lts, lt->nextBlockNumber);
- if (lt->curBlockNumber < lt->numFullBlocks)
- lt->nbytes += BLCKSZ;
- else
+ lt->nbytes += TapeBlockGetNBytes(thisbuf);
+ if (TapeBlockIsLast(thisbuf))
{
+ lt->nextBlockNumber = -1L;
/* EOF */
- lt->nbytes += lt->lastBlockBytes;
break;
}
+ else
+ lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
/* Advance to next block, if we have buffer space left */
- datablocknum = -1;
- } while (lt->nbytes < lt->buffer_size);
+ } while (lt->buffer_size - lt->nbytes > BLCKSZ);
return (lt->nbytes > 0);
}
@@ -369,203 +338,6 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
}
/*
- * These routines manipulate indirect-block hierarchies. All are recursive
- * so that they don't have any specific limit on the depth of hierarchy.
- */
-
-/*
- * Record a data block number in a logical tape's lowest indirect block,
- * or record an indirect block's number in the next higher indirect level.
- */
-static void
-ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
- long blocknum)
-{
- if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
- {
- /*
- * This indirect block is full, so dump it out and recursively save
- * its address in the next indirection level. Create a new
- * indirection level if there wasn't one before.
- */
- long indirblock = ltsGetFreeBlock(lts);
-
- ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
- if (indirect->nextup == NULL)
- {
- indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
- indirect->nextup->nextSlot = 0;
- indirect->nextup->nextup = NULL;
- }
- ltsRecordBlockNum(lts, indirect->nextup, indirblock);
-
- /*
- * Reset to fill another indirect block at this level.
- */
- indirect->nextSlot = 0;
- }
- indirect->ptrs[indirect->nextSlot++] = blocknum;
-}
-
-/*
- * Reset a logical tape's indirect-block hierarchy after a write pass
- * to prepare for reading. We dump out partly-filled blocks except
- * at the top of the hierarchy, and we rewind each level to the start.
- * This call returns the first data block number, or -1L if the tape
- * is empty.
- *
- * Unless 'freezing' is true, release indirect blocks to the free pool after
- * reading them.
- */
-static long
-ltsRewindIndirectBlock(LogicalTapeSet *lts,
- IndirectBlock *indirect,
- bool freezing)
-{
- /* Handle case of never-written-to tape */
- if (indirect == NULL)
- return -1L;
-
- /* Insert sentinel if block is not full */
- if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
- indirect->ptrs[indirect->nextSlot] = -1L;
-
- /*
- * If block is not topmost, write it out, and recurse to obtain address of
- * first block in this hierarchy level. Read that one in.
- */
- if (indirect->nextup != NULL)
- {
- long indirblock = ltsGetFreeBlock(lts);
-
- ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
- ltsRecordBlockNum(lts, indirect->nextup, indirblock);
- indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
- Assert(indirblock != -1L);
- ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
- if (!freezing)
- ltsReleaseBlock(lts, indirblock);
- }
-
- /*
- * Reset my next-block pointer, and then fetch a block number if any.
- */
- indirect->nextSlot = 0;
- if (indirect->ptrs[0] == -1L)
- return -1L;
- return indirect->ptrs[indirect->nextSlot++];
-}
-
-/*
- * Rewind a previously-frozen indirect-block hierarchy for another read pass.
- * This call returns the first data block number, or -1L if the tape
- * is empty.
- */
-static long
-ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
- IndirectBlock *indirect)
-{
- /* Handle case of never-written-to tape */
- if (indirect == NULL)
- return -1L;
-
- /*
- * If block is not topmost, recurse to obtain address of first block in
- * this hierarchy level. Read that one in.
- */
- if (indirect->nextup != NULL)
- {
- long indirblock;
-
- indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
- Assert(indirblock != -1L);
- ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
- }
-
- /*
- * Reset my next-block pointer, and then fetch a block number if any.
- */
- indirect->nextSlot = 0;
- if (indirect->ptrs[0] == -1L)
- return -1L;
- return indirect->ptrs[indirect->nextSlot++];
-}
-
-/*
- * Obtain next data block number in the forward direction, or -1L if no more.
- *
- * Unless 'frozen' is true, release indirect blocks to the free pool after
- * reading them.
- */
-static long
-ltsRecallNextBlockNum(LogicalTapeSet *lts,
- IndirectBlock *indirect,
- bool frozen)
-{
- /* Handle case of never-written-to tape */
- if (indirect == NULL)
- return -1L;
-
- if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
- indirect->ptrs[indirect->nextSlot] == -1L)
- {
- long indirblock;
-
- if (indirect->nextup == NULL)
- return -1L; /* nothing left at this level */
- indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
- if (indirblock == -1L)
- return -1L; /* nothing left at this level */
- ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
- if (!frozen)
- ltsReleaseBlock(lts, indirblock);
- indirect->nextSlot = 0;
- }
- if (indirect->ptrs[indirect->nextSlot] == -1L)
- return -1L;
- return indirect->ptrs[indirect->nextSlot++];
-}
-
-/*
- * Obtain next data block number in the reverse direction, or -1L if no more.
- *
- * Note this fetches the block# before the one last returned, no matter which
- * direction of call returned that one. If we fail, no change in state.
- *
- * This routine can only be used in 'frozen' state, so there's no need to
- * pass a parameter telling whether to release blocks ... we never do.
- */
-static long
-ltsRecallPrevBlockNum(LogicalTapeSet *lts,
- IndirectBlock *indirect)
-{
- /* Handle case of never-written-to tape */
- if (indirect == NULL)
- return -1L;
-
- if (indirect->nextSlot <= 1)
- {
- long indirblock;
-
- if (indirect->nextup == NULL)
- return -1L; /* nothing left at this level */
- indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
- if (indirblock == -1L)
- return -1L; /* nothing left at this level */
- ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
-
- /*
- * The previous block would only have been written out if full, so we
- * need not search it for a -1 sentinel.
- */
- indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK + 1;
- }
- indirect->nextSlot--;
- return indirect->ptrs[indirect->nextSlot - 1];
-}
-
-
-/*
* Create a set of logical tapes in a temporary underlying file.
*
* Each tape is initialized in write state.
@@ -594,23 +366,21 @@ LogicalTapeSetCreate(int ntapes)
/*
* Initialize per-tape structs. Note we allocate the I/O buffer and
- * first-level indirect block for a tape only when it is first actually
+ * the first block for a tape only when it is first actually
* written to. This avoids wasting memory space when tuplesort.c
* overestimates the number of tapes needed.
*/
for (i = 0; i < ntapes; i++)
{
lt = &lts->tapes[i];
- lt->indirect = NULL;
lt->writing = true;
lt->frozen = false;
lt->dirty = false;
- lt->numFullBlocks = 0L;
- lt->lastBlockBytes = 0;
+ lt->firstBlockNumber = -1L;
+ lt->curBlockNumber = -1L;
lt->buffer = NULL;
lt->buffer_size = 0;
lt->read_buffer_size = BLCKSZ;
- lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
}
@@ -624,19 +394,12 @@ void
LogicalTapeSetClose(LogicalTapeSet *lts)
{
LogicalTape *lt;
- IndirectBlock *ib,
- *nextib;
int i;
BufFileClose(lts->pfile);
for (i = 0; i < lts->nTapes; i++)
{
lt = &lts->tapes[i];
- for (ib = lt->indirect; ib != NULL; ib = nextib)
- {
- nextib = ib->nextup;
- pfree(ib);
- }
if (lt->buffer)
pfree(lt->buffer);
}
@@ -660,21 +423,6 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
}
/*
- * Dump the dirty buffer of a logical tape.
- */
-static void
-ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
-{
- long datablock = ltsGetFreeBlock(lts);
-
- Assert(lt->dirty);
- ltsWriteBlock(lts, datablock, (void *) lt->buffer);
- ltsRecordBlockNum(lts, lt->indirect, datablock);
- lt->dirty = false;
- /* Caller must do other state update as needed */
-}
-
-/*
* Write to a logical tape.
*
* There are no error returns; we ereport() on failure.
@@ -690,39 +438,53 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
lt = &lts->tapes[tapenum];
Assert(lt->writing);
- /* Allocate data buffer and first indirect block on first write */
+ /* Allocate data buffer and first block on first write */
if (lt->buffer == NULL)
{
lt->buffer = (char *) palloc(BLCKSZ);
lt->buffer_size = BLCKSZ;
}
- if (lt->indirect == NULL)
+ if (lt->curBlockNumber == -1)
{
- lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
- lt->indirect->nextSlot = 0;
- lt->indirect->nextup = NULL;
+ Assert(lt->firstBlockNumber == -1);
+ Assert(lt->pos == 0);
+
+ lt->curBlockNumber = ltsGetFreeBlock(lts);
+ lt->firstBlockNumber = lt->curBlockNumber;
+
+ TapeBlockGetTrailer(lt->buffer)->prev = -1L;
}
Assert(lt->buffer_size == BLCKSZ);
while (size > 0)
{
- if (lt->pos >= BLCKSZ)
+ if (lt->pos >= TapeBlockPayloadSize)
{
/* Buffer full, dump it out */
- if (lt->dirty)
- ltsDumpBuffer(lts, lt);
- else
+ long nextBlockNumber;
+
+ if (!lt->dirty)
{
/* Hmm, went directly from reading to writing? */
elog(ERROR, "invalid logtape state: should be dirty");
}
- lt->numFullBlocks++;
- lt->curBlockNumber++;
+
+ /*
+ * First allocate the next block, so that we can store it
+ * in the 'next' pointer of this block.
+ */
+ nextBlockNumber = ltsGetFreeBlock(lts);
+ TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
+
+ ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+
+ TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
+ lt->curBlockNumber = nextBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
}
- nthistime = BLCKSZ - lt->pos;
+ nthistime = TapeBlockPayloadSize - lt->pos;
if (nthistime > size)
nthistime = size;
Assert(nthistime > 0);
@@ -748,7 +510,6 @@ void
LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
{
LogicalTape *lt;
- long datablocknum;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
@@ -759,14 +520,14 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
{
/*
* Completion of a write phase. Flush last partial data block,
- * flush any partial indirect blocks, rewind for normal
- * (destructive) read.
+ * and rewind for normal (destructive) read.
*/
if (lt->dirty)
- ltsDumpBuffer(lts, lt);
- lt->lastBlockBytes = lt->nbytes;
+ {
+ TapeBlockGetTrailer(lt->buffer)->next = lt->nbytes - BLCKSZ;
+ ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ }
lt->writing = false;
- datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
}
else
{
@@ -775,7 +536,6 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
* pass.
*/
Assert(lt->frozen);
- datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
}
/* Allocate a read buffer */
@@ -785,11 +545,10 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
lt->buffer_size = lt->read_buffer_size;
/* Read the first block, or reset if tape is empty */
- lt->curBlockNumber = 0L;
+ lt->nextBlockNumber = lt->firstBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
- if (datablocknum != -1L)
- ltsReadFillBuffer(lts, lt, datablocknum);
+ ltsReadFillBuffer(lts, lt);
}
else
{
@@ -797,30 +556,15 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
* Completion of a read phase. Rewind and prepare for write.
*
* NOTE: we assume the caller has read the tape to the end; otherwise
- * untouched data and indirect blocks will not have been freed. We
- * could add more code to free any unread blocks, but in current usage
- * of this module it'd be useless code.
+ * untouched data will not have been released. We could add more code
+ * to release any unread blocks, but in current usage of this module
+ * it'd be useless code.
*/
- IndirectBlock *ib,
- *nextib;
-
Assert(!lt->writing && !lt->frozen);
- /* Must truncate the indirect-block hierarchy down to one level. */
- if (lt->indirect)
- {
- for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
- {
- nextib = ib->nextup;
- pfree(ib);
- }
- lt->indirect->nextSlot = 0;
- lt->indirect->nextup = NULL;
- }
lt->writing = true;
lt->dirty = false;
- lt->numFullBlocks = 0L;
- lt->lastBlockBytes = 0;
- lt->curBlockNumber = 0L;
+ lt->firstBlockNumber = -1L;
+ lt->curBlockNumber = -1L;
lt->pos = 0;
lt->nbytes = 0;
@@ -855,7 +599,7 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
if (lt->pos >= lt->nbytes)
{
/* Try to load more data into buffer. */
- if (!ltsReadFillBuffer(lts, lt, -1))
+ if (!ltsReadFillBuffer(lts, lt))
break; /* EOF */
}
@@ -890,22 +634,23 @@ void
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
{
LogicalTape *lt;
- long datablocknum;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(lt->writing);
/*
- * Completion of a write phase. Flush last partial data block, flush any
- * partial indirect blocks, rewind for nondestructive read.
+ * Completion of a write phase. Flush last partial data block, and
+ * rewind for nondestructive read.
*/
if (lt->dirty)
- ltsDumpBuffer(lts, lt);
- lt->lastBlockBytes = lt->nbytes;
+ {
+ TapeBlockGetTrailer(lt->buffer)->next = lt->nbytes - BLCKSZ;
+ ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ lt->writing = false;
+ }
lt->writing = false;
lt->frozen = true;
- datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
/*
* The seek and backspace functions assume a single block read buffer.
@@ -923,15 +668,20 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
}
/* Read the first block, or reset if tape is empty */
- lt->curBlockNumber = 0L;
+ lt->curBlockNumber = lt->firstBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
- if (datablocknum != -1L)
+
+ if (lt->firstBlockNumber == -1L)
{
- ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
- lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
- BLCKSZ : lt->lastBlockBytes;
+ lt->nextBlockNumber = -1L;
}
+ ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ if (TapeBlockIsLast(lt->buffer))
+ lt->nextBlockNumber = -1L;
+ else
+ lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
+ lt->nbytes = TapeBlockGetNBytes(lt->buffer);
}
/*
@@ -942,15 +692,16 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
* random access during write, and an unfrozen read tape may have
* already discarded the desired data!
*
- * Return value is TRUE if seek successful, FALSE if there isn't that much
- * data before the current point (in which case there's no state change).
+ * Return value is TRUE if seek successful, FALSE if we are at the very
+ * beginning of the tape, in which case there's no state change.
+ * Note that backspacing beyond the beginning of the tape is not allowed,
+ * and will cause an error, except for the special case that we are
+ * currently positioned at the very beginning of the tape.
*/
bool
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
{
LogicalTape *lt;
- long nblocks;
- int newpos;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
@@ -967,40 +718,35 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
}
/*
- * Not-so-easy case. Figure out whether it's possible at all.
+ * Not-so-easy case. Walk back the chain of blocks. This implementation
+ * would be pretty inefficient for long seeks, but we really aren't expecting
+ * that (a seek over one tuple is typical).
*/
+ if (lt->pos == 0 && TapeBlockGetTrailer(lt->buffer)->prev == -1L)
+ return false; /* beginning of tape */
+
size -= (size_t) lt->pos; /* part within this block */
- nblocks = size / BLCKSZ;
- size = size % BLCKSZ;
- if (size)
- {
- nblocks++;
- newpos = (int) (BLCKSZ - size);
- }
- else
- newpos = 0;
- if (nblocks > lt->curBlockNumber)
- return false; /* a seek too far... */
- /*
- * OK, we need to back up nblocks blocks. This implementation would be
- * pretty inefficient for long seeks, but we really aren't expecting that
- * (a seek over one tuple is typical).
- */
- while (nblocks-- > 0)
+ while (size > TapeBlockPayloadSize)
{
- long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
+ long datablocknum = TapeBlockGetTrailer(lt->buffer)->prev;
if (datablocknum == -1L)
elog(ERROR, "unexpected end of tape");
- lt->curBlockNumber--;
- if (nblocks == 0)
- {
- ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
- lt->nbytes = BLCKSZ;
- }
+
+ ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+
+ if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
+ elog(ERROR, "broken tape");
+
+ lt->nbytes = TapeBlockPayloadSize;
+ lt->curBlockNumber = datablocknum;
+
+ size -= TapeBlockPayloadSize;
}
- lt->pos = newpos;
+
+ lt->pos = TapeBlockPayloadSize - size;
+
return true;
}
@@ -1009,10 +755,10 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
*
* *Only* a frozen-for-read tape can be seeked.
*
- * Return value is TRUE if seek successful, FALSE if there isn't that much
- * data in the tape (in which case there's no state change).
+ * Must be called with a block/offset previously returned by
+ * LogicalTapeTell().
*/
-bool
+void
LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset)
{
@@ -1021,53 +767,15 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(lt->frozen);
- Assert(offset >= 0 && offset <= BLCKSZ);
Assert(lt->buffer_size == BLCKSZ);
+ Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
- /*
- * Easy case for seek within current block.
- */
- if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
- {
- lt->pos = offset;
- return true;
- }
-
- /*
- * Not-so-easy case. Figure out whether it's possible at all.
- */
- if (blocknum < 0 || blocknum > lt->numFullBlocks ||
- (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
- return false;
-
- /*
- * OK, advance or back up to the target block. This implementation would
- * be pretty inefficient for long seeks, but we really aren't expecting
- * that (a seek over one tuple is typical).
- */
- while (lt->curBlockNumber > blocknum)
- {
- long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
-
- if (datablocknum == -1L)
- elog(ERROR, "unexpected end of tape");
- if (--lt->curBlockNumber == blocknum)
- ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
- }
- while (lt->curBlockNumber < blocknum)
- {
- long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
- lt->frozen);
+ if (blocknum != lt->curBlockNumber)
+ ltsReadBlock(lts, blocknum, (void *) lt->buffer);
- if (datablocknum == -1L)
- elog(ERROR, "unexpected end of tape");
- if (++lt->curBlockNumber == blocknum)
- ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
- }
- lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
- BLCKSZ : lt->lastBlockBytes;
+ if (offset > lt->nbytes)
+ elog(ERROR, "invalid tape seek position");
lt->pos = offset;
- return true;
}
/*
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 5ff81ed..5c84aee 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -239,15 +239,15 @@ typedef enum
* Parameters for calculation of number of tapes to use --- see inittapes()
* and tuplesort_merge_order().
*
- * In this calculation we assume that each tape will cost us about 3 blocks
- * worth of buffer space (which is an underestimate for very large data
- * volumes, but it's probably close enough --- see logtape.c).
+ * In this calculation we assume that each tape will cost us about 1 block
+ * worth of buffer space. This ignores the overhead of all the other data
+ * structures needed for each tape, but it's probably close enough.
*
* MERGE_BUFFER_SIZE is how much data we'd like to read from each input
* tape during a preread cycle (see discussion at top of file).
*/
#define MINORDER 6 /* minimum merge order */
-#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
+#define TAPE_BUFFER_OVERHEAD BLCKSZ
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
/*
@@ -3240,11 +3240,10 @@ tuplesort_restorepos(Tuplesortstate *state)
state->eof_reached = state->markpos_eof;
break;
case TSS_SORTEDONTAPE:
- if (!LogicalTapeSeek(state->tapeset,
- state->result_tape,
- state->markpos_block,
- state->markpos_offset))
- elog(ERROR, "tuplesort_restorepos failed");
+ LogicalTapeSeek(state->tapeset,
+ state->result_tape,
+ state->markpos_block,
+ state->markpos_offset);
state->eof_reached = state->markpos_eof;
break;
default:
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 362a619..92e8245 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -35,7 +35,7 @@ extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
-extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
+extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset);
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset);
--
2.9.3
From 7bd61c84a11b6a57f0dec0650c498a24f3810e98 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Wed, 21 Sep 2016 17:22:53 +0300
Subject: [PATCH 2/2] Pause/resume support.
When a tape is "paused", we write its final block to disk and release its
buffer, but still allow writing to the tape again later. This saves memory
in the initial run building phase, as we can release the buffers of all
tapes except the one we're currently writing to, which frees up the memory
for the quicksort. This is particularly important because we used to
reserve the memory for maxTapes, i.e. the number of tapes that we might
want to use, not the actual number of tapes used.
---
src/backend/utils/sort/logtape.c | 70 +++++++++++++++++++++++++++++++++-----
src/backend/utils/sort/tuplesort.c | 22 +++++++-----
src/include/utils/logtape.h | 1 +
3 files changed, 77 insertions(+), 16 deletions(-)
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index cc58408..0a4cebc 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -110,6 +110,8 @@ typedef struct LogicalTape
bool writing; /* T while in write phase */
bool frozen; /* T if blocks should not be freed when read */
bool dirty; /* does buffer need to be written? */
+ bool paused; /* a paused tape is writeable, but with
+ * no buffer allocated */
long firstBlockNumber; /* first block's physical blk# in the file */
long curBlockNumber; /* this block's physical blk# in the file */
@@ -376,6 +378,7 @@ LogicalTapeSetCreate(int ntapes)
lt->writing = true;
lt->frozen = false;
lt->dirty = false;
+ lt->paused = false;
lt->firstBlockNumber = -1L;
lt->curBlockNumber = -1L;
lt->buffer = NULL;
@@ -443,6 +446,22 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
{
lt->buffer = (char *) palloc(BLCKSZ);
lt->buffer_size = BLCKSZ;
+
+ /* if the tape was paused, resume it now. */
+ if (lt->paused)
+ {
+ ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+
+ /* 'pos' and 'nbytes' should still be valid */
+ Assert(lt->nbytes == TapeBlockGetNBytes(lt->buffer));
+ lt->paused = false;
+ }
+ else
+ {
+ Assert(lt->curBlockNumber == -1);
+ lt->pos = 0;
+ lt->nbytes = 0;
+ }
}
if (lt->curBlockNumber == -1)
{
@@ -463,12 +482,6 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
/* Buffer full, dump it out */
long nextBlockNumber;
- if (!lt->dirty)
- {
- /* Hmm, went directly from reading to writing? */
- elog(ERROR, "invalid logtape state: should be dirty");
- }
-
/*
* First allocate the next block, so that we can store it
* in the 'next' pointer of this block.
@@ -501,6 +514,36 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
}
/*
+ * Pause writing to a tape.
+ *
+ * This writes the last partial data block to disk, and releases the memory
+ * allocated for the write buffer, but does not rewind it. If you call
+ * LogicalTapeWrite() on a paused tape, it will be un-paused implicitly,
+ * and the last partial block is read back into memory.
+ */
+void
+LogicalTapePause(LogicalTapeSet *lts, int tapenum)
+{
+ LogicalTape *lt;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = &lts->tapes[tapenum];
+
+ Assert(lt->writing);
+
+ /* Flush last partial data block. */
+ TapeBlockGetTrailer(lt->buffer)->next = lt->nbytes - BLCKSZ;
+ ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ lt->dirty = false;
+
+ pfree(lt->buffer);
+ lt->buffer = NULL;
+
+ lt->paused = true;
+ /* Note: leave 'pos' and 'nbytes' untouched */
+}
+
+/*
* Rewind logical tape and switch from writing to reading or vice versa.
*
* Unless the tape has been "frozen" in read state, forWrite must be the
@@ -516,7 +559,17 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
if (!forWrite)
{
- if (lt->writing)
+ if (lt->paused)
+ {
+ /*
+ * A paused tape is flushed to disk already; we just have to
+ * change the state in memory to indicate that we're reading
+ * it now, and allocate a buffer for the reading.
+ */
+ Assert(lt->writing);
+ lt->paused = false;
+ }
+ else if (lt->writing)
{
/*
* Completion of a write phase. Flush last partial data block,
@@ -527,7 +580,6 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
TapeBlockGetTrailer(lt->buffer)->next = lt->nbytes - BLCKSZ;
ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
}
- lt->writing = false;
}
else
{
@@ -537,6 +589,7 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
*/
Assert(lt->frozen);
}
+ lt->writing = false;
/* Allocate a read buffer */
if (lt->buffer)
@@ -650,6 +703,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
lt->writing = false;
}
lt->writing = false;
+ lt->paused = false;
lt->frozen = true;
/*
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 5c84aee..823cb41 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2355,15 +2355,15 @@ inittapes(Tuplesortstate *state)
#endif
/*
- * Decrease availMem to reflect the space needed for tape buffers, when
- * writing the initial runs; but don't decrease it to the point that we
- * have no room for tuples. (That case is only likely to occur if sorting
- * pass-by-value Datums; in all other scenarios the memtuples[] array is
- * unlikely to occupy more than half of allowedMem. In the pass-by-value
- * case it's not important to account for tuple space, so we don't care if
- * LACKMEM becomes inaccurate.)
+ * Decrease availMem to reflect the space needed for tape buffers of the
+ * output tape, when writing the initial runs; but don't decrease it to the
+ * point that we have no room for tuples. (That case is only likely to
+ * occur if sorting pass-by-value Datums; in all other scenarios the
+ * memtuples[] array is unlikely to occupy more than half of allowedMem.
+ * In the pass-by-value case it's not important to account for tuple space,
+ * so we don't care if LACKMEM becomes inaccurate.)
*/
- tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD;
+ tapeSpace = (int64) TAPE_BUFFER_OVERHEAD;
if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
USEMEM(state, tapeSpace);
@@ -2455,6 +2455,12 @@ selectnewtape(Tuplesortstate *state)
int j;
int a;
+ /*
+ * Pause the old tape, to make the memory used for its buffer available
+ * for building the next run.
+ */
+ LogicalTapePause(state->tapeset, state->destTape);
+
/* Step D3: advance j (destTape) */
if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
{
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 92e8245..6e3e5be 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -32,6 +32,7 @@ extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
void *ptr, size_t size);
extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
+extern void LogicalTapePause(LogicalTapeSet *lts, int tapenum);
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
--
2.9.3
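
For readers who want to see the pause/resume idea in isolation, here is a
minimal standalone sketch. It is not the logtape.c code: ToyTape, TOY_BLCKSZ
and the toy_* functions are hypothetical names made up for illustration, the
block size is tiny, and there are no block trailers or free-block tracking.
It only shows the core trick: on pause, flush the partial block and free the
buffer; on the next write, reload that block and keep appending.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define TOY_BLCKSZ 64		/* hypothetical; the real BLCKSZ is much larger */

typedef struct ToyTape
{
	FILE	   *file;		/* backing temporary file */
	long		curBlock;	/* block currently being filled, -1 if none */
	int			nbytes;		/* valid bytes in the current block */
	char	   *buffer;		/* NULL while paused or not yet written */
	int			paused;		/* writable, but with no buffer allocated */
} ToyTape;

/* Write the whole current block at its position in the file. */
static void
toy_write_block(ToyTape *t)
{
	fseek(t->file, t->curBlock * TOY_BLCKSZ, SEEK_SET);
	fwrite(t->buffer, 1, TOY_BLCKSZ, t->file);
}

static void
toy_write(ToyTape *t, const char *data, int len)
{
	if (t->buffer == NULL)
	{
		t->buffer = calloc(1, TOY_BLCKSZ);
		if (t->paused)
		{
			/* Resume: reload the last partial block; nbytes is still valid. */
			fseek(t->file, t->curBlock * TOY_BLCKSZ, SEEK_SET);
			if (fread(t->buffer, 1, TOY_BLCKSZ, t->file) != TOY_BLCKSZ)
				abort();
			t->paused = 0;
		}
		else
		{
			t->curBlock = 0;
			t->nbytes = 0;
		}
	}
	while (len > 0)
	{
		int			n;

		if (t->nbytes == TOY_BLCKSZ)
		{
			/* Buffer full: dump it out and start the next block. */
			toy_write_block(t);
			t->curBlock++;
			t->nbytes = 0;
		}
		n = TOY_BLCKSZ - t->nbytes;
		if (n > len)
			n = len;
		memcpy(t->buffer + t->nbytes, data, n);
		t->nbytes += n;
		data += n;
		len -= n;
	}
}

/* Flush the last partial block and release the buffer. */
static void
toy_pause(ToyTape *t)
{
	if (t->buffer == NULL)
		return;				/* nothing buffered */
	toy_write_block(t);
	free(t->buffer);
	t->buffer = NULL;
	t->paused = 1;			/* leave curBlock and nbytes untouched */
}

/* Returns 1 if data written across a pause reads back contiguously. */
static int
toy_tape_demo(void)
{
	ToyTape		t = {NULL, -1, 0, NULL, 0};
	char		a[40], b[60], out[100];
	int			ok;

	if ((t.file = tmpfile()) == NULL)
		return 0;
	memset(a, 'a', sizeof(a));
	memset(b, 'b', sizeof(b));

	toy_write(&t, a, sizeof(a));
	toy_pause(&t);
	ok = (t.buffer == NULL);	/* no memory held while paused */
	toy_write(&t, b, sizeof(b));	/* resumes, then crosses into block 1 */
	toy_pause(&t);

	fseek(t.file, 0, SEEK_SET);
	ok = ok && fread(out, 1, sizeof(out), t.file) == sizeof(out);
	ok = ok && memcmp(out, a, sizeof(a)) == 0;
	ok = ok && memcmp(out + sizeof(a), b, sizeof(b)) == 0;
	fclose(t.file);
	return ok;
}
```

Calling toy_tape_demo() from a main() exercises one pause in the middle of
a block and one resume that spills into the next block. The extra read on
resume is the cost of the approach. In exchange, no buffer is held for a
tape that is not currently being written.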