Hi,

On 2017-11-14 01:30:30 +1300, Thomas Munro wrote:
> > +-- The "good" case: batches required, but we plan the right number; we
> > +-- plan for 16 batches, and we stick to that number, and peak memory
> > +-- usage stays within our work_mem budget
> > +-- non-parallel
> > +set max_parallel_workers_per_gather = 0;
> > +set work_mem = '128kB';
> >
> > So how do we know that's actually the case we're testing rather than
> > something arbitrarily different? There are IIRC tests somewhere that
> > just filter the json explain output to the right parts...
>
> Yeah, good idea.  My earlier attempts to dump out the hash join
> dimensions ran into problems with architecture sensitivity and then
> some run-to-run non-determinism in the parallel case (due to varying
> fragmentation depending on how many workers get involved in time).
> The attached version tells you about batch growth without reporting
> the exact numbers, except in the "ugly" case, where we know that there
> is only one possible outcome because the extreme skew detector is
> guaranteed to go off after the first nbatch increase (I got rid of all
> tuples other than ones with the same key to make this true).
Hm. The way you access this doesn't quite seem right:

+--
+-- exercises for the hash join code
+--
+begin;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Extract bucket and batch counts from an explain analyze plan. In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  line text;
+  matches text[];
+begin
+  for line in
+    execute 'explain analyze ' || query
+  loop
+    matches := (regexp_matches(line, ' Batches: ([0-9]+) \(originally ([0-9]+)\)'));
+    if matches is not null then
+      original := matches[2]::int;
+      final := matches[1]::int;
+      return next;
+    else
+      matches := regexp_matches(line, ' Batches: ([0-9]+)');
+      if matches is not null then
+        original := matches[1]::int;
+        final := original;
+        return next;
+      end if;
+    end if;
+  end loop;
+end;
+$$;

Why not use format json and access the output that way? Then you can be
sure you access the right part of the tree and such?
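E.g. something roughly along these lines (an untested sketch; it assumes
explain's json output keeps labelling the Hash node's counters
"Hash Batches" / "Original Hash Batches", and the function names here
are purely illustrative):

create or replace function find_hash_batches(node json)
returns table (original int, final int) language plpgsql
as
$$
declare
  child json;
begin
  -- report the counters from any Hash node we find
  if node->>'Node Type' = 'Hash' then
    original := (node->>'Original Hash Batches')::int;
    final := (node->>'Hash Batches')::int;
    return next;
  end if;
  -- recurse into child plans; 'Plans' is absent for leaf nodes, and
  -- json_array_elements() of null yields no rows, so the loop is
  -- simply skipped there
  for child in select * from json_array_elements(node->'Plans')
  loop
    return query select * from find_hash_batches(child);
  end loop;
end;
$$;

create or replace function hash_join_batches_json(query text)
returns table (original int, final int) language plpgsql
as
$$
declare
  result json;
begin
  execute 'explain (analyze, format json) ' || query into result;
  return query select * from find_hash_batches(result->0->'Plan');
end;
$$;

The tests could then assert things like

  select original > 1 as multibatch, final > original as growth
    from hash_join_batches_json('select ...');

without being sensitive to exact counts, and without a regexp that might
accidentally match some unrelated part of the output.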
> > +    else
> > +    {
> > +        errno = stat_errno;
> > +        elog(LOG, "could not stat file \"%s\": %m", path);
> > +    }
> >
> > All these messages are "not expected to ever happen" ones, right?
>
> You'd have to suffer a nasty filesystem failure, remount read-only, or
> manually mess with permissions or something.  Not sure where the line
> is, but I've changed all of these new elog calls to ereport.

Oh, I'd been fine keeping them as elogs. The one exception would have
been the out-of-space cases, which'll occur in practice.

> > +    if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
> > +    {
> > +        /* Subtract its size from current usage (do first in case of error) */
> > +        temporary_files_size -= vfdP->fileSize;
> > +        vfdP->fileSize = 0;
> > +    }
> >
> > So, is it right to do so unconditionally and without regard for errors?
> > If the file isn't deleted, it shouldn't be subtracted from fileSize. I
> > guess you're managing that through the flag, but that's not entirely
> > obvious.
>
> I think it is.  Reasoning: The existing behaviour of fd.c is that if
> we don't manage to delete temporary files, we'll LOG something and
> forget about them (they'll be cleaned up eventually by a clean restart
> or human intervention).

IOW: Never ;)

> > +/*
> > + * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
> > + * pointer to meta data of that size must be provided.
> > + */
> > +void
> > +sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
> > +             MinimalTuple tuple)
> > +{
> >
> > +    /* Do we have space? */
> > +    size = accessor->sts->meta_data_size + tuple->t_len;
> > +    if (accessor->write_pointer + size >= accessor->write_end)
> > +    {
> > +        /* Try flushing to see if that creates enough space. */
> > +        if (accessor->write_chunk != NULL)
> > +            sts_flush_chunk(accessor);
> > +
> > +        /*
> > +         * It may still not be enough in the case of a gigantic tuple, or if
> > +         * we haven't created a chunk buffer at all yet.
> > +         */
> > +        if (accessor->write_pointer + size >= accessor->write_end)
> > +        {
> > +            SharedTuplestoreParticipant *participant;
> > +            size_t      space_needed;
> > +            int         pages_needed;
> > +
> > +            /* How many pages to hold this data and the chunk header? */
> > +            space_needed = offsetof(SharedTuplestoreChunk, data) + size;
> > +            pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
> > +            pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
> > +
> > +            /*
> > +             * Double the chunk size until it's big enough, and record that
> > +             * fact in the shared expansion log so that readers know about it.
> > +             */
> > +            participant = &accessor->sts->participants[accessor->participant];
> > +            while (accessor->write_pages < pages_needed)
> > +            {
> > +                accessor->write_pages *= 2;
> > +                participant->chunk_expansion_log[participant->chunk_expansions++] =
> > +                    accessor->write_page;
> > +            }
> >
> > Hm. Isn't that going to be pretty unfunny if you have one large and a
> > lot of small tuples?
>
> It will increase the parallel scan grain size, and then keep that size
> for the rest of the contents of one backend's output file.  I am aware
> of two downsides to using a large parallel grain:
>
> 1.  It determines the amount of unfairness when we run out of data:
> it's the maximum amount of extra data that the unlucky last worker can
> finish up with after all the others have finished.  I think this
> effect is reduced by higher level factors: when a reader runs out of
> data in one backend's file, it'll start reading another backend's
> file.  If it's hit the end of all backends' files and this is an outer
> batch, Parallel Hash will just go and work on another batch
> immediately.

Consider e.g. what happens if there's the occasional 500MB datum, and
the rest's very small...

> Better ideas?

Not really. I'm more than a bit suspicious of this solution, but I
don't really have a great suggestion otherwise. One way to combat
extreme size skew would be to put very large datums into different
files.

But I think we probably can go with your approach for now, ignoring my
failure-prone spidey senses ;)

> > +        /* Find the location of a new chunk to read. */
> > +        p = &accessor->sts->participants[accessor->read_participant];
> > +
> > +        SpinLockAcquire(&p->mutex);
> > +        eof = p->read_page >= p->npages;
> > +        if (!eof)
> > +        {
> > +            /*
> > +             * Figure out how big this chunk is.  It will almost always be the
> > +             * same as the last chunk loaded, but if there is one or more
> > +             * entry in the chunk expansion log for this page then we know
> > +             * that it doubled that number of times.  This avoids the need to
> > +             * do IO to adjust the read head, so we don't need to hold up
> > +             * concurrent readers.  (An alternative to this extremely rarely
> > +             * run loop would be to use more space storing the new size in the
> > +             * log so we'd have 'if' instead of 'while'.)
> > +             */
> > +            read_page = p->read_page;
> > +            while (p->chunk_expansion < p->chunk_expansions &&
> > +                   p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
> > +            {
> > +                p->chunk_pages *= 2;
> > +                p->chunk_expansion++;
> > +            }
> > +            chunk_pages = p->chunk_pages;
> > +
> > +            /* The next reader will start after this chunk. */
> > +            p->read_page += chunk_pages;
> > +        }
> > +        SpinLockRelease(&p->mutex);
> >
> > This looks more like the job of an lwlock rather than a spinlock.
>
> I switched to the alternative algorithm mentioned in parentheses in
> the comment.  It uses a bit more space, but that loop is gone.  In my
> mind this is much like Parallel Seq Scan: we acquire a spinlock just
> to advance the block pointer.
> The added complication is that we also check if the chunk size has
> changed, which clang renders as this many instructions:
>
> postgres[0x10047eee0] <+176>: movslq 0x144(%r15,%rbx), %rcx
> postgres[0x10047eee8] <+184>: cmpl   0x140(%r15,%rbx), %ecx
> postgres[0x10047eef0] <+192>: jge    0x10047ef16    ; <+230> at sharedtuplestore.c:489
> postgres[0x10047eef2] <+194>: leaq   (%r15,%rbx), %rdx
> postgres[0x10047eef6] <+198>: cmpl   %r12d, 0x40(%rdx,%rcx,8)
> postgres[0x10047eefb] <+203>: jne    0x10047ef16    ; <+230> at sharedtuplestore.c:489
> postgres[0x10047eefd] <+205>: leaq   0x144(%r15,%rbx), %rsi
> postgres[0x10047ef05] <+213>: leal   0x1(%rcx), %edi
> postgres[0x10047ef08] <+216>: movl   %edi, (%rsi)
> postgres[0x10047ef0a] <+218>: movl   0x44(%rdx,%rcx,8), %ecx
> postgres[0x10047ef0e] <+222>: movl   %ecx, 0x148(%r15,%rbx)
> postgres[0x10047ef16] <+230>: movl   0x148(%r15,%rbx), %r15d
>
> That should be OK, right?

It's not too bad. Personally, though, I'm of the opinion that pretty
much no new spinlocks should be added - their worst-case performance
characteristics are bad enough that they're only worth the
experimentation in cases where each cycle really matters and where
contention is unlikely.

> > One day we're going to need a better approach to this. I have no idea
> > how, but this per-node, and now per_node * max_parallelism, approach
> > has only implementation simplicity as its benefit.
>
> I agree, and I am interested in that subject.  In the meantime, I
> think it'd be pretty unfair if parallel-oblivious hash join and
> sort-merge join and every other parallel plan get to use work_mem * p
> (and in some cases waste it with duplicate data), but Parallel Hash
> isn't allowed to do the same (and put it to good use).

I'm not sure I care about fairness between pieces of code ;)

> > -            node->hj_JoinState = HJ_NEED_NEW_OUTER;
> > +            if (hashtable->parallel_state)
> > +            {
> > +                Barrier    *build_barrier;
> > +
> > +                build_barrier = &hashtable->parallel_state->build_barrier;
> > +                if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
> > +                {
> > +                    /*
> > +                     * If multi-batch, we need to hash the outer relation
> > +                     * up front.
> > +                     */
> > +                    if (hashtable->nbatch > 1)
> > +                        ExecParallelHashJoinPartitionOuter(node);
> > +                    BarrierArriveAndWait(build_barrier,
> > +                                         WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
> > +                }
> > +                Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
> > +
> > +                /* Each backend should now select a batch to work on. */
> > +                hashtable->curbatch = -1;
> > +                node->hj_JoinState = HJ_NEED_NEW_BATCH;
> > +
> > +                continue;
> > +            }
> > +            else
> > +                node->hj_JoinState = HJ_NEED_NEW_OUTER;
> >
> > You know what I'm going to say about all these branches, and sigh.
>
> BTW this is not per-tuple code -- it runs once at the end of hashing.
> Not sure what you're looking for here.

It was more a general statement about all the branches in nodeHashjoin
than about these specific branches. Should've made that clearer. There
are definitely branches in very common parts:

            case HJ_NEED_NEW_OUTER:

                /*
                 * We don't have an outer tuple, try to get the next one
                 */
                if (hashtable->parallel_state)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node,
                                                  &hashvalue);

I don't think you should do so now, but I think a reasonable approach
here would be to move the HJ_BUILD_HASHTABLE code into a separate
function (it really can't be hot). Then have specialized ExecHashJoin()
versions for parallel/non-parallel and potentially for
outer/inner/anti.
> > If we don't split this into two versions, we at least should store
> > hashNode->parallel_state in a local var, so the compiler doesn't have
> > to pull that out of memory after every external function call (of
> > which there are a lot). In common cases it'll end up in a callee-saved
> > register, and most of the called functions won't be too register
> > starved (on x86-64).
>
> Hmm.  Well I did that already in v24 -- in many places there is now a
> local variable called pstate.

See the above piece of code, and a few others, in nodeHash.

> > I think it'd be better if we structured the file so we just set the
> > GUCs with SET LOCAL inside a transaction.
>
> I wrapped the whole region of join.sql concerned with hash joins in a
> transaction that rolls back, so I don't have to write LOCAL.  That's
> just as good, right?

Not really, imo. Being able to read a test without going through all
previous ones is a lot better.
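I.e. give each test case its own short transaction, roughly like this
(just a sketch - the GUCs are the ones from the test file above, and the
query passed to hash_join_batches() is only illustrative):

begin;
set local min_parallel_table_scan_size = 0;
set local parallel_setup_cost = 0;
set local work_mem = '128kB';
-- one self-contained hash join test case
select original > 1 as multibatch
  from hash_join_batches('select ...');
rollback;

SET LOCAL settings end with the transaction either way, so nothing can
leak into later tests, and each block can be read (and re-run) on its
own.

Greetings,

Andres Freund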