On 10/06/2016 06:44 PM, Peter Geoghegan wrote:
> While the fix you pushed was probably a good idea anyway, I still think you should not use state->maxTapes to exhaustively call LogicalTapeAssignReadBufferSize() on every tape, even non-active tapes. That's the confusing part.
Admittedly that's confusing. Thinking about this some more, I came up with the attached. I removed the separate LogicalTapeAssignReadBufferSize() call altogether - the read buffer size is now passed as an argument to the LogicalTapeRewindForRead() call.
I split LogicalTapeRewind() into two: LogicalTapeRewindForRead() and LogicalTapeRewindForWrite(). A boolean argument like 'forWrite' is mentally challenging, because when reading a call site, you have to remember which way round the boolean is. And now you also pass the extra buffer_size argument when rewinding for reading, but not when writing.
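To make the call-site difference concrete, here is a toy sketch that is not part of the attached patch. ToyTape and the toy_* functions are made-up stand-ins for LogicalTape and the rewind functions, and the sketch ignores the "frozen" case, in which a tape may be rewound for reading again.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a logical tape; not the real LogicalTape struct. */
typedef struct ToyTape
{
	bool		writing;		/* true while the tape is being written */
	size_t		buffer_size;	/* read buffer size, 0 while writing */
} ToyTape;

/* Old style: one entry point, direction chosen by a boolean. */
static void
toy_rewind(ToyTape *tape, bool forWrite)
{
	tape->writing = forWrite;
	tape->buffer_size = 0;		/* the size had to come from a separate call */
}

/* New style: the name states the direction, and the size travels with it. */
static void
toy_rewind_for_read(ToyTape *tape, size_t buffer_size)
{
	assert(tape->writing);		/* simplification: no "frozen" tapes here */
	tape->writing = false;
	tape->buffer_size = buffer_size;
}

static void
toy_rewind_for_write(ToyTape *tape)
{
	assert(!tape->writing);
	tape->writing = true;
	tape->buffer_size = 0;
}
```

A call such as toy_rewind(&tape, false) forces the reader to look up what 'false' means, whereas toy_rewind_for_read(&tape, 8192) is self-explanatory and cannot be called without a buffer size.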
I gave up on the logic to calculate the quotient and remainder of the available memory, and to assign one extra buffer to some of the tapes. I'm now just rounding it down to the nearest BLCKSZ boundary. That simplifies the code further, although we're now not using all the memory we could. I'm pretty sure that's OK, although I haven't done any performance testing of this.
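As a rough standalone illustration of the new sizing (again, not code from the patch): BLCKSZ and MaxAllocSize are hard-coded below to the usual PostgreSQL values so that the sketch compiles on its own, and clamp_read_buffer_size() and read_buffer_size_per_tape() are hypothetical helper names. The guard against a negative avail_mem is an addition made for this sketch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed values, hard-coded so that the sketch compiles standalone. */
#define BLCKSZ			8192
#define MaxAllocSize	((size_t) 0x3fffffff)

/*
 * Hypothetical helper mirroring the clamping in LogicalTapeRewindForRead():
 * at least one block, at most MaxAllocSize, rounded down to a multiple of
 * BLCKSZ.
 */
static size_t
clamp_read_buffer_size(size_t buffer_size)
{
	if (buffer_size < BLCKSZ)
		buffer_size = BLCKSZ;
	if (buffer_size > MaxAllocSize)
		buffer_size = MaxAllocSize;
	buffer_size -= buffer_size % BLCKSZ;
	return buffer_size;
}

/*
 * Hypothetical helper: every input tape gets the same share of the
 * available memory; whatever is left over is simply not used.
 */
static size_t
read_buffer_size_per_tape(int64_t avail_mem, int num_input_tapes)
{
	assert(num_input_tapes > 0);
	if (avail_mem < 0)
		avail_mem = 0;			/* guard added for this sketch only */
	return clamp_read_buffer_size((size_t) (avail_mem / num_input_tapes));
}
```

With 110000 bytes and 3 input tapes, each tape gets 32768 bytes (4 blocks), so 12 blocks are used in total. The old quotient-and-remainder scheme would have handed out 13 blocks, giving one tape a fifth block.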
- Heikki
From 9862ff0cd184afc50515854b267e3a6456547ac6 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Mon, 10 Oct 2016 15:20:05 +0300
Subject: [PATCH 1/1] Simplify the code for logical tape read buffers.

Pass the buffer size as argument to LogicalTapeRewindForRead, rather than
setting it earlier with the separate LogicalTapeAssignReadBufferSize call.
This way, the buffer size is set closer to where it's actually used, which
makes the code easier to understand.

This makes the calculation for how much memory to use for the buffers less
precise. We now use the same amount of memory for every tape, rounded down
to the nearest BLCKSZ boundary, instead of using one more block for some
tapes, to get the total up to exact amount of memory available. That should
be OK, merging isn't too sensitive to the exact amount of memory used.
---
 src/backend/utils/sort/logtape.c   | 214 ++++++++++++++++++-------------------
 src/backend/utils/sort/tuplesort.c | 119 ++++++---------------
 src/include/utils/logtape.h        |   5 +-
 3 files changed, 137 insertions(+), 201 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 6cc06b2..25090c7 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -141,14 +141,6 @@ typedef struct LogicalTape
 	long		curBlockNumber; /* this block's logical blk# within tape */
 	int			pos;			/* next read/write position in buffer */
 	int			nbytes;			/* total # of valid bytes in buffer */
-
-	/*
-	 * Desired buffer size to use when reading. To keep things simple, we use
-	 * a single-block buffer when writing, or when reading a frozen tape. But
-	 * when we are reading and will only read forwards, we allocate a larger
-	 * buffer, determined by read_buffer_size.
-	 */
-	int			read_buffer_size;
 } LogicalTape;
 
 /*
@@ -609,7 +601,6 @@ LogicalTapeSetCreate(int ntapes)
 		lt->lastBlockBytes = 0;
 		lt->buffer = NULL;
 		lt->buffer_size = 0;
-		lt->read_buffer_size = BLCKSZ;
 		lt->curBlockNumber = 0L;
 		lt->pos = 0;
 		lt->nbytes = 0;
@@ -739,13 +730,20 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 }
 
 /*
- * Rewind logical tape and switch from writing to reading or vice versa.
+ * Rewind logical tape and switch from writing to reading.
  *
- * Unless the tape has been "frozen" in read state, forWrite must be the
- * opposite of the previous tape state.
+ * The tape must currently be in writing state, or "frozen" in read state.
+ *
+ * 'buffer_size' specifies how much memory to use for the read buffer. It
+ * does not include the memory needed for the indirect blocks. Regardless
+ * of the argument, the actual amount of memory used is between BLCKSZ and
+ * MaxAllocSize, and is a multiple of BLCKSZ. The given value is rounded
+ * down and truncated to fit those constraints, if necessary. If the tape
+ * is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ byte
+ * buffer is used.
  */
 void
-LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
+LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 {
 	LogicalTape *lt;
 	long		datablocknum;
@@ -753,88 +751,111 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
 	lt = &lts->tapes[tapenum];
 
-	if (!forWrite)
+	/*
+	 * Round and cap the buffer_size if needed.
+	 */
+	if (lt->frozen)
+		buffer_size = BLCKSZ;
+	else
 	{
-		if (lt->writing)
-		{
-			/*
-			 * Completion of a write phase. Flush last partial data block,
-			 * flush any partial indirect blocks, rewind for normal
-			 * (destructive) read.
-			 */
-			if (lt->dirty)
-				ltsDumpBuffer(lts, lt);
-			lt->lastBlockBytes = lt->nbytes;
-			lt->writing = false;
-			datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
-		}
-		else
-		{
-			/*
-			 * This is only OK if tape is frozen; we rewind for (another) read
-			 * pass.
-			 */
-			Assert(lt->frozen);
-			datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
-		}
-
-		/* Allocate a read buffer (unless the tape is empty) */
-		if (lt->buffer)
-			pfree(lt->buffer);
-		lt->buffer = NULL;
-		lt->buffer_size = 0;
-		if (datablocknum != -1L)
-		{
-			lt->buffer = palloc(lt->read_buffer_size);
-			lt->buffer_size = lt->read_buffer_size;
-		}
+		/* need at least one block */
+		if (buffer_size < BLCKSZ)
+			buffer_size = BLCKSZ;
+		/*
+		 * palloc() larger than MaxAllocSize would fail (a multi-gigabyte
+		 * buffer is unlikely to be helpful, anyway)
+		 */
+		if (buffer_size > MaxAllocSize)
+			buffer_size = MaxAllocSize;
+		/* round down to BLCKSZ boundary */
+		buffer_size -= buffer_size % BLCKSZ;
+	}
 
-		/* Read the first block, or reset if tape is empty */
-		lt->curBlockNumber = 0L;
-		lt->pos = 0;
-		lt->nbytes = 0;
-		if (datablocknum != -1L)
-			ltsReadFillBuffer(lts, lt, datablocknum);
+	if (lt->writing)
+	{
+		/*
+		 * Completion of a write phase. Flush last partial data block,
+		 * flush any partial indirect blocks, rewind for normal
+		 * (destructive) read.
+		 */
+		if (lt->dirty)
+			ltsDumpBuffer(lts, lt);
+		lt->lastBlockBytes = lt->nbytes;
+		lt->writing = false;
+		datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
 	}
 	else
 	{
 		/*
-		 * Completion of a read phase. Rewind and prepare for write.
-		 *
-		 * NOTE: we assume the caller has read the tape to the end; otherwise
-		 * untouched data and indirect blocks will not have been freed. We
-		 * could add more code to free any unread blocks, but in current usage
-		 * of this module it'd be useless code.
+		 * This is only OK if tape is frozen; we rewind for (another) read
+		 * pass.
 		 */
-		IndirectBlock *ib,
-				   *nextib;
+		Assert(lt->frozen);
+		datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
+	}
 
-		Assert(!lt->writing && !lt->frozen);
-		/* Must truncate the indirect-block hierarchy down to one level. */
-		if (lt->indirect)
-		{
-			for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
-			{
-				nextib = ib->nextup;
-				pfree(ib);
-			}
-			lt->indirect->nextSlot = 0;
-			lt->indirect->nextup = NULL;
-		}
-		lt->writing = true;
-		lt->dirty = false;
-		lt->numFullBlocks = 0L;
-		lt->lastBlockBytes = 0;
-		lt->curBlockNumber = 0L;
-		lt->pos = 0;
-		lt->nbytes = 0;
+	/* Allocate a read buffer (unless the tape is empty) */
+	if (lt->buffer)
+		pfree(lt->buffer);
+	lt->buffer = NULL;
+	lt->buffer_size = 0;
+	if (datablocknum != -1L)
+	{
+		lt->buffer = palloc(buffer_size);
+		lt->buffer_size = buffer_size;
+	}
 
-		if (lt->buffer)
+	/* Read the first block, or reset if tape is empty */
+	lt->curBlockNumber = 0L;
+	lt->pos = 0;
+	lt->nbytes = 0;
+	if (datablocknum != -1L)
+		ltsReadFillBuffer(lts, lt, datablocknum);
+}
+
+/*
+ * Rewind logical tape and switch from reading to writing.
+ *
+ * NOTE: we assume the caller has read the tape to the end; otherwise
+ * untouched data and indirect blocks will not have been freed. We
+ * could add more code to free any unread blocks, but in current usage
+ * of this module it'd be useless code.
+ */
+void
+LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
+{
+	LogicalTape *lt;
+	IndirectBlock *ib,
+			   *nextib;
+
+	Assert(tapenum >= 0 && tapenum < lts->nTapes);
+	lt = &lts->tapes[tapenum];
+
+	Assert(!lt->writing && !lt->frozen);
+	/* Must truncate the indirect-block hierarchy down to one level. */
+	if (lt->indirect)
+	{
+		for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
 		{
-			pfree(lt->buffer);
-			lt->buffer = NULL;
-			lt->buffer_size = 0;
+			nextib = ib->nextup;
+			pfree(ib);
 		}
+		lt->indirect->nextSlot = 0;
+		lt->indirect->nextup = NULL;
+	}
+	lt->writing = true;
+	lt->dirty = false;
+	lt->numFullBlocks = 0L;
+	lt->lastBlockBytes = 0;
+	lt->curBlockNumber = 0L;
+	lt->pos = 0;
+	lt->nbytes = 0;
+
+	if (lt->buffer)
+	{
+		pfree(lt->buffer);
+		lt->buffer = NULL;
+		lt->buffer_size = 0;
 	}
 }
 
@@ -1105,28 +1126,3 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
 	return lts->nFileBlocks;
 }
-
-/*
- * Set buffer size to use, when reading from given tape.
- */
-void
-LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
-{
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
-
-	/*
-	 * The buffer size must be a multiple of BLCKSZ in size, so round the
-	 * given value down to nearest BLCKSZ. Make sure we have at least one
-	 * page. Also, don't go above MaxAllocSize, to avoid erroring out. A
-	 * multi-gigabyte buffer is unlikely to be helpful, anyway.
-	 */
-	if (avail_mem < BLCKSZ)
-		avail_mem = BLCKSZ;
-	if (avail_mem > MaxAllocSize)
-		avail_mem = MaxAllocSize;
-	avail_mem -= avail_mem % BLCKSZ;
-	lt->read_buffer_size = avail_mem;
-}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index dd83e7a..56e9518 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -366,6 +366,8 @@ struct Tuplesortstate
 	char	   *slabMemoryEnd;	/* end of slab memory arena */
 	SlabSlot   *slabFreeHead;	/* head of free list */
 
+	size_t		read_buffer_size;
+
 	/*
 	 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
 	 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
@@ -579,7 +581,6 @@ static bool useselection(Tuplesortstate *state);
 static void inittapes(Tuplesortstate *state);
 static void selectnewtape(Tuplesortstate *state);
 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
-static void init_tape_buffers(Tuplesortstate *state, int numInputTapes);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
@@ -2056,7 +2057,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 					 * end of the sort anyway, but better to release the
 					 * memory early.
 					 */
-					LogicalTapeRewind(state->tapeset, srcTape, true);
+					LogicalTapeRewindForWrite(state->tapeset, srcTape);
 					return true;
 				}
 				newtup.tupindex = srcTape;
@@ -2512,72 +2513,6 @@ init_slab_allocator(Tuplesortstate *state, int numSlots)
 }
 
 /*
- * Divide all remaining work memory (availMem) as read buffers, for all
- * the tapes that will be used during the merge.
- *
- * We use the number of possible *input* tapes here, rather than maxTapes,
- * for the calculation. At all times, we'll be reading from at most
- * numInputTapes tapes, and one tape is used for output (unless we do an
- * on-the-fly final merge, in which case we don't have an output tape).
- */
-static void
-init_tape_buffers(Tuplesortstate *state, int numInputTapes)
-{
-	int64		availBlocks;
-	int64		blocksPerTape;
-	int			remainder;
-	int			tapenum;
-
-	/*
-	 * Divide availMem evenly among the number of input tapes.
-	 */
-	availBlocks = state->availMem / BLCKSZ;
-	blocksPerTape = availBlocks / numInputTapes;
-	remainder = availBlocks % numInputTapes;
-	USEMEM(state, availBlocks * BLCKSZ);
-
-#ifdef TRACE_SORT
-	if (trace_sort)
-		elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
-			 (availBlocks * BLCKSZ) / 1024, numInputTapes);
-#endif
-
-	/*
-	 * Use one page per tape, even if we are out of memory.
-	 * tuplesort_merge_order() should've chosen the number of tapes so that
-	 * this can't happen, but better safe than sorry. (This also protects
-	 * from a negative availMem.)
-	 */
-	if (blocksPerTape < 1)
-	{
-		blocksPerTape = 1;
-		remainder = 0;
-	}
-
-	/*
-	 * Set the buffers for the tapes.
-	 *
-	 * In a multi-phase merge, the tape that is initially used as an output
-	 * tape, will later be rewound and read from, and should also use a large
-	 * buffer at that point. So we must loop up to maxTapes, not just
-	 * numInputTapes!
-	 *
-	 * If there are fewer runs than tapes, we will set the buffer size also
-	 * for tapes that will go completely unused, but that's harmless.
-	 * LogicalTapeAssignReadBufferSize() doesn't allocate the buffer
-	 * immediately, it just sets the size that will be used, when the tape is
-	 * rewound for read, and the tape isn't empty.
-	 */
-	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
-	{
-		int64		numBlocks = blocksPerTape + (tapenum < remainder ? 1 : 0);
-
-		LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
-										numBlocks * BLCKSZ);
-	}
-}
-
-/*
  * mergeruns -- merge all the completed initial runs.
  *
  * This implements steps D5, D6 of Algorithm D. All input data has
@@ -2679,25 +2614,32 @@ mergeruns(Tuplesortstate *state)
 	}
 
 	/*
-	 * Use all the spare memory we have available for read buffers for the
-	 * tapes.
+	 * Use all the spare memory we have available for read buffers among the
+	 * input tapes.
 	 *
 	 * We do this only after checking for the case that we produced only one
 	 * initial run, because there is no need to use a large read buffer when
 	 * we're reading from a single tape. With one tape, the I/O pattern will
 	 * be the same regardless of the buffer size.
 	 *
-	 * We don't try to "rebalance" the amount of memory among tapes, when we
-	 * start a new merge phase, even if some tapes can be inactive in the
-	 * phase. That would be hard, because logtape.c doesn't know where one
-	 * run ends and another begins. When a new merge phase begins, and a tape
-	 * doesn't participate in it, its buffer nevertheless already contains
-	 * tuples from the next run on same tape, so we cannot release the buffer.
-	 * That's OK in practice, merge performance isn't that sensitive to the
-	 * amount of buffers used, and most merge phases use all or almost all
-	 * tapes, anyway.
+	 * We don't try to "rebalance" the memory among tapes, when we start a new
+	 * merge phase, even if some tapes are inactive in the new phase. That
+	 * would be hard, because logtape.c doesn't know where one run ends and
+	 * another begins. When a new merge phase begins, and a tape doesn't
+	 * participate in it, its buffer nevertheless already contains tuples from
+	 * the next run on same tape, so we cannot release the buffer. That's OK
+	 * in practice, merge performance isn't that sensitive to the amount of
+	 * buffers used, and most merge phases use all or almost all tapes,
+	 * anyway.
 	 */
-	init_tape_buffers(state, numInputTapes);
+#ifdef TRACE_SORT
+	if (trace_sort)
+		elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+			 (state->availMem) / 1024, numInputTapes);
+#endif
+
+	state->read_buffer_size = state->availMem / numInputTapes;
+	USEMEM(state, state->availMem);
 
 	/*
 	 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
@@ -2709,7 +2651,7 @@ mergeruns(Tuplesortstate *state)
 
 	/* End of step D2: rewind all output tapes to prepare for merging */
 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-		LogicalTapeRewind(state->tapeset, tapenum, false);
+		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
 
 	for (;;)
 	{
@@ -2772,11 +2714,10 @@ mergeruns(Tuplesortstate *state)
 		if (--state->Level == 0)
 			break;
 		/* rewind output tape T to use as new input */
-		LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
-						  false);
+		LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+								 state->read_buffer_size);
 		/* rewind used-up input tape P, and prepare it for write pass */
-		LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
-						  true);
+		LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
 		state->tp_runs[state->tapeRange - 1] = 0;
 
 		/*
@@ -2812,7 +2753,7 @@ mergeruns(Tuplesortstate *state)
 		for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
 		{
 			if (tapenum != state->result_tape)
-				LogicalTapeRewind(state->tapeset, tapenum, true);
+				LogicalTapeRewindForWrite(state->tapeset, tapenum);
 		}
 	}
 
@@ -3174,9 +3115,9 @@ tuplesort_rescan(Tuplesortstate *state)
 			state->markpos_eof = false;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeRewind(state->tapeset,
-							  state->result_tape,
-							  false);
+			LogicalTapeRewindForRead(state->tapeset,
+									 state->result_tape,
+									 0);
 			state->eof_reached = false;
 			state->markpos_block = 0L;
 			state->markpos_offset = 0;
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 362a619..10eb3a2 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -31,7 +31,8 @@ extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 				void *ptr, size_t size);
 extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 				 void *ptr, size_t size);
-extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
+extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size);
+extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
 extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
 extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
 					 size_t size);
@@ -39,8 +40,6 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 				long blocknum, int offset);
 extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 				long *blocknum, int *offset);
-extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum,
-								size_t bufsize);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
 
 #endif   /* LOGTAPE_H */
-- 
2.9.3