On Tue, 2020-02-04 at 15:08 -0800, Peter Geoghegan wrote:
> Have you tested this against tuplesort.c, particularly parallel
> CREATE
> INDEX? It would be worth trying to measure any performance impact.
> Note that most parallel CREATE INDEX tuplesorts will do a merge
> within
> each worker, and one big merge in the leader. It's much more likely
> to
> have multiple passes than a regular serial external sort.
I did not observe any performance regression when creating an index in
parallel over 20M ints (random ints in random order). I tried 2
parallel workers with work_mem=4MB and also 4 parallel workers with
work_mem=256kB.
> Have you thought about integer overflow in your heap related
> routines?
> This isn't as unlikely as you might think. See commit 512f67c8, for
> example.
It's dealing with blocks rather than tuples, so it's a bit less likely.
Still, I changed the heap offsets to use "unsigned long" instead.
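To make the index arithmetic concrete, here is a rough standalone sketch
of a block-number min heap that uses unsigned long for every position.
It is not the patch code: BlockHeap, heap_push and heap_pop are made-up
names, and the storage is a fixed caller-supplied array rather than the
repalloc'd freeBlocks[].

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical standalone heap; names are illustrative, not from logtape.c. */
typedef struct
{
    long         *blocks;    /* caller-provided storage */
    unsigned long nblocks;   /* number of entries in use */
    unsigned long capacity;  /* entries available in blocks[] */
} BlockHeap;

static void
heap_swap(long *heap, unsigned long a, unsigned long b)
{
    long tmp = heap[a];      /* same type as the heap entries */

    heap[a] = heap[b];
    heap[b] = tmp;
}

/* Insert a block number; the caller must ensure there is room. */
static void
heap_push(BlockHeap *h, long blocknum)
{
    unsigned long pos = h->nblocks++;

    assert(h->nblocks <= h->capacity);
    h->blocks[pos] = blocknum;

    /* sift up until the parent is no larger than the new entry */
    while (pos != 0)
    {
        unsigned long parent = (pos - 1) / 2;

        if (h->blocks[parent] <= h->blocks[pos])
            break;
        heap_swap(h->blocks, parent, pos);
        pos = parent;
    }
}

/* Remove and return the smallest block number; the heap must be non-empty. */
static long
heap_pop(BlockHeap *h)
{
    long          result;
    unsigned long pos = 0;

    assert(h->nblocks > 0);
    result = h->blocks[0];
    h->blocks[0] = h->blocks[--h->nblocks];

    /* sift down, always descending toward the smaller child */
    for (;;)
    {
        unsigned long left = 2 * pos + 1;
        unsigned long right = 2 * pos + 2;
        unsigned long min_child;

        if (left >= h->nblocks)
            break;           /* no children left */
        min_child = (right < h->nblocks &&
                     h->blocks[right] < h->blocks[left]) ? right : left;
        if (h->blocks[min_child] >= h->blocks[pos])
            break;
        heap_swap(h->blocks, min_child, pos);
        pos = min_child;
    }
    return result;
}
```

Pushing a handful of block numbers in arbitrary order and then popping
repeatedly hands them back smallest first, which is the "lowest free
block" behavior the freelist wants.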
> Have you thought about the MaxAllocSize restriction as it concerns
> lts->freeBlocks? Will that be okay when you have many more tapes than
> before?
I added a check. If growing the freelist would exceed MaxAllocSize, the
code skips the allocation and just leaks the block rather than adding it
to the freelist. Perhaps there's a use case for an extraordinarily long
freelist, but it's outside the scope of this patch.
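Roughly, the check behaves like the standalone sketch below. This is an
illustration under assumptions, not the patch itself: FreeList and
freelist_add are hypothetical names, realloc stands in for repalloc, and
SKETCH_MAX_ALLOC_SIZE mirrors MaxAllocSize (0x3fffffff). The limit is
passed as a parameter only so the behavior can be exercised with a small
value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Stand-in for PostgreSQL's MaxAllocSize (1 GB - 1); an assumption here. */
#define SKETCH_MAX_ALLOC_SIZE ((size_t) 0x3fffffff)

/* Hypothetical growable array of free block numbers. */
typedef struct
{
    long   *items;
    size_t  nitems;   /* entries in use */
    size_t  len;      /* allocated length of items[], in entries */
} FreeList;

/*
 * Append blocknum, doubling the array when it is full.  Returns false and
 * leaves the list unchanged if doubling would need more than max_bytes (or
 * if realloc fails); the caller then simply forgets (leaks) the block.
 */
static bool
freelist_add(FreeList *fl, long blocknum, size_t max_bytes)
{
    assert(fl->len > 0);

    if (fl->nitems >= fl->len)
    {
        size_t  newlen = fl->len * 2;
        long   *newitems;

        /* compare in bytes, not entries; dividing avoids overflow */
        if (newlen > max_bytes / sizeof(long))
            return false;

        newitems = realloc(fl->items, newlen * sizeof(long));
        if (newitems == NULL)
            return false;

        fl->items = newitems;
        fl->len = newlen;
    }

    fl->items[fl->nitems++] = blocknum;
    return true;
}
```

A caller would pass SKETCH_MAX_ALLOC_SIZE as max_bytes and ignore a
false return, since a leaked block only costs some reusable space in the
temp file.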
> LogicalTapeSetExtend() seems to work in a way that assumes that the
> tape is frozen. It would be good to document that assumption, and
> possible enforce it by way of an assertion. The same remark applies
> to
> any other assumptions you're making there.
Can you explain? I am not freezing any tapes in Hash Aggregation, so
what about LogicalTapeSetExtend() assumes the tape is frozen?
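For reference, here is a simplified standalone sketch of what the
extension does: it grows the trailing tapes[] array and initializes only
the new entries, without touching the state of existing tapes. The names
(SketchTape, SketchTapeSet, sketch_extend) are made up for illustration,
the structs are trimmed to a few fields, and malloc/realloc stand in for
palloc/repalloc.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, trimmed-down stand-ins for LogicalTape / LogicalTapeSet. */
typedef struct
{
    bool writing;
    bool frozen;
    long firstBlockNumber;
} SketchTape;

typedef struct
{
    int        nTapes;
    SketchTape tapes[];     /* flexible array member */
} SketchTapeSet;

static void
sketch_init_tape(SketchTape *t)
{
    t->writing = true;
    t->frozen = false;
    t->firstBlockNumber = -1L;
}

/* Allocate a set with ntapes freshly initialized tapes, or NULL on failure. */
static SketchTapeSet *
sketch_create(int ntapes)
{
    SketchTapeSet *set;
    int            i;

    assert(ntapes >= 0);
    set = malloc(offsetof(SketchTapeSet, tapes) +
                 (size_t) ntapes * sizeof(SketchTape));
    if (set == NULL)
        return NULL;

    set->nTapes = ntapes;
    for (i = 0; i < ntapes; i++)
        sketch_init_tape(&set->tapes[i]);
    return set;
}

/*
 * Grow the set by nAdditional tapes.  Returns the (possibly moved) set, or
 * NULL on allocation failure, in which case the original set is still valid.
 */
static SketchTapeSet *
sketch_extend(SketchTapeSet *set, int nAdditional)
{
    int            nOrig = set->nTapes;
    int            nNew = nOrig + nAdditional;
    int            i;
    SketchTapeSet *grown;

    assert(nAdditional >= 0);
    grown = realloc(set, offsetof(SketchTapeSet, tapes) +
                    (size_t) nNew * sizeof(SketchTape));
    if (grown == NULL)
        return NULL;

    grown->nTapes = nNew;
    /* only the new tapes are initialized; existing ones keep their state */
    for (i = nOrig; i < nNew; i++)
        sketch_init_tape(&grown->tapes[i]);
    return grown;
}
```

Because the set may move when it grows, the caller has to keep using the
returned pointer rather than the one it passed in.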
Attached new logtape.c patches.
Regards,
Jeff Davis
From d3593ff34c83c20c75165624faf6d84803390b36 Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Fri, 31 Jan 2020 16:43:41 -0800
Subject: [PATCH 1/3] Logical Tape Set: lazily allocate read buffer.
The write buffer was already lazily-allocated, so this is more
symmetric. It also means that a freshly-rewound tape (whether for
reading or writing) is not consuming memory for the buffer.
---
src/backend/utils/sort/logtape.c | 27 ++++++++++++++++++++-------
1 file changed, 20 insertions(+), 7 deletions(-)
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 42cfb1f9f98..ba6d6e1f80a 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -773,15 +773,12 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
lt->buffer_size = 0;
if (lt->firstBlockNumber != -1L)
{
- lt->buffer = palloc(buffer_size);
+ /*
+ * The buffer is lazily allocated in LogicalTapeRead(), but we set the
+ * size here.
+ */
lt->buffer_size = buffer_size;
}
-
- /* Read the first block, or reset if tape is empty */
- lt->nextBlockNumber = lt->firstBlockNumber;
- lt->pos = 0;
- lt->nbytes = 0;
- ltsReadFillBuffer(lts, lt);
}
/*
@@ -830,6 +827,22 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
lt = &lts->tapes[tapenum];
Assert(!lt->writing);
+ if (lt->buffer == NULL)
+ {
+ /* lazily allocate buffer */
+ if (lt->firstBlockNumber != -1L)
+ {
+ Assert(lt->buffer_size > 0);
+ lt->buffer = palloc(lt->buffer_size);
+ }
+
+ /* Read the first block, or reset if tape is empty */
+ lt->nextBlockNumber = lt->firstBlockNumber;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ ltsReadFillBuffer(lts, lt);
+ }
+
while (size > 0)
{
if (lt->pos >= lt->nbytes)
--
2.17.1
From 667335c98e3ee830b042801b29b62053325070d2 Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Fri, 31 Jan 2020 16:44:40 -0800
Subject: [PATCH 2/3] Logical Tape Set: change freelist to min heap.
Previously, the freelist of blocks was tracked as an
occasionally-sorted array. A min heap is more resilient to larger
freelists or more frequent changes between reading and writing.
---
src/backend/utils/sort/logtape.c | 160 ++++++++++++++++++++-----------
1 file changed, 104 insertions(+), 56 deletions(-)
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index ba6d6e1f80a..8d934f6d44e 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -49,12 +49,8 @@
* when reading, and read multiple blocks from the same tape in one go,
* whenever the buffer becomes empty.
*
- * To support the above policy of writing to the lowest free block,
- * ltsGetFreeBlock sorts the list of free block numbers into decreasing
- * order each time it is asked for a block and the list isn't currently
- * sorted. This is an efficient way to handle it because we expect cycles
- * of releasing many blocks followed by re-using many blocks, due to
- * the larger read buffer.
+ * To support the above policy of writing to the lowest free block, the
+ * freelist is a min heap.
*
* Since all the bookkeeping and buffer memory is allocated with palloc(),
* and the underlying file(s) are made with OpenTemporaryFile, all resources
@@ -170,7 +166,7 @@ struct LogicalTapeSet
/*
* File size tracking. nBlocksWritten is the size of the underlying file,
* in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
- * by ltsGetFreeBlock(), and it is always greater than or equal to
+ * by ltsReleaseBlock(), and it is always greater than or equal to
* nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
* blocks that have been allocated for a tape, but have not been written
* to the underlying file yet. nHoleBlocks tracks the total number of
@@ -188,17 +184,11 @@ struct LogicalTapeSet
* If forgetFreeSpace is true then any freed blocks are simply forgotten
* rather than being remembered in freeBlocks[]. See notes for
* LogicalTapeSetForgetFreeSpace().
- *
- * If blocksSorted is true then the block numbers in freeBlocks are in
- * *decreasing* order, so that removing the last entry gives us the lowest
- * free block. We re-sort the blocks whenever a block is demanded; this
- * should be reasonably efficient given the expected usage pattern.
*/
bool forgetFreeSpace; /* are we remembering free blocks? */
- bool blocksSorted; /* is freeBlocks[] currently in order? */
- long *freeBlocks; /* resizable array */
- int nFreeBlocks; /* # of currently free blocks */
- int freeBlocksLen; /* current allocated length of freeBlocks[] */
+ long *freeBlocks; /* resizable array holding minheap */
+ long nFreeBlocks; /* # of currently free blocks */
+ Size freeBlocksLen; /* current allocated length of freeBlocks[] */
/* The array of logical tapes. */
int nTapes; /* # of logical tapes in set */
@@ -321,46 +311,88 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
return (lt->nbytes > 0);
}
-/*
- * qsort comparator for sorting freeBlocks[] into decreasing order.
- */
-static int
-freeBlocks_cmp(const void *a, const void *b)
+static inline void
+swap_nodes(long *heap, unsigned long a, unsigned long b)
+{
+ long swap;
+
+ swap = heap[a];
+ heap[a] = heap[b];
+ heap[b] = swap;
+}
+
+static inline unsigned long
+left_offset(unsigned long i)
+{
+ return 2 * i + 1;
+}
+
+static inline unsigned long
+right_offset(unsigned long i)
+{
+ return 2 * i + 2;
+}
+
+static inline unsigned long
+parent_offset(unsigned long i)
{
- long ablk = *((const long *) a);
- long bblk = *((const long *) b);
-
- /* can't just subtract because long might be wider than int */
- if (ablk < bblk)
- return 1;
- if (ablk > bblk)
- return -1;
- return 0;
+ return (i - 1) / 2;
}
/*
- * Select a currently unused block for writing to.
+ * Select the lowest currently unused block by taking the first element from
+ * the freelist min heap.
*/
static long
ltsGetFreeBlock(LogicalTapeSet *lts)
{
- /*
- * If there are multiple free blocks, we select the one appearing last in
- * freeBlocks[] (after sorting the array if needed). If there are none,
- * assign the next block at the end of the file.
- */
- if (lts->nFreeBlocks > 0)
+ long *heap = lts->freeBlocks;
+ long blocknum;
+ unsigned long heapsize;
+ unsigned long pos;
+
+ /* freelist empty; allocate a new block */
+ if (lts->nFreeBlocks == 0)
+ return lts->nBlocksAllocated++;
+
+ if (lts->nFreeBlocks == 1)
{
- if (!lts->blocksSorted)
- {
- qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
- sizeof(long), freeBlocks_cmp);
- lts->blocksSorted = true;
- }
- return lts->freeBlocks[--lts->nFreeBlocks];
+ lts->nFreeBlocks--;
+ return lts->freeBlocks[0];
}
- else
- return lts->nBlocksAllocated++;
+
+ /* take top of minheap */
+ blocknum = heap[0];
+
+ /* replace with end of minheap array */
+ heap[0] = heap[--lts->nFreeBlocks];
+
+ /* sift down */
+ pos = 0;
+ heapsize = lts->nFreeBlocks;
+ while (true)
+ {
+ unsigned long left = left_offset(pos);
+ unsigned long right = right_offset(pos);
+ unsigned long min_child;
+
+ if (left < heapsize && right < heapsize)
+ min_child = (heap[left] < heap[right]) ? left : right;
+ else if (left < heapsize)
+ min_child = left;
+ else if (right < heapsize)
+ min_child = right;
+ else
+ break;
+
+ if (heap[min_child] >= heap[pos])
+ break;
+
+ swap_nodes(heap, min_child, pos);
+ pos = min_child;
+ }
+
+ return blocknum;
}
/*
@@ -369,7 +401,8 @@ ltsGetFreeBlock(LogicalTapeSet *lts)
static void
ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
{
- int ndx;
+ long *heap;
+ unsigned long pos;
/*
* Do nothing if we're no longer interested in remembering free space.
@@ -382,19 +415,35 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
*/
if (lts->nFreeBlocks >= lts->freeBlocksLen)
{
+ /*
+ * If the freelist becomes very large, just return and leak this free
+ * block.
+ */
+ if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize)
+ return;
+
lts->freeBlocksLen *= 2;
lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
lts->freeBlocksLen * sizeof(long));
}
- /*
- * Add blocknum to array, and mark the array unsorted if it's no longer in
- * decreasing order.
- */
- ndx = lts->nFreeBlocks++;
- lts->freeBlocks[ndx] = blocknum;
- if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
- lts->blocksSorted = false;
+ heap = lts->freeBlocks;
+ pos = lts->nFreeBlocks;
+
+ /* place entry at end of minheap array */
+ heap[pos] = blocknum;
+ lts->nFreeBlocks++;
+
+ /* sift up */
+ while (pos != 0)
+ {
+ unsigned long parent = parent_offset(pos);
+ if (heap[parent] < heap[pos])
+ break;
+
+ swap_nodes(heap, parent, pos);
+ pos = parent;
+ }
}
/*
@@ -524,7 +573,6 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
lts->nBlocksWritten = 0L;
lts->nHoleBlocks = 0L;
lts->forgetFreeSpace = false;
- lts->blocksSorted = true; /* a zero-length array is sorted ... */
lts->freeBlocksLen = 32; /* reasonable initial guess */
lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
lts->nFreeBlocks = 0;
--
2.17.1
From 6e7b9f8b246150e36e7f7df513dbfcbef9f6102d Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Fri, 31 Jan 2020 16:42:38 -0800
Subject: [PATCH 3/3] Logical Tape Set: add API to extend with additional
tapes.
---
src/backend/utils/sort/logtape.c | 71 +++++++++++++++++++++-----------
src/include/utils/logtape.h | 2 +
2 files changed, 50 insertions(+), 23 deletions(-)
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 8d934f6d44e..7556abdb4aa 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -201,6 +201,7 @@ static long ltsGetFreeBlock(LogicalTapeSet *lts);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
SharedFileSet *fileset);
+static void ltsInitTape(LogicalTape *lt);
/*
@@ -535,6 +536,30 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
}
+/*
+ * Initialize per-tape struct. Note we allocate the I/O buffer and the first
+ * block for a tape only when it is first actually written to. This avoids
+ * wasting memory space when tuplesort.c overestimates the number of tapes
+ * needed.
+ */
+static void
+ltsInitTape(LogicalTape *lt)
+{
+ lt->writing = true;
+ lt->frozen = false;
+ lt->dirty = false;
+ lt->firstBlockNumber = -1L;
+ lt->curBlockNumber = -1L;
+ lt->nextBlockNumber = -1L;
+ lt->offsetBlockNumber = 0L;
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
+ /* palloc() larger than MaxAllocSize would fail */
+ lt->max_size = MaxAllocSize;
+ lt->pos = 0;
+ lt->nbytes = 0;
+}
+
/*
* Create a set of logical tapes in a temporary underlying file.
*
@@ -560,7 +585,6 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
int worker)
{
LogicalTapeSet *lts;
- LogicalTape *lt;
int i;
/*
@@ -578,29 +602,8 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
lts->nFreeBlocks = 0;
lts->nTapes = ntapes;
- /*
- * Initialize per-tape structs. Note we allocate the I/O buffer and the
- * first block for a tape only when it is first actually written to. This
- * avoids wasting memory space when tuplesort.c overestimates the number
- * of tapes needed.
- */
for (i = 0; i < ntapes; i++)
- {
- lt = &lts->tapes[i];
- lt->writing = true;
- lt->frozen = false;
- lt->dirty = false;
- lt->firstBlockNumber = -1L;
- lt->curBlockNumber = -1L;
- lt->nextBlockNumber = -1L;
- lt->offsetBlockNumber = 0L;
- lt->buffer = NULL;
- lt->buffer_size = 0;
- /* palloc() larger than MaxAllocSize would fail */
- lt->max_size = MaxAllocSize;
- lt->pos = 0;
- lt->nbytes = 0;
- }
+ ltsInitTape(&lts->tapes[i]);
/*
* Create temp BufFile storage as required.
@@ -1004,6 +1007,28 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
}
}
+/*
+ * Add additional tapes to this tape set.
+ */
+LogicalTapeSet *
+LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
+{
+ int i;
+ int nTapesOrig = lts->nTapes;
+ Size newSize;
+
+ lts->nTapes += nAdditional;
+ newSize = offsetof(LogicalTapeSet, tapes) +
+ lts->nTapes * sizeof(LogicalTape);
+
+ lts = (LogicalTapeSet *) repalloc(lts, newSize);
+
+ for (i = nTapesOrig; i < lts->nTapes; i++)
+ ltsInitTape(&lts->tapes[i]);
+
+ return lts;
+}
+
/*
* Backspace the tape a given number of bytes. (We also support a more
* general seek interface, see below.)
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 695d2c00ee4..3ebe52239f8 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -67,6 +67,8 @@ extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
TapeShare *share);
+extern LogicalTapeSet *LogicalTapeSetExtend(LogicalTapeSet *lts,
+ int nAdditional);
extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
--
2.17.1