On 19.8.2014 19:05, Robert Haas wrote:
> On Sat, Aug 16, 2014 at 9:31 AM, Tomas Vondra <t...@fuzzy.cz> wrote:
>> On 12.8.2014 00:30, Tomas Vondra wrote:
>>> On 11.8.2014 20:25, Robert Haas wrote:
>>>> It also strikes me that when there's only 1 batch, the set of bits
>>>> that map onto the batch number is zero-width, and one zero-width bit
>>>> range is as good as another. In other words, if you're only planning
>>>> to do one batch, you can easily grow the number of buckets on the fly.
>>>> Growing the number of buckets only becomes difficult once you have
>>>> more than one batch.
>>
>> ...
>>
>>> I was considering reversing the bits of the hash, because that's
>>> pretty much the simplest solution. But I think you're right it might
>>> actually work like this:
>>>
>>>   * Are more batches needed?
>>>
>>>       (yes) => just use nbuckets = my_log2(work_mem / tuple_size)
>>>
>>>       (no) => go ahead, until processing all tuples or hitting work_mem
>>>
>>>           (work_mem) => meh, use the same nbuckets above
>>>
>>>           (all tuples) => compute optimal nbuckets / resize
>>>
>>>
>>> But I need to think about this a bit. So far it seems to me there's no
>>> way additional batches might benefit from increasing nbuckets further.
>>
>> I think this is a simple and solid solution, solving the batchno
>> computation issues quite nicely. Attached is v10 patch (bare and
>> combined with the dense allocation), that does this:
>>
>> 1) when we know we'll need batching, buckets are sized for full work_mem
>>    (using the estimated tuple width, etc.)
>>
>> 2) without the batching, we estimate the 'right number of buckets' for
>>    the estimated number of tuples, and keep track of the optimal number
>>    as tuples are added to the hash table
>>
>>    - if we discover we need to start batching, we keep the current
>>      optimal value (which should be the same as the max number of
>>      buckets) and don't mess with it anymore (making it possible to
>>      compute batch IDs just like before)
>>
>>    - also, on the first rebatch (nbatch=1 => nbatch=2) the hash table
>>      is resized as part of the rebatch
>>
>>    - if the hash build completes without batching, we do the resize
>>
>> I believe the patch is pretty much perfect. I plan to do more thorough
>> testing on a wide range of queries in the next few days.
>>
>> I also removed the 'enable_hash_resize' GUC, because it would be more
>> complex to implement this properly after doing the resize as part of
>> rebatch etc.. So either it would make the patch more complex, or it
>> wouldn't do what the name promises.
>
> A variety of trivial comments on this:
>
> PostgreSQL style is un-cuddled curly braces. Also, multi-line
> comments need to start with a line containing only /* and end with a
> line containing only */. In one place you've added curly braces
> around a single-line block that is otherwise unmodified; please don't
> do that. In one place, you have "becase" instead of "because". In
> another place, you write "add if after it" but it should say "add it
> after it" or maybe better "add the new one after it". Avoid using
> punctuation like "=>" in comments to illustrate the connection between
> sentences; instead, use a connecting word like "then" or "therefore"
> or whatever is appropriate; in this instance, a period followed by the
> start of a new sentence seems sufficient. Revert the removal of a
> single line of whitespace near the top of nodeHash.c.
>
> There are too many things marked XXX in this patch. They should
> either be fixed, if they are real problems, or they should be
> commented in a way that doesn't give rise to the idea that they're
> problems if they aren't.

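To make the "zero-width bit range" remark quoted above concrete, here is a minimal sketch of how the bucket and batch numbers are carved out of a single hash value. It follows the scheme described in the comment above ExecHashGetBucketAndBatch as I understand it, but the name bucket_and_batch and the plain int parameters are made up for this illustration and are not the executor API.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical stand-in for ExecHashGetBucketAndBatch (illustration only).
 * Both nbuckets (= 1 << log2_nbuckets) and nbatch are powers of two.
 * The low log2_nbuckets bits of the hash select the bucket, and the bits
 * right above them select the batch.
 */
static void
bucket_and_batch(uint32_t hashvalue, int log2_nbuckets, int nbatch,
				 int *bucketno, int *batchno)
{
	uint32_t	nbuckets;

	assert(log2_nbuckets >= 0 && log2_nbuckets < 31);
	assert(nbatch >= 1 && (nbatch & (nbatch - 1)) == 0);

	nbuckets = (uint32_t) 1 << log2_nbuckets;

	*bucketno = (int) (hashvalue & (nbuckets - 1));

	/* With nbatch == 1 the mask is zero, so batchno is always 0. */
	*batchno = (int) ((hashvalue >> log2_nbuckets) & (uint32_t) (nbatch - 1));
}
```

With nbatch = 1, doubling the number of buckets changes only bucketno. With nbatch = 2, the hash value 0x400 maps to batch 1 at log2_nbuckets = 10 but to batch 0 at log2_nbuckets = 11, which is why the bucket count has to stay fixed once tuples have been written to batch files.
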
OK, thanks for pointing this out. Attached is v11 of the patch (both
separate and combined with the dense allocation, as before).

I fixed as many of those issues as possible. All the XXX items were
obsolete, except for one in the chunk_alloc function.

I have also removed one constant.

>
> OK, now on to some more substantive stuff:
>
> 1. It's not clear to me what the overall effect of this patch on
> memory utilization is. Reducing NTUP_PER_BUCKET from 10 to 1 is going
> to use, on the average, 10x as much bucket-header memory per tuple.
> Specifically, I think it means we'll use about 8 bytes of
> bucket-header memory per tuple instead of 0.8 bytes per tuple. If the
> tuples are narrow, that could be significant; concerns have been
> expressed about that here in the past. Increasing the number of
> buckets could also increase memory usage. On the other hand, the
> dense allocation stuff probably saves a ton of memory, so maybe we end
> up overall, but I'm not sure. Your thoughts, and maybe some test
> results with narrow and wide tuples, would be appreciated.

The effect of the dense allocation was briefly discussed in this thread,
along with some quick measurements:

http://www.postgresql.org/message-id/53beea9e.2080...@fuzzy.cz

The dense allocation removes pretty much all the palloc overhead. For a
40B tuple, I did get this before the dense allocation

   HashBatchContext: 1451221040 total in 182 blocks; 2826592 free (11
chunks); 1448394448 used

and this with the dense allocation patch applied

   HashBatchContext: 729651520 total in 21980 blocks; 0 free (0 chunks);
729651520 used

So it's pretty much a 50% reduction in memory consumption.

Now, the palloc header is >=16B, and removing it more than compensates
for the increased number of buckets (in the worst case, there may be
~2x buckets per tuple, which is 2x8B pointers).

I did a number of tests, and the results are quite consistent with this.

> 2. But, on the positive side, modulo the memory utilization questions
> mentioned above, I would expect the impact on hash join performance to
> be positive. Going from 10 tuples per bucket to just 1 should help,
> and on cases where the actual load factor would have ended up much
> higher because of poor estimation, increasing the number of buckets on
> the fly should help even more. I haven't tested this, though.

Yes, those are the results I was getting.

I've done a number of tests, and not a single one showed a slowdown so
far. Most well-estimated queries were 2-3x faster, and the poorly
estimated ones were orders of magnitude faster.

We're working on testing this on a range of workloads, I'll post some
results once I have that.

> I haven't had a chance to completely go through this yet, so these are
> just some initial thoughts.

Thanks!

Tomas

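For the memory question in point 1 above, the arithmetic can be sketched as follows. This is a back-of-the-envelope illustration only: the helper names are made up, the 8-byte pointer size assumes a 64-bit build, and the two totals are the HashBatchContext numbers quoted earlier in this mail.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Average bucket-array bytes per tuple for a target load of
 * ntup_per_bucket tuples per bucket (one pointer per bucket).
 */
static double
bucket_bytes_per_tuple(int ntup_per_bucket, size_t pointer_size)
{
	assert(ntup_per_bucket > 0);
	return (double) pointer_size / ntup_per_bucket;
}

/* Fraction of memory saved when going from before_bytes to after_bytes. */
static double
memory_reduction(double before_bytes, double after_bytes)
{
	assert(before_bytes > 0);
	return 1.0 - after_bytes / before_bytes;
}
```

bucket_bytes_per_tuple(10, 8) gives 0.8 and bucket_bytes_per_tuple(1, 8) gives 8, matching the numbers in the quoted question. memory_reduction(1451221040, 729651520) comes out just under 0.5. In the worst case of ~2x buckets per tuple the bucket array costs 16 bytes per tuple, which the >=16B palloc header removed by the dense allocation covers.
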
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 781a736..d128e08 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1900,18 +1900,21 @@ show_hash_info(HashState *hashstate, ExplainState *es) if (es->format != EXPLAIN_FORMAT_TEXT) { ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es); + ExplainPropertyLong("Original Hash Buckets", + hashtable->nbuckets_original, es); ExplainPropertyLong("Hash Batches", hashtable->nbatch, es); ExplainPropertyLong("Original Hash Batches", hashtable->nbatch_original, es); ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es); } - else if (hashtable->nbatch_original != hashtable->nbatch) + else if ((hashtable->nbatch_original != hashtable->nbatch) || + (hashtable->nbuckets_original != hashtable->nbuckets)) { appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, - "Buckets: %d Batches: %d (originally %d) Memory Usage: %ldkB\n", - hashtable->nbuckets, hashtable->nbatch, - hashtable->nbatch_original, spacePeakKb); + "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", + hashtable->nbuckets, hashtable->nbuckets_original, + hashtable->nbatch, hashtable->nbatch_original, spacePeakKb); } else { diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 6a9d956..688b6b4 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -37,8 +37,8 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" - static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); +static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse); static void ExecHashSkewTableInsert(HashJoinTable hashtable, @@ -49,6 +49,9 @@ static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); static char * chunk_alloc(HashJoinTable hashtable, int tupleSize); +/* Memory needed for optimal number of 
buckets. */ +#define BUCKETS_SPACE(htab) ((htab)->nbuckets_optimal * sizeof(HashJoinTuple)) + /* ---------------------------------------------------------------- * ExecHash * @@ -117,6 +120,7 @@ MultiExecHash(HashState *node) /* It's a skew tuple, so put it into that hash table */ ExecHashSkewTableInsert(hashtable, slot, hashvalue, bucketNumber); + hashtable->skewTuples += 1; } else { @@ -127,10 +131,31 @@ MultiExecHash(HashState *node) } } + /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ + if (hashtable->nbuckets != hashtable->nbuckets_optimal) + { + + /* We never decrease the number of buckets. */ + Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); + +#ifdef HJDEBUG + printf("Increasing nbuckets %d => %d\n", + hashtable->nbuckets, hashtable->nbuckets_optimal); +#endif + + ExecHashIncreaseNumBuckets(hashtable); + + } + + /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ + hashtable->spaceUsed += BUCKETS_SPACE(hashtable); + if (hashtable->spaceUsed > hashtable->spacePeak) + hashtable->spacePeak = hashtable->spaceUsed; + /* must provide our own instrumentation support */ if (node->ps.instrument) InstrStopNode(node->ps.instrument, hashtable->totalTuples); - + /* * We do not return the hash table directly because it's not a subtype of * Node, and so would violate the MultiExecProcNode API. 
Instead, our @@ -224,6 +249,7 @@ ExecEndHash(HashState *node) ExecEndNode(outerPlan); } + /* ---------------------------------------------------------------- * ExecHashTableCreate * @@ -271,7 +297,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) */ hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData)); hashtable->nbuckets = nbuckets; + hashtable->nbuckets_original = nbuckets; + hashtable->nbuckets_optimal = nbuckets; hashtable->log2_nbuckets = log2_nbuckets; + hashtable->log2_nbuckets_optimal = log2_nbuckets; hashtable->buckets = NULL; hashtable->keepNulls = keepNulls; hashtable->skewEnabled = false; @@ -285,6 +314,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbatch_outstart = nbatch; hashtable->growEnabled = true; hashtable->totalTuples = 0; + hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; @@ -386,7 +416,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) */ /* Target bucket loading (tuples per bucket) */ -#define NTUP_PER_BUCKET 10 +#define NTUP_PER_BUCKET 1 void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, @@ -396,6 +426,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, { int tupsize; double inner_rel_bytes; + double buckets_bytes; long hash_table_bytes; long skew_table_bytes; long max_pointers; @@ -418,6 +449,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, inner_rel_bytes = ntuples * tupsize; /* + * Estimate memory needed for buckets, assuming all the tuples fit into + * a single batch (consider NTUP_PER_BUCKET tuples per bucket) - buckets + * are just usual 'HashJoinTuple' (pointers to HashJoinTupleData). + */ + buckets_bytes = sizeof(HashJoinTuple) * my_log2(ntuples / NTUP_PER_BUCKET); + + /* * Target in-memory hashtable size is work_mem kilobytes. 
*/ hash_table_bytes = work_mem * 1024L; @@ -468,16 +506,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, /* also ensure we avoid integer overflow in nbatch and nbuckets */ max_pointers = Min(max_pointers, INT_MAX / 2); - if (inner_rel_bytes > hash_table_bytes) + if ((inner_rel_bytes + buckets_bytes) > hash_table_bytes) { /* We'll need multiple batches */ long lbuckets; double dbatch; int minbatch; - - lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET; - lbuckets = Min(lbuckets, max_pointers); - nbuckets = (int) lbuckets; + double batch_bytes; dbatch = ceil(inner_rel_bytes / hash_table_bytes); dbatch = Min(dbatch, max_pointers); @@ -485,6 +520,40 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, nbatch = 2; while (nbatch < minbatch) nbatch <<= 1; + + /* assuming equally-sized batches, compute bytes of hash table */ + batch_bytes = inner_rel_bytes / nbatch; + + /* when batching, for the buckets, assume full work_mem */ + lbuckets = 1 << my_log2(hash_table_bytes / (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple))); + buckets_bytes = lbuckets * sizeof(HashJoinTuple); + + /* + * Increase the nbatch until we get both tuples and buckets into work_mem. + * + * The loop should not execute more than once in most cases, becase tuples are + * usually much wider than buckets (usually 8B pointers), so by using only + * (batch_bytes/2) should get us below work_mem. + * + * The worst case is that (nbuckets == 2*ntuples-1), giving us about twice the + * number of buckets, i.e. about 2*sizeof(void*) per tuple. But that's + * the consequence of how NTUP_PER_BUCKET is chosen, and work_mem limit. 
+ */ + + while (batch_bytes + buckets_bytes > hash_table_bytes) + { + + /* increment number of batches, and re-evaluate the amount of memory */ + nbatch <<= 1; + + /* assuming equally-sized batches, compute bytes of hash table and buckets */ + batch_bytes = inner_rel_bytes / nbatch; + + } + + /* protect against nbucket overflow */ + lbuckets = Min(lbuckets, max_pointers); + nbuckets = (int) lbuckets; } else { @@ -561,6 +630,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) MemoryContext oldcxt; long ninmemory; long nfreed; + HashChunk oldchunks = hashtable->chunks_batch; /* do nothing if we've decided to shut off growth */ @@ -614,24 +684,38 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) */ ninmemory = nfreed = 0; - /* we're going to scan through the chunks directly, not buckets, so we can reset the - * buckets and reinsert all the tuples there - just set them to NULL */ - memset(hashtable->buckets, 0, sizeof(void*) * hashtable->nbuckets); + /* + * We will scan the tuples through chunks, not buckets, so we can reset the + * buckets and reinsert all the tuples there. So resetting buckets is OK. + */ + memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets); + + /* If we need to do a resize of buckets, we can do it while rebatching. */ + if (hashtable->nbuckets_optimal != hashtable->nbuckets) + { + hashtable->nbuckets = hashtable->nbuckets_optimal; + hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; - /* reset the chunks (we have a copy in oldchunks) - this way we'll allocate */ + hashtable->buckets = repalloc(hashtable->buckets, + sizeof(HashJoinTuple) * hashtable->nbuckets); + } + + /* Reset the chunks, so that we allocate everything into a new list of chunks. */ hashtable->chunks_batch = NULL; - /* so, let's scan through the old chunks, and all tuples in each chunk */ - while (oldchunks != NULL) { + /* Scan through the old chunks, keep only tuples from the current batch. 
*/ + while (oldchunks != NULL) + { /* pointer to the next memory chunk (may be NULL for the last chunk) */ HashChunk nextchunk = oldchunks->next; - /* position within the buffer (up to oldchunks->used) */ - size_t idx = 0; + /* position within the chunk (up to oldchunks->used) */ + size_t idx = 0; /* process all tuples stored in this chunk (and then free it) */ - while (idx < oldchunks->used) { + while (idx < oldchunks->used) + { /* get the hashjoin tuple and mintuple stored right after it */ HashJoinTuple hashTuple = (HashJoinTuple)(oldchunks->data + idx); @@ -703,6 +787,93 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } /* + * ExecHashIncreaseNumBuckets + * increase the original number of buckets in order to reduce + * number of tuples per bucket + */ +static void +ExecHashIncreaseNumBuckets(HashJoinTable hashtable) +{ + HashChunk chunk; + + /* do nothing if not an increase (it's called increase for a reason) */ + if (hashtable->nbuckets >= hashtable->nbuckets_optimal) + return; + + /* + * We already know the optimal number of buckets, so let's just + * compute the log2_nbuckets for it. 
+ */ + hashtable->nbuckets = hashtable->nbuckets_optimal; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal); + + Assert(hashtable->nbuckets > 1); + Assert(hashtable->nbuckets <= (INT_MAX/2)); + Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets)); + +#ifdef HJDEBUG + printf("Increasing nbuckets to %d\n", hashtable->nbuckets); +#endif + + /* + * Just reallocate the proper number of buckets - we don't need to + * walk through them - we can walk the dense-allocated chunks + * (just like in ExecHashIncreaseNumBatches, but without all the + * copying into new chunks) + */ + + hashtable->buckets = (HashJoinTuple *) repalloc(hashtable->buckets, + hashtable->nbuckets * sizeof(HashJoinTuple)); + + memset(hashtable->buckets, 0, sizeof(void*) * hashtable->nbuckets); + + /* scan through all tuples in all chunks, rebuild the hash table */ + chunk = hashtable->chunks_batch; + while (chunk != NULL) + { + + /* position within the buffer (up to chunk->used) */ + size_t idx = 0; + + /* process all tuples stored in this chunk (and then free it) */ + while (idx < chunk->used) + { + + /* get the hashjoin tuple and mintuple stored right after it */ + HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx); + MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); + + int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len); + + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + + /* just add the tuple to the proper bucket */ + hashTuple->next = hashtable->buckets[bucketno]; + hashtable->buckets[bucketno] = hashTuple; + + /* bytes occupied in memory HJ tuple overhead + actual tuple length */ + idx += hashTupleSize; + + } + + /* proceed to the next chunk */ + chunk = chunk->next; + + } + +#ifdef HJDEBUG + printf("Nbuckets increased to %d, average items per bucket %.1f\n", + hashtable->nbuckets, batchTuples / hashtable->nbuckets); +#endif + +} + + +/* * ExecHashTableInsert * insert a tuple into the 
hash table depending on the hash value * it may just go to a temp file for later batches @@ -755,12 +926,34 @@ ExecHashTableInsert(HashJoinTable hashtable, hashTuple->next = hashtable->buckets[bucketno]; hashtable->buckets[bucketno] = hashTuple; + /* + * Increase the (optimal) number of buckets if we just exceeded NTUP_PER_BUCKET, + * but only when there's still a single batch at this moment. + */ + if ((hashtable->nbatch == 1) && + ((hashtable->totalTuples - hashtable->skewTuples) + >= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))) + { + /* overflow protection */ + if (hashtable->nbuckets_optimal <= (INT_MAX/2)) + { + hashtable->nbuckets_optimal *= 2; + hashtable->log2_nbuckets_optimal += 1; + } + } + /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; - if (hashtable->spaceUsed > hashtable->spaceAllowed) + + /* + * Do we need to increase number of batches? Account for space required + * by buckets (optimal number). + */ + if (hashtable->spaceUsed + BUCKETS_SPACE(hashtable) > hashtable->spaceAllowed) ExecHashIncreaseNumBatches(hashtable); + } else { @@ -883,7 +1076,10 @@ ExecHashGetHashValue(HashJoinTable hashtable, * functions are good about randomizing all their output bits, else we are * likely to have very skewed bucket or batch occupancy.) * - * nbuckets doesn't change over the course of the join. + * The nbuckets/log2_nbuckets may change while (nbatch==1) because of dynamic + * buckets growth. Once we start batching, the value is fixed and does not + * change over the course of join (making it possible to compute batch number + * the way we do here). * * nbatch is always a power of 2; we increase it only by doubling it. This * effectively adds one more bit to the top of the batchno. 
@@ -1090,8 +1286,10 @@ ExecHashTableReset(HashJoinTable hashtable) MemoryContextSwitchTo(oldcxt); - /* reset the chunks too (the memory was allocated within batchCxt, so it's - * already freed by resetting the batch context -just set it to NULL) */ + /* + * Reset the chunks too (the memory was allocated within batchCxt, so it's + * already freed by resetting the batch context -just set it to NULL). + */ hashtable->chunks_batch = NULL; } @@ -1347,23 +1545,32 @@ ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue) static char * chunk_alloc(HashJoinTable hashtable, int size) { - /* maybe we should use MAXALIGN(size) here ... */ + /* XXX maybe we should use MAXALIGN(size) here ... */ - /* if tuple size is larger than of 1/8 of chunk size, allocate a separate chunk */ - if (size > (HASH_CHUNK_SIZE/8)) { + /* if requested size is larger than of 1/8 of chunk size, allocate a separate chunk */ + if (size > (HASH_CHUNK_SIZE/8)) + { /* allocate new chunk and put it at the beginning of the list */ - HashChunk newChunk = (HashChunk)MemoryContextAlloc(hashtable->batchCxt, offsetof(HashChunkData, data) + size); + HashChunk newChunk + = (HashChunk)MemoryContextAlloc(hashtable->batchCxt, + offsetof(HashChunkData, data) + size); newChunk->maxlen = size; newChunk->used = 0; newChunk->ntuples = 0; - /* If there already is a chunk, add if after it so we can still use the space in it */ - if (hashtable->chunks_batch != NULL) { + /* + * If there already is a chunk, add the new one after it, so we can still use the + * space in the existing one. 
+ */ + if (hashtable->chunks_batch != NULL) + { newChunk->next = hashtable->chunks_batch->next; hashtable->chunks_batch->next = newChunk; - } else { + } + else + { newChunk->next = hashtable->chunks_batch; hashtable->chunks_batch = newChunk; } @@ -1375,12 +1582,18 @@ char * chunk_alloc(HashJoinTable hashtable, int size) { } - /* ok, it's within chunk size, let's see if we have enough space for it in the current - * chunk => if not, allocate a new chunk (chunks==NULL means this is the first chunk) */ - if ((hashtable->chunks_batch == NULL) || (hashtable->chunks_batch->maxlen - hashtable->chunks_batch->used) < size) { + /* + * Requested size is less than 1/8 of a chunk, so place it in the current chunk if there + * is enough free space. If not, allocate a new chunk and add it there. + */ + if ((hashtable->chunks_batch == NULL) || + (hashtable->chunks_batch->maxlen - hashtable->chunks_batch->used) < size) + { /* allocate new chunk and put it at the beginning of the list */ - HashChunk newChunk = (HashChunk)MemoryContextAlloc(hashtable->batchCxt, offsetof(HashChunkData, data) + HASH_CHUNK_SIZE); + HashChunk newChunk + = (HashChunk)MemoryContextAlloc(hashtable->batchCxt, + offsetof(HashChunkData, data) + HASH_CHUNK_SIZE); newChunk->maxlen = HASH_CHUNK_SIZE; newChunk->used = 0; @@ -1421,7 +1634,6 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, hashTupleSize); - hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index c80a4d1..eda7502 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -103,7 +103,6 @@ typedef struct HashSkewBucket #define SKEW_MIN_OUTER_FRACTION 0.01 #define HASH_CHUNK_SIZE (16*1024L) /* 16kB chunks by default */ -#define 
HASH_SKEW_CHUNK_SIZE (4*1024L) /* 4kB chunks for skew buckets */ typedef struct HashChunkData { @@ -124,6 +123,10 @@ typedef struct HashJoinTableData int nbuckets; /* # buckets in the in-memory hash table */ int log2_nbuckets; /* its log2 (nbuckets must be a power of 2) */ + int nbuckets_original; /* # buckets when starting the first hash */ + int nbuckets_optimal; /* optimal # buckets (per batch) */ + int log2_nbuckets_optimal; /* same as log2_nbuckets optimal */ + /* buckets[i] is head of list of tuples in i'th in-memory bucket */ struct HashJoinTupleData **buckets; /* buckets array is per-batch storage, as are all the tuples */ @@ -145,6 +148,7 @@ typedef struct HashJoinTableData bool growEnabled; /* flag to shut off nbatch increases */ double totalTuples; /* # tuples obtained from inner plan */ + double skewTuples; /* # tuples inserted into skew tuples */ /* * These arrays are allocated for the life of the hash join, but only if

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 781a736..d128e08 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1900,18 +1900,21 @@ show_hash_info(HashState *hashstate, ExplainState *es) if (es->format != EXPLAIN_FORMAT_TEXT) { ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es); + ExplainPropertyLong("Original Hash Buckets", + hashtable->nbuckets_original, es); ExplainPropertyLong("Hash Batches", hashtable->nbatch, es); ExplainPropertyLong("Original Hash Batches", hashtable->nbatch_original, es); ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es); } - else if (hashtable->nbatch_original != hashtable->nbatch) + else if ((hashtable->nbatch_original != hashtable->nbatch) || + (hashtable->nbuckets_original != hashtable->nbuckets)) { appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, - "Buckets: %d Batches: %d (originally %d) Memory Usage: %ldkB\n", - hashtable->nbuckets, hashtable->nbatch, - hashtable->nbatch_original, spacePeakKb); + "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", + hashtable->nbuckets, hashtable->nbuckets_original, + hashtable->nbatch, hashtable->nbatch_original, spacePeakKb); } else { diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 589b2f1..688b6b4 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -37,8 +37,8 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" - static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); +static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse); static void ExecHashSkewTableInsert(HashJoinTable hashtable, @@ -47,6 +47,10 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable, int bucketNumber); static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); +static char * 
chunk_alloc(HashJoinTable hashtable, int tupleSize); + +/* Memory needed for optimal number of buckets. */ +#define BUCKETS_SPACE(htab) ((htab)->nbuckets_optimal * sizeof(HashJoinTuple)) /* ---------------------------------------------------------------- * ExecHash @@ -116,6 +120,7 @@ MultiExecHash(HashState *node) /* It's a skew tuple, so put it into that hash table */ ExecHashSkewTableInsert(hashtable, slot, hashvalue, bucketNumber); + hashtable->skewTuples += 1; } else { @@ -126,6 +131,27 @@ MultiExecHash(HashState *node) } } + /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ + if (hashtable->nbuckets != hashtable->nbuckets_optimal) + { + + /* We never decrease the number of buckets. */ + Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); + +#ifdef HJDEBUG + printf("Increasing nbuckets %d => %d\n", + hashtable->nbuckets, hashtable->nbuckets_optimal); +#endif + + ExecHashIncreaseNumBuckets(hashtable); + + } + + /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ + hashtable->spaceUsed += BUCKETS_SPACE(hashtable); + if (hashtable->spaceUsed > hashtable->spacePeak) + hashtable->spacePeak = hashtable->spaceUsed; + /* must provide our own instrumentation support */ if (node->ps.instrument) InstrStopNode(node->ps.instrument, hashtable->totalTuples); @@ -271,7 +297,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) */ hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData)); hashtable->nbuckets = nbuckets; + hashtable->nbuckets_original = nbuckets; + hashtable->nbuckets_optimal = nbuckets; hashtable->log2_nbuckets = log2_nbuckets; + hashtable->log2_nbuckets_optimal = log2_nbuckets; hashtable->buckets = NULL; hashtable->keepNulls = keepNulls; hashtable->skewEnabled = false; @@ -285,6 +314,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbatch_outstart = nbatch; hashtable->growEnabled = true; hashtable->totalTuples = 0; + hashtable->skewTuples = 0; 
hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; @@ -294,6 +324,8 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->spaceAllowedSkew = hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; + hashtable->chunks_batch = NULL; + /* * Get info about the hash functions to be used for each hash key. Also * remember whether the join operators are strict. @@ -384,7 +416,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) */ /* Target bucket loading (tuples per bucket) */ -#define NTUP_PER_BUCKET 10 +#define NTUP_PER_BUCKET 1 void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, @@ -394,6 +426,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, { int tupsize; double inner_rel_bytes; + double buckets_bytes; long hash_table_bytes; long skew_table_bytes; long max_pointers; @@ -416,6 +449,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, inner_rel_bytes = ntuples * tupsize; /* + * Estimate memory needed for buckets, assuming all the tuples fit into + * a single batch (consider NTUP_PER_BUCKET tuples per bucket) - buckets + * are just usual 'HashJoinTuple' (pointers to HashJoinTupleData). + */ + buckets_bytes = sizeof(HashJoinTuple) * my_log2(ntuples / NTUP_PER_BUCKET); + + /* * Target in-memory hashtable size is work_mem kilobytes. 
      */
     hash_table_bytes = work_mem * 1024L;
 
@@ -466,16 +506,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
     /* also ensure we avoid integer overflow in nbatch and nbuckets */
     max_pointers = Min(max_pointers, INT_MAX / 2);
 
-    if (inner_rel_bytes > hash_table_bytes)
+    if ((inner_rel_bytes + buckets_bytes) > hash_table_bytes)
     {
         /* We'll need multiple batches */
         long        lbuckets;
         double      dbatch;
         int         minbatch;
-
-        lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
-        lbuckets = Min(lbuckets, max_pointers);
-        nbuckets = (int) lbuckets;
+        double      batch_bytes;
 
         dbatch = ceil(inner_rel_bytes / hash_table_bytes);
         dbatch = Min(dbatch, max_pointers);
@@ -483,6 +520,40 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
         nbatch = 2;
         while (nbatch < minbatch)
             nbatch <<= 1;
+
+        /* assuming equally-sized batches, compute bytes of hash table */
+        batch_bytes = inner_rel_bytes / nbatch;
+
+        /* when batching, for the buckets, assume full work_mem */
+        lbuckets = 1 << my_log2(hash_table_bytes / (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)));
+        buckets_bytes = lbuckets * sizeof(HashJoinTuple);
+
+        /*
+         * Increase the nbatch until we get both tuples and buckets into work_mem.
+         *
+         * The loop should not execute more than once in most cases, because tuples are
+         * usually much wider than buckets (usually 8B pointers), so using only
+         * (batch_bytes/2) should get us below work_mem.
+         *
+         * The worst case is that (nbuckets == 2*ntuples-1), giving us about twice the
+         * number of buckets, i.e. about 2*sizeof(void*) per tuple. But that's
+         * the consequence of how NTUP_PER_BUCKET is chosen, and work_mem limit.
+         */
+
+        while (batch_bytes + buckets_bytes > hash_table_bytes)
+        {
+
+            /* increment number of batches, and re-evaluate the amount of memory */
+            nbatch <<= 1;
+
+            /* assuming equally-sized batches, compute bytes of hash table and buckets */
+            batch_bytes = inner_rel_bytes / nbatch;
+
+        }
+
+        /* protect against nbucket overflow */
+        lbuckets = Min(lbuckets, max_pointers);
+        nbuckets = (int) lbuckets;
     }
     else
     {
@@ -556,11 +627,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
     int         oldnbatch = hashtable->nbatch;
     int         curbatch = hashtable->curbatch;
     int         nbatch;
-    int         i;
     MemoryContext oldcxt;
     long        ninmemory;
     long        nfreed;
 
+    HashChunk   oldchunks = hashtable->chunks_batch;
+
     /* do nothing if we've decided to shut off growth */
     if (!hashtable->growEnabled)
         return;
@@ -612,51 +684,84 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
      */
     ninmemory = nfreed = 0;
 
-    for (i = 0; i < hashtable->nbuckets; i++)
+    /*
+     * We will scan the tuples through chunks, not buckets, so we can reset the
+     * buckets and reinsert all the tuples there. So resetting buckets is OK.
+     */
+    memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
+
+    /* If we need to do a resize of buckets, we can do it while rebatching. */
+    if (hashtable->nbuckets_optimal != hashtable->nbuckets)
+    {
+        hashtable->nbuckets = hashtable->nbuckets_optimal;
+        hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
+
+        hashtable->buckets = repalloc(hashtable->buckets,
+                                sizeof(HashJoinTuple) * hashtable->nbuckets);
+    }
+
+    /* Reset the chunks, so that we allocate everything into a new list of chunks. */
+    hashtable->chunks_batch = NULL;
+
+    /* Scan through the old chunks, keep only tuples from the current batch. */
+    while (oldchunks != NULL)
     {
-        HashJoinTuple prevtuple;
-        HashJoinTuple tuple;
 
-        prevtuple = NULL;
-        tuple = hashtable->buckets[i];
+        /* pointer to the next memory chunk (may be NULL for the last chunk) */
+        HashChunk nextchunk = oldchunks->next;
+
+        /* position within the chunk (up to oldchunks->used) */
+        size_t idx = 0;
 
-        while (tuple != NULL)
+        /* process all tuples stored in this chunk (and then free it) */
+        while (idx < oldchunks->used)
         {
-            /* save link in case we delete */
-            HashJoinTuple nexttuple = tuple->next;
-            int         bucketno;
-            int         batchno;
+
+            /* get the hashjoin tuple and mintuple stored right after it */
+            HashJoinTuple hashTuple = (HashJoinTuple)(oldchunks->data + idx);
+            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+            int     hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+            int     bucketno;
+            int     batchno;
 
             ninmemory++;
-            ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
+            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                       &bucketno, &batchno);
-            Assert(bucketno == i);
+
             if (batchno == curbatch)
             {
-                /* keep tuple */
-                prevtuple = tuple;
+                /* keep tuple - this means we need to copy it into the new chunks */
+                HashJoinTuple copyTuple = (HashJoinTuple) chunk_alloc(hashtable, hashTupleSize);
+                memcpy(copyTuple, hashTuple, hashTupleSize);
+
+                /* and of course add it to the new buckets - just push the copy onto the front
+                 * of the bucket's list */
+                copyTuple->next = hashtable->buckets[bucketno];
+                hashtable->buckets[bucketno] = copyTuple;
             }
             else
             {
                 /* dump it out */
                 Assert(batchno > curbatch);
-                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(tuple),
-                                      tuple->hashvalue,
+                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
+                                      hashTuple->hashvalue,
                                       &hashtable->innerBatchFile[batchno]);
-                /* and remove from hash table */
-                if (prevtuple)
-                    prevtuple->next = nexttuple;
-                else
-                    hashtable->buckets[i] = nexttuple;
-                /* prevtuple doesn't change */
-                hashtable->spaceUsed -=
-                    HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(tuple)->t_len;
-                pfree(tuple);
+
+                hashtable->spaceUsed -= hashTupleSize;
                 nfreed++;
             }
 
-            tuple = nexttuple;
+            /* bytes occupied in memory HJ tuple overhead + actual tuple length */
+            idx += hashTupleSize;
+
         }
+
+        /* so, we're done with this chunk - free it and proceed to the next one */
+        pfree(oldchunks);
+        oldchunks = nextchunk;
+
     }
 
 #ifdef HJDEBUG
@@ -682,6 +787,93 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 }
 
 /*
+ * ExecHashIncreaseNumBuckets
+ *      increase the original number of buckets in order to reduce
+ *      number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+    HashChunk chunk;
+
+    /* do nothing if not an increase (it's called increase for a reason) */
+    if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
+        return;
+
+    /*
+     * We already know the optimal number of buckets, so let's just
+     * compute the log2_nbuckets for it.
+     */
+    hashtable->nbuckets = hashtable->nbuckets_optimal;
+    hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal);
+
+    Assert(hashtable->nbuckets > 1);
+    Assert(hashtable->nbuckets <= (INT_MAX/2));
+    Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+    printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+    /*
+     * Just reallocate the proper number of buckets - we don't need to
+     * walk through them - we can walk the dense-allocated chunks
+     * (just like in ExecHashIncreaseNumBatches, but without all the
+     * copying into new chunks)
+     */
+
+    hashtable->buckets = (HashJoinTuple *) repalloc(hashtable->buckets,
+                                hashtable->nbuckets * sizeof(HashJoinTuple));
+
+    memset(hashtable->buckets, 0, sizeof(void*) * hashtable->nbuckets);
+
+    /* scan through all tuples in all chunks, rebuild the hash table */
+    chunk = hashtable->chunks_batch;
+    while (chunk != NULL)
+    {
+
+        /* position within the buffer (up to chunk->used) */
+        size_t idx = 0;
+
+        /* process all tuples stored in this chunk */
+        while (idx < chunk->used)
+        {
+
+            /* get the hashjoin tuple and mintuple stored right after it */
+            HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx);
+            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+            int     hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+            int     bucketno;
+            int     batchno;
+
+            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+                                      &bucketno, &batchno);
+
+            /* just add the tuple to the proper bucket */
+            hashTuple->next = hashtable->buckets[bucketno];
+            hashtable->buckets[bucketno] = hashTuple;
+
+            /* bytes occupied in memory HJ tuple overhead + actual tuple length */
+            idx += hashTupleSize;
+
+        }
+
+        /* proceed to the next chunk */
+        chunk = chunk->next;
+
+    }
+
+#ifdef HJDEBUG
+    printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+           hashtable->nbuckets, batchTuples / hashtable->nbuckets);
+#endif
+
+}
+
+
+/*
  * ExecHashTableInsert
  *      insert a tuple into the hash table depending on the hash value
  *      it may just go to a temp file for later batches
@@ -717,8 +909,8 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
         /* Create the HashJoinTuple */
         hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
-        hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
-                                                       hashTupleSize);
+        hashTuple = (HashJoinTuple) chunk_alloc(hashtable, hashTupleSize);
+
         hashTuple->hashvalue = hashvalue;
         memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 
@@ -734,12 +926,34 @@ ExecHashTableInsert(HashJoinTable hashtable,
         hashTuple->next = hashtable->buckets[bucketno];
         hashtable->buckets[bucketno] = hashTuple;
 
+        /*
+         * Increase the (optimal) number of buckets if we just exceeded NTUP_PER_BUCKET,
+         * but only when there's still a single batch at this moment.
+         */
+        if ((hashtable->nbatch == 1) &&
+            ((hashtable->totalTuples - hashtable->skewTuples)
+                >= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)))
+        {
+            /* overflow protection */
+            if (hashtable->nbuckets_optimal <= (INT_MAX/2))
+            {
+                hashtable->nbuckets_optimal *= 2;
+                hashtable->log2_nbuckets_optimal += 1;
+            }
+        }
+
         /* Account for space used, and back off if we've used too much */
         hashtable->spaceUsed += hashTupleSize;
         if (hashtable->spaceUsed > hashtable->spacePeak)
             hashtable->spacePeak = hashtable->spaceUsed;
-        if (hashtable->spaceUsed > hashtable->spaceAllowed)
+
+        /*
+         * Do we need to increase number of batches? Account for space required
+         * by buckets (optimal number).
+         */
+        if (hashtable->spaceUsed + BUCKETS_SPACE(hashtable) > hashtable->spaceAllowed)
             ExecHashIncreaseNumBatches(hashtable);
+
     }
     else
     {
@@ -862,7 +1076,10 @@ ExecHashGetHashValue(HashJoinTable hashtable,
  * functions are good about randomizing all their output bits, else we are
  * likely to have very skewed bucket or batch occupancy.)
  *
- * nbuckets doesn't change over the course of the join.
+ * The nbuckets/log2_nbuckets may change while (nbatch==1) because of dynamic
+ * buckets growth. Once we start batching, the value is fixed and does not
+ * change over the course of join (making it possible to compute batch number
+ * the way we do here).
  *
  * nbatch is always a power of 2; we increase it only by doubling it.  This
  * effectively adds one more bit to the top of the batchno.
@@ -1068,6 +1285,13 @@ ExecHashTableReset(HashJoinTable hashtable)
     hashtable->spaceUsed = 0;
 
     MemoryContextSwitchTo(oldcxt);
+
+    /*
+     * Reset the chunks too (the memory was allocated within batchCxt, so it's
+     * already freed by resetting the batch context - just set it to NULL).
+     */
+    hashtable->chunks_batch = NULL;
+
 }
 
 /*
@@ -1318,6 +1542,76 @@ ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
     return INVALID_SKEW_BUCKET_NO;
 }
 
+static
+char * chunk_alloc(HashJoinTable hashtable, int size) {
+
+    /* XXX maybe we should use MAXALIGN(size) here ... */
+
+    /* if requested size is larger than 1/8 of chunk size, allocate a separate chunk */
+    if (size > (HASH_CHUNK_SIZE/8))
+    {
+
+        /* allocate new chunk and put it at the beginning of the list */
+        HashChunk newChunk
+            = (HashChunk)MemoryContextAlloc(hashtable->batchCxt,
+                        offsetof(HashChunkData, data) + size);
+
+        newChunk->maxlen = size;
+        newChunk->used = 0;
+        newChunk->ntuples = 0;
+
+        /*
+         * If there already is a chunk, add the new one after it, so we can still use the
+         * space in the existing one.
+         */
+        if (hashtable->chunks_batch != NULL)
+        {
+            newChunk->next = hashtable->chunks_batch->next;
+            hashtable->chunks_batch->next = newChunk;
+        }
+        else
+        {
+            newChunk->next = hashtable->chunks_batch;
+            hashtable->chunks_batch = newChunk;
+        }
+
+        newChunk->used += size;
+        newChunk->ntuples += 1;
+
+        return newChunk->data;
+
+    }
+
+    /*
+     * Requested size is less than 1/8 of a chunk, so place it in the current chunk if there
+     * is enough free space. If not, allocate a new chunk and add it there.
+     */
+    if ((hashtable->chunks_batch == NULL) ||
+        (hashtable->chunks_batch->maxlen - hashtable->chunks_batch->used) < size)
+    {
+
+        /* allocate new chunk and put it at the beginning of the list */
+        HashChunk newChunk
+            = (HashChunk)MemoryContextAlloc(hashtable->batchCxt,
+                        offsetof(HashChunkData, data) + HASH_CHUNK_SIZE);
+
+        newChunk->maxlen = HASH_CHUNK_SIZE;
+        newChunk->used = 0;
+        newChunk->ntuples = 0;
+
+        newChunk->next = hashtable->chunks_batch;
+        hashtable->chunks_batch = newChunk;
+    }
+
+    /* OK, we have enough space in the chunk, let's add the tuple */
+    hashtable->chunks_batch->used += size;
+    hashtable->chunks_batch->ntuples += 1;
+
+    /* return pointer to the start of the tuple memory */
+    return hashtable->chunks_batch->data + (hashtable->chunks_batch->used - size);
+
+}
+
 /*
  * ExecHashSkewTableInsert
  *
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 3beae40..eda7502 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -102,12 +102,31 @@ typedef struct HashSkewBucket
 #define SKEW_WORK_MEM_PERCENT  2
 #define SKEW_MIN_OUTER_FRACTION  0.01
 
+#define HASH_CHUNK_SIZE         (16*1024L)  /* 16kB chunks by default */
+
+typedef struct HashChunkData
+{
+    int         ntuples;    /* number of tuples stored in this chunk */
+    size_t      maxlen;     /* length of the buffer */
+    size_t      used;       /* number of chunk bytes already used */
+
+    struct HashChunkData   *next;   /* pointer to the next chunk (linked list) */
+
+    char        data[1];    /* buffer allocated at the end */
+} HashChunkData;
+
+typedef struct HashChunkData* HashChunk;
+
 typedef struct HashJoinTableData
 {
     int         nbuckets;       /* # buckets in the in-memory hash table */
     int         log2_nbuckets;  /* its log2 (nbuckets must be a power of 2) */
 
+    int         nbuckets_original;  /* # buckets when starting the first hash */
+    int         nbuckets_optimal;   /* optimal # buckets (per batch) */
+    int         log2_nbuckets_optimal;  /* same as log2_nbuckets optimal */
+
     /* buckets[i] is head of list of tuples in i'th in-memory bucket */
     struct HashJoinTupleData **buckets;
     /* buckets array is per-batch storage, as are all the tuples */
 
@@ -129,6 +148,7 @@ typedef struct HashJoinTableData
     bool        growEnabled;    /* flag to shut off nbatch increases */
 
     double      totalTuples;    /* # tuples obtained from inner plan */
+    double      skewTuples;     /* # tuples inserted into skew tuples */
 
     /*
      * These arrays are allocated for the life of the hash join, but only if
@@ -157,6 +177,10 @@ typedef struct HashJoinTableData
 
     MemoryContext hashCxt;      /* context for whole-hash-join storage */
     MemoryContext batchCxt;     /* context for this-batch-only storage */
+
+    /* used for dense allocation of tuples (into linked chunks) */
+    HashChunk   chunks_batch;   /* one list for the whole batch */
+
 } HashJoinTableData;
 
 #endif   /* HASHJOIN_H */
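
For anyone who wants to play with the dense allocation outside the executor, here is a minimal standalone sketch of what chunk_alloc() in the patch above does. It is not the patch code: malloc() stands in for MemoryContextAlloc() on batchCxt, and the names Chunk, chunk_new, dense_alloc, chunk_count and chunk_free_all are made up for the illustration. Like the patch (see the XXX about MAXALIGN), it does not align the returned pointers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define CHUNK_SIZE (16 * 1024)  /* same value as HASH_CHUNK_SIZE in the patch */

/* Hypothetical stand-in for HashChunkData. */
typedef struct Chunk
{
    size_t        maxlen;   /* usable bytes in data[] */
    size_t        used;     /* bytes handed out so far */
    int           nitems;   /* allocations served from this chunk */
    struct Chunk *next;     /* next chunk in the list */
    char          data[];   /* buffer follows the header */
} Chunk;

/* Allocate an empty chunk with room for maxlen bytes (malloc, not batchCxt). */
static Chunk *
chunk_new(size_t maxlen)
{
    Chunk *c = malloc(offsetof(Chunk, data) + maxlen);

    if (c == NULL)
        abort();
    c->maxlen = maxlen;
    c->used = 0;
    c->nitems = 0;
    c->next = NULL;
    return c;
}

/* Hand out size bytes; *head is the chunk currently being filled. */
static char *
dense_alloc(Chunk **head, size_t size)
{
    Chunk *c;

    if (size > CHUNK_SIZE / 8)
    {
        /* Oversized request: dedicated chunk, linked behind the head so
         * the free space left in the head chunk stays usable. */
        c = chunk_new(size);
        if (*head != NULL)
        {
            c->next = (*head)->next;
            (*head)->next = c;
        }
        else
            *head = c;
        c->used = size;
        c->nitems = 1;
        return c->data;
    }

    if (*head == NULL || (*head)->maxlen - (*head)->used < size)
    {
        /* No chunk yet, or not enough room: start a new one at the front. */
        c = chunk_new(CHUNK_SIZE);
        c->next = *head;
        *head = c;
    }

    /* Bump allocation inside the head chunk. */
    c = *head;
    c->used += size;
    c->nitems++;
    return c->data + (c->used - size);
}

/* Number of chunks in the list. */
static int
chunk_count(const Chunk *head)
{
    int n = 0;

    for (; head != NULL; head = head->next)
        n++;
    return n;
}

/* Release every chunk, which is what resetting batchCxt does in the patch. */
static void
chunk_free_all(Chunk *head)
{
    while (head != NULL)
    {
        Chunk *next = head->next;

        free(head);
        head = next;
    }
}
```

Two small requests come back adjacent in the same chunk, while an oversized request gets its own chunk behind the head, so the next small request continues right where the previous one ended. That is the property both ExecHashIncreaseNumBatches and ExecHashIncreaseNumBuckets rely on when they walk the chunks by advancing idx by hashTupleSize.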

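To illustrate why nbuckets can only grow while nbatch == 1, here is a small sketch of the bucket/batch split as I understand ExecHashGetBucketAndBatch to work (low bits select the bucket, the next bits select the batch). The function name bucket_and_batch and the plain-int interface are made up for this example; it is not the executor code.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Split a hash value into bucket and batch numbers: the low log2_nbuckets
 * bits pick the bucket, the bits right above them pick the batch.
 * nbatch is assumed to be a power of 2.
 */
static void
bucket_and_batch(uint32_t hashvalue, int log2_nbuckets, int nbatch,
                 int *bucketno, int *batchno)
{
    uint32_t nbuckets = (uint32_t) 1 << log2_nbuckets;

    *bucketno = (int) (hashvalue & (nbuckets - 1));

    if (nbatch > 1)
        *batchno = (int) ((hashvalue >> log2_nbuckets) & (uint32_t) (nbatch - 1));
    else
        *batchno = 0;       /* zero-width batch bits: always batch 0 */
}
```

With a single batch, doubling the buckets (log2_nbuckets 2 to 3) only moves a tuple to another bucket and the batch stays 0. With nbatch = 4 the same change shifts which bits feed the batch number, so for hash value 0x2D the batch would move from 3 to 1, and tuples already written to batch files would be in the wrong place. That is why the patch freezes nbuckets once batching starts.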