On Wed, 2006-03-22 at 07:48 +0000, Simon Riggs wrote:
> On Tue, 2006-03-21 at 17:47 -0500, Tom Lane wrote:
>
> > I'm fairly unconvinced about Simon's underlying premise --- that we
> > can't make good use of work_mem in sorting after the run building phase
> > --- anyway.
>
> We can make good use of memory, but there does come a point in final
> merging where too much is of no further benefit. That point seems to be
> at about 256 blocks per tape; patch enclosed for testing. (256 blocks
> per tape roughly doubles performance over 32 blocks at that stage).
>
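(For reference: with the default 8KB BLCKSZ, those per-tape buffer sizes work
out as below. Standalone arithmetic only, not patch code.)

/* Per-tape preread buffer sizes, assuming the default BLCKSZ of 8KB */
#include <stdio.h>

#define BLCKSZ 8192

int
main(void)
{
    printf("32 blocks/tape  = %d KB\n", (32 * BLCKSZ) / 1024);   /* 256 KB, current default */
    printf("256 blocks/tape = %d KB\n", (256 * BLCKSZ) / 1024);  /* 2048 KB, i.e. 2MB */
    return 0;
}
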
> That is never the case during run building: more memory is always better.
>
> > If we cut back our memory usage
> [Simon inserts here: "too far"]
> > then we'll be forcing a
> > significantly more-random access pattern to the temp file(s) during
> > merging, because we won't be able to pre-read as much at a time.
>
> Yes, that's right.
>
> If we have 512MB of memory, that gives us enough for roughly 2000 tapes, yet
> the run-building phase might only produce a few runs. There's just no way
> that all 512MB of memory is needed to optimise the performance of reading in
> a few tapes at the time of the final merge.
>
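(The ~2000-tape figure falls out of the existing mOrder formula in
tuplesort.c; a quick standalone check, again assuming the default 8KB blocks
and the current 32-block merge buffer:)

/* Sketch of the mOrder calculation from tuplesort.c, with assumed 8KB blocks */
#include <stdio.h>

#define BLCKSZ                  8192
#define TAPE_BUFFER_OVERHEAD    (BLCKSZ * 3)
#define MERGE_BUFFER_SIZE       (BLCKSZ * 32)   /* current default, 32 blocks/tape */

int
main(void)
{
    long    allowedMem = 512L * 1024 * 1024;    /* work_mem = 512MB */
    long    mOrder;

    mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
             (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);

    printf("merge order with 512MB: %ld tapes\n", mOrder);      /* about 1872 */
    return 0;
}
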
> I'm suggesting we always keep 2MB per active tape, or the full
> allocation, whichever is lower. In the above example that could release
> over 500MB of memory, which, more importantly, can then be reused by
> subsequent sorts if/when they occur.
>
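(Purely illustrative, with an assumed tape count rather than a measured one:
a final merge over 6 tapes under the 2MB-per-active-tape rule keeps 12MB of
the 512MB and hands back the rest, which is in the same ballpark as the ~97%
reclaim figure reported below.)

/* Illustration only, not the patch code: keep the lesser of 2MB per active
 * tape or the full allocation, and see how much could be released.
 * The tape count is an assumed example. */
#include <stdio.h>

#define Min(x, y)   ((x) < (y) ? (x) : (y))

int
main(void)
{
    long    allowedMem = 512L * 1024 * 1024;    /* 512MB sort */
    long    perTapeKeep = 2L * 1024 * 1024;     /* 2MB preread buffer per tape */
    int     activeTapes = 6;                    /* assumed: only a few runs built */
    long    keep,
            released;

    keep = Min(allowedMem, activeTapes * perTapeKeep);
    released = allowedMem - keep;

    printf("keep %ldMB, release %ldMB (%.1f%% released)\n",
           keep / (1024 * 1024),
           released / (1024 * 1024),
           (100.0 * released) / allowedMem);    /* keep 12MB, release 500MB, ~97.7% */
    return 0;
}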
>
> Enclosed are two patches:
> 1. mergebuffers.patch, which allows measurement of the effects of different
> merge buffer sizes (current default = 32 blocks)
>
> 2. reassign2.patch, which implements the two kinds of resource
> deallocation/reassignment proposed.
Missed a couple of minor points in the patch: reassign3.patch is attached to
completely replace reassign2.patch.
Recent test results show that with a 512MB test sort we can reclaim 97% of
the memory during the final merge with only a noise-level (+2%) increase in
overall elapsed time. (That's just an example; your mileage may vary.) So a
large query would use and keep about 536MB of memory rather than 1536MB.
Best Regards, Simon Riggs
Index: src/backend/utils/sort/tuplesort.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v
retrieving revision 1.65
diff -c -r1.65 tuplesort.c
*** src/backend/utils/sort/tuplesort.c 10 Mar 2006 23:19:00 -0000 1.65
--- src/backend/utils/sort/tuplesort.c 22 Mar 2006 09:34:58 -0000
***************
*** 179,186 ****
*/
#define MINORDER 6 /* minimum merge order */
#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
! #define MERGE_BUFFER_SIZE (BLCKSZ * 32)
!
/*
* Private state of a Tuplesort operation.
*/
--- 179,187 ----
*/
#define MINORDER 6 /* minimum merge order */
#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
! #define OPTIMAL_MERGE_BUFFER_SIZE (BLCKSZ * 32)
! #define PREFERRED_MERGE_BUFFER_SIZE (BLCKSZ * 256)
! #define REUSE_SPACE_LIMIT RELSEG_SIZE
/*
* Private state of a Tuplesort operation.
*/
***************
*** 255,260 ****
--- 256,270 ----
*/
int currentRun;
+ /*
+ * These variables are used during final merge to reassign resources
+ * as they become available for each tape
+ */
+ int lastPrereadTape; /* last tape preread from */
+ int numPrereads; /* num times last tape has been selected */
+ int reassignableSlots; /* how many slots can be reassigned */
+ long reassignableMem; /* how much memory can be reassigned */
+
/*
* Unless otherwise noted, all pointer variables below are pointers
* to arrays of length maxTapes, holding per-tape data.
***************
*** 294,299 ****
--- 304,310 ----
int *tp_runs; /* # of real runs on each tape */
int *tp_dummy; /* # of dummy runs for each tape (D[]) */
int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
+
int activeTapes; /* # of active input tapes in merge pass */
/*
***************
*** 398,408 ****
--- 409,423 ----
static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
+ static void grow_memtuples(Tuplesortstate *state);
+ static void shrink_memtuples(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
+ static void assignResourcesUniformly(Tuplesortstate *state, bool initialAssignment);
+ static void reassignresources(Tuplesortstate *state, int srcTape);
static void mergepreread(Tuplesortstate *state);
static void mergeprereadone(Tuplesortstate *state, int srcTape);
static void dumptuples(Tuplesortstate *state, bool alltuples);
***************
*** 727,733 ****
* moves around with tuple addition/removal, this might result in thrashing.
* Small increases in the array size are likely to be pretty inefficient.
*/
! static bool
grow_memtuples(Tuplesortstate *state)
{
/*
--- 742,748 ----
* moves around with tuple addition/removal, this might result in thrashing.
* Small increases in the array size are likely to be pretty inefficient.
*/
! static void
grow_memtuples(Tuplesortstate *state)
{
/*
***************
*** 740,752 ****
* this assumption should be good. But let's check it.)
*/
if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple)))
! return false;
/*
* On a 64-bit machine, allowedMem could be high enough to get us into
* trouble with MaxAllocSize, too.
*/
if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple))
! return false;
FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->memtupsize *= 2;
--- 755,767 ----
* this assumption should be good. But let's check it.)
*/
if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple)))
! return;
/*
* On a 64-bit machine, allowedMem could be high enough to get us into
* trouble with MaxAllocSize, too.
*/
if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple))
! return;
FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->memtupsize *= 2;
***************
*** 756,762 ****
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
if (LACKMEM(state))
elog(ERROR, "unexpected out-of-memory situation during sort");
! return true;
}
/*
--- 771,849 ----
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
if (LACKMEM(state))
elog(ERROR, "unexpected out-of-memory situation during sort");
! return;
! }
!
! static void
! shrink_memtuples(Tuplesortstate *state)
! {
! int oldMemTupSize = state->memtupsize;
! int newMemTupSize;
! long oldMemTuplesAlloc = GetMemoryChunkSpace(state->memtuples);
!
! if (state->memtupcount > 0)
! {
! elog(LOG, "unexpected attempt to shrink sort memory");
! return;
! }
!
! Assert(state->status == TSS_FINALMERGE || state->status == TSS_SORTEDONTAPE);
!
! /*
! * Set new size, based upon earlier memory usage
! *
! * We don't care exactly how many tuples are stored; however, we already
! * know that the existing setting filled available memory. The merge
! * order was set according to OPTIMAL_MERGE_BUFFER_SIZE, on the assumption
! * that we might have to cope with a merge of maxTapes. However, we
! * would prefer to merge using PREFERRED_MERGE_BUFFER_SIZE for each active
! * tape. Any more memory than this is likely to be a waste, so we can
! * reduce memory in proportion to the ratio of activeTapes to maxTapes
! * and thus allow it to be reused by subsequent sorts or hashes within
! * the same execution. For large memory settings this will be a major
! * win, since we may have allocated enough memory for 1000s of tapes,
! * yet might only need to merge back on a few tapes.
! */
! newMemTupSize = (int) ((state->memtupsize * state->activeTapes *
! (PREFERRED_MERGE_BUFFER_SIZE / OPTIMAL_MERGE_BUFFER_SIZE))/
! state->tapeRange);
!
! if (newMemTupSize >= state->memtupsize)
! return;
!
! pfree(state->memtuples);
! FREEMEM(state, oldMemTuplesAlloc);
!
! /*
! * We don't need a memtuples array at all if we have randomAccess
! */
! if (state->status == TSS_SORTEDONTAPE)
! {
! #ifdef TRACE_SORT
! if (trace_sort)
! elog(LOG, "releasing sort resources prior to enabling randomAccess: %s",
! pg_rusage_show(&state->ru_start));
! #endif
! state->memtupsize = 0;
! return;
! }
!
! state->memtupsize = newMemTupSize;
! state->memtuples = (SortTuple *)
! palloc(state->memtupsize * sizeof(SortTuple));
! USEMEM(state, oldMemTuplesAlloc);
!
! #ifdef TRACE_SORT
! if (trace_sort)
! elog(LOG, "shrinking resources to %d%% (from %d to %d slots): %s",
! (100*newMemTupSize) / oldMemTupSize,
! oldMemTupSize, state->memtupsize,
! pg_rusage_show(&state->ru_start));
! #endif
!
! if (LACKMEM(state))
! elog(ERROR, "unexpected out-of-memory situation during sort");
! return;
}
/*
***************
*** 834,840 ****
*/
if (state->memtupcount >= state->memtupsize - 1)
{
! (void) grow_memtuples(state);
Assert(state->memtupcount < state->memtupsize);
}
state->memtuples[state->memtupcount++] = *tuple;
--- 921,927 ----
*/
if (state->memtupcount >= state->memtupsize - 1)
{
! grow_memtuples(state);
Assert(state->memtupcount < state->memtupsize);
}
state->memtuples[state->memtupcount++] = *tuple;
***************
*** 1115,1120 ****
--- 1202,1209 ----
tuplesort_heap_siftup(state, false);
if ((tupIndex = state->mergenext[srcTape]) == 0)
{
+ reassignresources(state, srcTape);
+
/*
* out of preloaded data on this tape, try to read more
*
***************
*** 1125,1133 ****
--- 1214,1236 ----
/*
* if still no data, we've reached end of run on this tape
+ * so we can permanently reassign the resources used by
+ * srcTape onto any remaining tapes for remainder of the
+ * final merge
*/
if ((tupIndex = state->mergenext[srcTape]) == 0)
+ {
+ state->reassignableSlots += state->mergeavailslots[srcTape];
+ state->reassignableMem += state->mergeavailmem[srcTape];
+ state->numPrereads = 0;
+ #ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "final merge: tape %d exhausted: %s",
+ srcTape,
+ pg_rusage_show(&state->ru_start));
+ #endif
return true;
+ }
}
/* pull next preread tuple from list, insert in heap */
newtup = &state->memtuples[tupIndex];
***************
*** 1150,1155 ****
--- 1253,1365 ----
}
/*
+ * Assign space uniformly across remaining active tapes
+ */
+ static void
+ assignResourcesUniformly(Tuplesortstate *state, bool initialAssignment)
+ {
+ int slotsPerTape;
+ long spacePerTape;
+ int activeTapes = state->activeTapes;
+ int srcTape;
+
+ Assert(activeTapes > 0);
+
+ if (initialAssignment)
+ {
+ slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
+ Assert(slotsPerTape > 0);
+ spacePerTape = state->availMem / activeTapes;
+
+ for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+ {
+ if (state->mergeactive[srcTape])
+ {
+ state->mergeavailslots[srcTape] = slotsPerTape;
+ state->mergeavailmem[srcTape] = spacePerTape;
+ }
+ }
+ }
+ else
+ {
+ slotsPerTape = state->reassignableSlots / activeTapes;
+ if (slotsPerTape == 0)
+ return;
+ spacePerTape = state->reassignableMem / activeTapes;
+ #ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "reassigning resources; each tape gets: +%d slots, +%ld mem: %s",
+ slotsPerTape,
+ spacePerTape,
+ pg_rusage_show(&state->ru_start));
+ #endif
+
+ for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+ {
+ if (state->mergeactive[srcTape])
+ {
+ state->mergeavailslots[srcTape] += slotsPerTape;
+ state->mergeavailmem[srcTape] += spacePerTape;
+ if (--activeTapes <= 0)
+ break;
+ }
+ }
+ }
+
+ state->reassignableSlots = 0;
+ state->reassignableMem = 0;
+ }
+
+ /*
+ * If we have any reassignable resources from earlier tapes that have been
+ * made empty by previous final run prereads, consider how to reassign them
+ */
+ static void
+ reassignresources(Tuplesortstate *state, int srcTape)
+ {
+ if (state->reassignableSlots <= state->activeTapes)
+ return;
+
+ /*
+ * We expect each tape to need to be preread about 2 * (number of tapes in
+ * the final merge) times. That gives us time to decide whether we should
+ * allocate the reassignable resources evenly, or whether we should give
+ * them all to a single tape, which may be appropriate in some cases.
+ */
+ if (state->numPrereads == 0 ||
+ state->lastPrereadTape == srcTape ||
+ state->activeTapes == 1)
+ {
+ state->numPrereads++;
+ state->lastPrereadTape = srcTape;
+
+ /*
+ * If we confirm that the emerging pattern of prereads
+ * is consistently on a single tape, then hand
+ * over all the spare resources to that tape
+ */
+ if (state->numPrereads > 3 || state->activeTapes == 1)
+ {
+ #ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "reassigning memory to tape %d, +%d slots, +%ld mem: %s",
+ srcTape,
+ state->reassignableSlots,
+ state->reassignableMem,
+ pg_rusage_show(&state->ru_start));
+ #endif
+ state->mergeavailslots[srcTape] += state->reassignableSlots;
+ state->mergeavailmem[srcTape] += state->reassignableMem;
+ state->reassignableSlots = 0;
+ state->reassignableMem = 0;
+ state->numPrereads = 0;
+ }
+ }
+ else
+ assignResourcesUniformly(state, false);
+ }
+
+ /*
* Fetch the next tuple in either forward or back direction.
* Returns NULL if no more tuples. If *should_free is set, the
* caller must pfree the returned tuple when done with it.
***************
*** 1231,1237 ****
* the MERGE_BUFFER_SIZE workspace.
*/
mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
! (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
/* Even in minimum memory, use at least a MINORDER merge */
mOrder = Max(mOrder, MINORDER);
--- 1441,1447 ----
* the MERGE_BUFFER_SIZE workspace.
*/
mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
! (OPTIMAL_MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
/* Even in minimum memory, use at least a MINORDER merge */
mOrder = Max(mOrder, MINORDER);
***************
*** 1436,1443 ****
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
/* Initialize for the final merge pass */
- beginmerge(state);
state->status = TSS_FINALMERGE;
return;
}
}
--- 1646,1653 ----
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
/* Initialize for the final merge pass */
state->status = TSS_FINALMERGE;
+ beginmerge(state);
return;
}
}
***************
*** 1506,1511 ****
--- 1716,1722 ----
state->result_tape = state->tp_tapenum[state->tapeRange];
LogicalTapeFreeze(state->tapeset, state->result_tape);
state->status = TSS_SORTEDONTAPE;
+ shrink_memtuples(state);
}
/*
***************
*** 1523,1528 ****
--- 1734,1740 ----
SortTuple *tup;
long priorAvail,
spaceFreed;
+ int numtapes;
/*
* Start the merge by loading one tuple from each active source tape into
***************
*** 1530,1535 ****
--- 1742,1749 ----
*/
beginmerge(state);
+ numtapes = state->activeTapes;
+
/*
* Execute merge by repeatedly extracting lowest tuple in heap, writing it
* out, and replacing it with next tuple from same tape (if there is
***************
*** 1576,1582 ****
#ifdef TRACE_SORT
if (trace_sort)
! elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
pg_rusage_show(&state->ru_start));
#endif
}
--- 1790,1796 ----
#ifdef TRACE_SORT
if (trace_sort)
! elog(LOG, "finished %d-way merge step: %s", numtapes,
pg_rusage_show(&state->ru_start));
#endif
}
***************
*** 1595,1602 ****
int activeTapes;
int tapenum;
int srcTape;
- int slotsPerTape;
- long spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
--- 1809,1814 ----
***************
*** 1628,1649 ****
state->mergefreelist = 0; /* nothing in the freelist */
state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
! /*
! * Initialize space allocation to let each active input tape have an equal
! * share of preread space.
! */
! Assert(activeTapes > 0);
! slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
! Assert(slotsPerTape > 0);
! spacePerTape = state->availMem / activeTapes;
! for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
! {
! if (state->mergeactive[srcTape])
! {
! state->mergeavailslots[srcTape] = slotsPerTape;
! state->mergeavailmem[srcTape] = spacePerTape;
! }
! }
/*
* Preread as many tuples as possible (and at least one) from each active
--- 1840,1849 ----
state->mergefreelist = 0; /* nothing in the freelist */
state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
! if (state->status == TSS_FINALMERGE)
! shrink_memtuples(state);
!
! assignResourcesUniformly(state, true);
/*
* Preread as many tuples as possible (and at least one) from each active
***************
*** 1722,1727 ****
--- 1922,1928 ----
if (!state->mergeactive[srcTape])
return; /* tape's run is already exhausted */
+
priorAvail = state->availMem;
state->availMem = state->mergeavailmem[srcTape];
while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
***************
*** 1730,1736 ****
--- 1931,1939 ----
/* read next tuple, if any */
if ((tuplen = getlen(state, srcTape, true)) == 0)
{
+ /* remove this tape from the active set */
state->mergeactive[srcTape] = false;
+ --state->activeTapes;
break;
}
READTUP(state, &stup, srcTape, tuplen);