Hi hackers,

Thanks very much to Rafia for testing, and to Andres for his copious review feedback. Here's a new version. Changes:
1. Keep all the backing files that are part of a BufFileSet in subdirectories, as suggested by Andres. Now, instead of that unpopular logic for scanning ranges of possible file paths to delete, we can just blow away whole directories that group sets of related files.

2. Don't expose the 'participant' and 'partition' concepts, which Andres didn't like much, in the BufFile API. There is a new concept 'stripe' which client code of BufFileSet can use to specify the participant number in a more general way without saying so: it's really just a way to spread files over tablespaces. I'm not sure if tablespaces are really used that way much, but it seemed like Peter wasn't going to be too happy with a proposal that didn't do *something* to respect the existing temp_tablespaces GUC behaviour (and he'd be right). But I didn't think it would make any kind of sense at all to stripe by 1GB segments as private BufFiles do when writing from multiple processes, as I have argued elsewhere, hence this scheme. The 'qunique' function used here (basically a poor man's std::unique) is one I proposed earlier, with the name suggested by Tom Lane: see https://www.postgresql.org/message-id/flat/CAEepm%3D2vmFTNpAmwbGGD2WaryM6T3hSDVKQPfUwjdD_5XY6vAA%40mail.gmail.com .

3. Merged the single-batch and multi-batch patches into one. Earlier I had the idea that it was easier to review them in layers, since I hoped people might catch a glimpse of the central simplicity without being hit by a wall of multi-batch logic, but since Andres is reviewing and disagrees, I give you 0010-hj-parallel-v11.patch, which weighs in at 32 files changed, 2278 insertions(+), 250 deletions(-).

4. Moved the DSM handling to the very end of resowner.c's cleanup. Peter pointed out that it would otherwise happen before fd.c Files are closed. He was concerned about a different aspect of that which I'm not sure I fully understand, but at the very least it seemed to represent a significant problem for this design on Windows. I discussed this briefly with Robert off-list and he told me that there is probably no good reason for the ordering that we have, and what's more, there may be good arguments even outside this case for DSM segments being cleaned up as late as possible, now that they contain shared control information and not just tuple data as once had been imagined. I can't think of any reason why this would not be safe. Can you?

5. Implemented the empty inner relation optimisation.

Some smaller changes and miles of feedback inline below:

On Mon, Mar 27, 2017 at 11:03 AM, Thomas Munro <thomas.mu...@enterprisedb.com> wrote: > On Mon, Mar 27, 2017 at 9:41 AM, Andres Freund <and...@anarazel.de> wrote: >> SharedBufFile allows temporary files to be created by one backend and >> then exported for read-only access by other backends, with clean-up >> managed by reference counting associated with a DSM segment. This includes >> changes to fd.c and buffile.c to support new kinds of temporary file. >> >> >> diff --git a/src/backend/storage/file/buffile.c >> b/src/backend/storage/file/buffile.c >> index 4ca0ea4..a509c05 100644 >> --- a/src/backend/storage/file/buffile.c >> +++ b/src/backend/storage/file/buffile.c >> >> I think the new facilities should be explained in the file's header. > > Will do.

Done.

>> @@ -68,9 +71,10 @@ struct BufFile >> * avoid making redundant FileSeek calls.
>> */ >> >> - bool isTemp; /* can only add files if >> this is TRUE */ >> + bool isSegmented; /* can only add files if this is >> TRUE */ >> >> That's a bit of a weird and uncommented upon change. > > I was trying to cut down on the number of places we use the word > 'temporary' to activate various different behaviours. In this case, > the only thing it controls is whether the BufFile is backed by one > single fd.c File or many segments, so I figured it should be renamed. > > As Peter and you have pointed out, there may be a case for removing it > altogether. Done in 0007-hj-remove-buf-file-is-temp-v11.patch. >> @@ -79,6 +83,8 @@ struct BufFile >> */ >> ResourceOwner resowner; >> >> + BufFileTag tag; /* for discoverability >> between backends */ >> >> Not perfectly happy with the name tag here, the name is a bit too >> similar to BufferTag - something quite different. > > Yeah, will rename. Done. That existed only because I had sharedbuffile.c which needed special access to buffile.c via those weird 'tag' interfaces. In the new version that isn't required, and a new struct BufFileSet is provided by buffile.c/h. >> +static void >> +make_tagged_path(char *tempdirpath, char *tempfilepath, >> + const BufFileTag *tag, int segment) >> +{ >> + if (tag->tablespace == DEFAULTTABLESPACE_OID || >> + tag->tablespace == GLOBALTABLESPACE_OID) >> + snprintf(tempdirpath, MAXPGPATH, "base/%s", >> PG_TEMP_FILES_DIR); >> + else >> + { >> + snprintf(tempdirpath, MAXPGPATH, "pg_tblspc/%u/%s/%s", >> + tag->tablespace, >> TABLESPACE_VERSION_DIRECTORY, >> + PG_TEMP_FILES_DIR); >> + } >> + >> + snprintf(tempfilepath, MAXPGPATH, "%s/%s%d.%d.%d.%d.%d", tempdirpath, >> + PG_TEMP_FILE_PREFIX, >> + tag->creator_pid, tag->set, tag->partition, >> tag->participant, >> + segment); >> >> Is there a risk that this ends up running afoul of filename length >> limits on some platforms? The names are shorter now, and split over two levels: pgsql_tmp37303.2.set/pgsql_tmp.p30.b0.0 >> If we do decide not to change this: Why is that sufficient? Doesn't the >> same problem exist for segments later than the first? > > It does exist and it is handled. The comment really should say > "unlinking segment N + 1 (if it exists) before creating segment N". > Will update. I got rid of this. This doesn't come up anymore because the patch now blows away whole directories. There is never a case where files left over after a crash-restart would confuse us. There may be left over directories, but if we find that we can't create a directory, we try to delete it and all its contents first (ie to see if there was a leftover directory from before a crash-restart) and then try again, so individual segment files shouldn't be able to confuse us. >> + * PathNameCreateTemporaryFile, PathNameOpenTemporaryFile and >> + * PathNameDeleteTemporaryFile are used for temporary files that may be >> shared >> + * between backends. A File created or opened with these functions is not >> + * automatically deleted when the file is closed, but it is automatically >> + * closed and end of transaction and counts agains the temporary file limit >> of >> + * the backend that created it. Any File created this way must be >> explicitly >> + * deleted with PathNameDeleteTemporaryFile. Automatic file deletion is not >> + * provided because this interface is designed for use by buffile.c and >> + * indirectly by sharedbuffile.c to implement temporary files with shared >> + * ownership and cleanup. >> >> Hm. Those name are pretty easy to misunderstand, no? s/Temp/Shared/? > > Hmm. 
Yeah these may be better. Will think about that. I like these names. This is fd.c providing named temporary files. They are definitely temporary files still: they participate in the total temp limit and logging/pgstat and they are automatically closed. The only different things are: they have names permitting opening by other backends, and (it follows) are not automatically deleted on close. buffile.c takes over that duty using a BufFileSet. >> +File >> +PathNameOpenTemporaryFile(char *tempfilepath) >> +{ >> + File file; >> + >> + /* >> + * Open the file. Note: we don't use O_EXCL, in case there is an >> orphaned >> + * temp file that can be reused. >> + */ >> + file = PathNameOpenFile(tempfilepath, O_RDONLY | PG_BINARY, 0); >> >> If so, wouldn't we need to truncate the file? > > Yes, this lacks O_TRUNC. Thanks. Actually the reason I did that is because I wanted to open the file with O_RDONLY, which is incompatible with O_TRUNC. Misleading comment removed. >> + * A single SharedBufFileSet can manage any number of 'tagged' BufFiles that >> + * are shared between a fixed number of participating backends. Each shared >> + * BufFile can be written to by a single participant but can be read by any >> + * backend after it has been 'exported'. Once a given BufFile is exported, >> it >> + * becomes read-only and cannot be extended. To create a new shared >> BufFile, >> + * a participant needs its own distinct participant number, and needs to >> + * specify an arbitrary partition number for the file. To make it available >> + * to other backends, it must be explicitly exported, which flushes internal >> + * buffers and renders it read-only. To open a file that has been shared, a >> + * backend needs to know the number of the participant that created the >> file, >> + * and the partition number. It is the responsibily of calling code to >> ensure >> + * that files are not accessed before they have been shared. >> >> Hm. One way to make this safer would be to rename files when exporting. >> Should be sufficient to do this to the first segment, I guess. > > Interesting idea. Will think about that. That comment isn't great > and repeats itself. Will improve. Comment improved. I haven't investigated a file-renaming scheme for exporting files yet. >> + * Each file is identified by a partition number and a participant number, >> so >> + * that a SharedBufFileSet can be viewed as a 2D table of individual files. >> >> I think using "files" as a term here is a bit dangerous - they're >> individually segmented again, right? > > True. It's a 2D matrix of BufFiles. The word "file" is super > overloaded here. Will fix. No longer present. >> +/* >> + * The number of bytes of shared memory required to construct a >> + * SharedBufFileSet. >> + */ >> +Size >> +SharedBufFileSetSize(int participants) >> +{ >> + return offsetof(SharedBufFileSet, participants) + >> + sizeof(SharedBufFileParticipant) * participants; >> +} >> >> The function name sounds a bit like a function actuallize setting some >> size... s/Size/DetermineSize/? > > Hmm yeah "set" as verb vs "set" as noun. I think "estimate" is the > established word for this sort of thing (even though that seems > strange because it sounds like it doesn't have to be exactly right: > clearly in all these shmem-space-reservation functions it has to be > exactly right). Will change. Done. (Of course 'estimate' is both a noun and a verb too, and for extra points pronounced differently...) >> >> +/* >> + * Create a new file suitable for sharing. 
Each backend that calls this >> must >> + * use a distinct participant number. Behavior is undefined if a >> participant >> + * calls this more than once for the same partition number. Partitions >> should >> + * ideally be numbered consecutively or in as small a range as possible, >> + * because file cleanup will scan the range of known partitions looking for >> + * files. >> + */ >> >> Wonder if we shouldn't just create a directory for all such files. > > Hmm. Yes, that could work well. Will try that. Done. >> I'm a bit unhappy with the partition terminology around this. It's >> getting a bit confusing. We have partitions, participants and >> segements. Most of them could be understood for something entirely >> different than the meaning you have here... > > Ok. Let me try to explain [explanation...]. > > (Perhaps SharedBufFileSet should be called PartitionedBufFileSet?) I got rid of most of that terminology. Now I have BufFileSet which is a set of named BufFiles and it's up to client code to manage the namespace within it. SharedTuplestore happens to build names that include partition and participant numbers, but that's its business. There is also a 'stripe' number, which is used as a way to spread files across multiple temp_tablespaces. >> +static void >> +shared_buf_file_on_dsm_detach(dsm_segment *segment, Datum datum) >> +{ >> + bool unlink_files = false; >> + SharedBufFileSet *set = (SharedBufFileSet *) DatumGetPointer(datum); >> + >> + SpinLockAcquire(&set->mutex); >> + Assert(set->refcount > 0); >> + if (--set->refcount == 0) >> + unlink_files = true; >> + SpinLockRelease(&set->mutex); >> >> I'm a bit uncomfortable with releasing a refcount, and then still using >> the memory from the set... I don't think there's a concrete danger >> here as the code stands, but it's a fairly dangerous pattern. > > Will fix. I could fix that but I'd feel bad about doing more work while holding the spinlock (even though it can't possibly be contended because we are the last to detach). I have added a comment to explain that it's safe to continue accessing the DSM segment while in this function body. On Mon, Mar 27, 2017 at 10:47 AM, Andres Freund <and...@anarazel.de> wrote: > On 2017-03-23 20:35:09 +1300, Thomas Munro wrote: >> Here is a new patch series responding to feedback from Peter and Andres: > > + > +/* Per-participant shared state. */ > +typedef struct SharedTuplestoreParticipant > +{ > + LWLock lock; > > Hm. No padding (ala LWLockMinimallyPadded / LWLockPadded) - but that's > probably ok, for now. I hunted around but didn't see an idiom for making this whole struct cacheline-sized. > + bool error; /* Error occurred > flag. */ > + bool eof; /* End of file > reached. */ > + int read_fileno; /* BufFile segment file > number. */ > + off_t read_offset; /* Offset within segment > file. */ > > Hm. I wonder if it'd not be better to work with 64bit offsets, and just > separate that out upon segment access. This falls out of the current two-part BufFileTell and BufFileSeek interface. Since translation could be done trivially (single_address_space_offset = fileno * MAX_PHYSICAL_FILESIZE + offset), that might be a reasonable refactoring, but it seems to be material for a separate patch, considering that other client code would be affected, no? > +/* The main data structure in shared memory. */ > > "main data structure" isn't particularly meaningful. Fixed. 
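(Coming back to the 64-bit offset idea a little further up for a moment: the translation really is just the arithmetic shown there. Here's a throwaway standalone sketch, assuming the usual 1GB segment size; logical_offset/split_offset are made-up names for illustration, not anything in the patch.)

#include <stdint.h>

/* buffile.c's segment size is 1GB by default -- an assumption here. */
#define SEGMENT_SIZE ((int64_t) 1024 * 1024 * 1024)

/* Combine a (fileno, offset) pair into one 64-bit logical offset. */
static inline int64_t
logical_offset(int fileno, int64_t offset)
{
    return (int64_t) fileno * SEGMENT_SIZE + offset;
}

/* Split a 64-bit logical offset back into a (fileno, offset) pair. */
static inline void
split_offset(int64_t logical, int *fileno, int64_t *offset)
{
    *fileno = (int) (logical / SEGMENT_SIZE);
    *offset = logical % SEGMENT_SIZE;
}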
> +struct SharedTuplestore > +{ > + int reading_partition; > + int nparticipants; > + int flags; > > Maybe add a comment saying /* flag bits from SHARED_TUPLESTORE_* */? Done. > + Size meta_data_size; > > What's this? Comments added to every struct member. > + SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]; > > I'd add a comment here, that there's further data after participants. Done. > +}; > > + > +/* Per-participant backend-private state. */ > +struct SharedTuplestoreAccessor > +{ > > Hm. The name and it being backend-local are a bit conflicting. Hmm. It's a (SharedTupleStore) Accessor, not a Shared (...). Not sure if we have an established convention for this kind of thing... > + int participant; /* My partitipant number. */ > + SharedTuplestore *sts; /* The shared state. */ > + int nfiles; /* Size of local > files array. */ > + BufFile **files; /* Files we have open locally > for writing. */ > > Shouldn't this mention that it's indexed by partition? Done. > + BufFile *read_file; /* The current file to read > from. */ > + int read_partition; /* The current partition to > read from. */ > + int read_participant; /* The current participant to read > from. */ > + int read_fileno; /* BufFile segment file > number. */ > + off_t read_offset; /* Offset within segment > file. */ > +}; > > > +/* > + * Initialize a SharedTuplestore in existing shared memory. There must be > + * space for sts_size(participants) bytes. If flags is set to the value > + * SHARED_TUPLESTORE_SINGLE_PASS then each partition may only be read once, > + * because underlying files will be deleted. > > Any reason not to use flags that are compatible with tuplestore.c? tuplestore.c uses some executor.h flags like EXEC_FLAG_MARK. sharedtuplestore.c's interface and capabilities are extremely primitive and only really let it do exactly what I needed to do here. Namely, every participant writes into its own set of partition files, and then all together we perform a single "partial scan" in some undefined order to get all the tuples back and share them out between backends. Extending it to behave more like the real tuplestore may be interesting for other projects (dynamic partitioning etc) but it didn't seem like a good idea to speculate on what exactly would be needed. This particular flag means 'please delete individual backing files as we go after reading them', and I don't believe there is any equivalent; someone thought the private HJ should do that so I figured I should do it here too. > + * Tuples that are stored may optionally carry a piece of fixed sized > + * meta-data which will be retrieved along with the tuple. This is useful > for > + * the hash codes used for multi-batch hash joins, but could have other > + * applications. > + */ > +SharedTuplestoreAccessor * > +sts_initialize(SharedTuplestore *sts, int participants, > + int my_participant_number, > + Size meta_data_size, > + int flags, > + dsm_segment *segment) > +{ > > Not sure I like that the naming here has little in common with > tuplestore.h's api. Hmm. I feel like its interface needs to be significantly different to express the things it needs to do, especially at initialisation. As for the tuple write/write interface, how would you improve this? sts_puttuple(...); sts_puttuple(...); ... sts_end_write_all_partitions(...); sts_prepare_partial_scan(...); /* in one backend only */ sts_begin_partial_scan(...); ... = sts_gettuple(...); ... = sts_gettuple(...); ... 
sts_end_partial_scan(...); One thought that I keep having: the private hash join code should also use tuplestore. But a smarter tuplestore that knows how to hold onto the hash value (the meta-data in my sharedtuplestore.c) and knows about partitions (batches). It would be nice if the private and shared batching code finished up harmonised in this respect. > + > +MinimalTuple > +sts_gettuple(SharedTuplestoreAccessor *accessor, void *meta_data) > +{ > > This needs docs. Done. > + SharedBufFileSet *fileset = GetSharedBufFileSet(accessor->sts); > + MinimalTuple tuple = NULL; > + > + for (;;) > + { > > ... > + /* Check if this participant's file has already been entirely > read. */ > + if (participant->eof) > + { > + BufFileClose(accessor->read_file); > + accessor->read_file = NULL; > + LWLockRelease(&participant->lock); > + continue; > > Why are we closing the file while holding the lock? Fixed. > + > + /* Read the optional meta-data. */ > + eof = false; > + if (accessor->sts->meta_data_size > 0) > + { > + nread = BufFileRead(accessor->read_file, meta_data, > + > accessor->sts->meta_data_size); > + if (nread == 0) > + eof = true; > + else if (nread != accessor->sts->meta_data_size) > + ereport(ERROR, > + (errcode_for_file_access(), > + errmsg("could not read from > temporary file: %m"))); > + } > + > + /* Read the size. */ > + if (!eof) > + { > + nread = BufFileRead(accessor->read_file, &tuple_size, > sizeof(tuple_size)); > + if (nread == 0) > + eof = true; > > Why is it legal to have EOF here, if metadata previously didn't have an > EOF? Perhaps add an error if accessor->sts->meta_data_size != 0? Improved comments. > + if (eof) > + { > + participant->eof = true; > + if ((accessor->sts->flags & > SHARED_TUPLESTORE_SINGLE_PASS) != 0) > + SharedBufFileDestroy(fileset, > accessor->read_partition, > + > accessor->read_participant); > + > + participant->error = false; > + LWLockRelease(&participant->lock); > + > + /* Move to next participant's file. */ > + BufFileClose(accessor->read_file); > + accessor->read_file = NULL; > + continue; > + } > + > + /* Read the tuple. */ > + tuple = (MinimalTuple) palloc(tuple_size); > + tuple->t_len = tuple_size; > > Hm. Constantly re-allocing this doesn't strike me as a good idea (not to > mention that the API doesn't mention this is newly allocated). Seems > like it'd be a better idea to have a per-accessor buffer where this can > be stored in - increased in size when necessary. Done. On Tue, Mar 28, 2017 at 6:33 PM, Andres Freund <and...@anarazel.de> wrote: > On 2017-03-23 20:35:09 +1300, Thomas Munro wrote: >> Here is a new patch series responding to feedback from Peter and Andres: > > Here's a review of 0007 & 0010 together - they're going to have to be > applied together anyway... I have now merged them FWIW. > diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml > index ac339fb566..775c9126c7 100644 > --- a/doc/src/sgml/config.sgml > +++ b/doc/src/sgml/config.sgml > @@ -3814,6 +3814,21 @@ ANY <replaceable > class="parameter">num_sync</replaceable> ( <replaceable class=" > </listitem> > </varlistentry> > > + <varlistentry id="guc-cpu-shared-tuple-cost" > xreflabel="cpu_shared_tuple_cost"> > + <term><varname>cpu_shared_tuple_cost</varname> (<type>floating > point</type>) > + <indexterm> > + <primary><varname>cpu_shared_tuple_cost</> configuration > parameter</primary> > + </indexterm> > + </term> > + <listitem> > + <para> > + Sets the planner's estimate of the cost of sharing rows in > + memory during a parallel query. > + The default is 0.001. 
> + </para> > + </listitem> > + </varlistentry> > + > > Isn't that really low in comparison to the other costs? I think > specifying a bit more what this actually measures would be good too - is > it putting the tuple in shared memory? Is it accessing it?

Yeah. It was really just to make the earlier Shared Hash consistently more expensive than private Hash, by a tiny amount. Then it wouldn't kick in until it could help you avoid batching. I will try to come up with some kind of argument based on data...

> + <varlistentry id="guc-cpu-synchronization-cost" > xreflabel="cpu_synchronization_cost"> > + <term><varname>cpu_synchronization_cost</varname> (<type>floating > point</type>) > + <indexterm> > + <primary><varname>cpu_synchronization_cost</> configuration > parameter</primary> > + </indexterm> > + </term> > + <listitem> > + <para> > + Sets the planner's estimate of the cost of waiting at synchronization > + points for other processes while executing parallel queries. > + The default is 1.0. > + </para> > + </listitem> > + </varlistentry> > > Isn't this also really cheap in comparison to a, probably cached, seq > page read?

It's not really the synchronisation primitive itself, which is fast; it's how long the other guys may spend doing other stuff before they reach the barrier. Currently we have a block granularity parallel query system, so really this is an estimation of how long the average participant will have to wait for the last of its peers to finish chewing on up to one page of tuples from its (ultimate) source of parallelism. Yeah, I'm waffling a bit because I don't have a principled answer to this question yet...

> + if (HashJoinTableIsShared(hashtable)) > + { > + /* > + * Synchronize parallel hash table builds. At this stage we > know that > + * the shared hash table has been created, but we don't know > if our > + * peers are still in MultiExecHash and if so how far > through. We use > + * the phase to synchronize with them. > + */ > + barrier = &hashtable->shared->barrier; > + > + switch (BarrierPhase(barrier)) > + { > + case PHJ_PHASE_BEGINNING: > > Note pgindent will indent this further. Might be worthwhile to try to > pgindent the file, revert some of the unintended damage.

Fixed switch statement indentation. I will try pgindent soon and see how badly it all breaks.

> /* > * set expression context > */ > > I'd still like this to be moved to the start.

Done.

> @@ -126,17 +202,79 @@ MultiExecHash(HashState *node) > /* Not subject to skew optimization, so > insert normally */ > ExecHashTableInsert(hashtable, slot, > hashvalue); > } > - hashtable->totalTuples += 1; > + hashtable->partialTuples += 1; > + if (!HashJoinTableIsShared(hashtable)) > + hashtable->totalTuples += 1; > } > } > > FWIW, I'd put HashJoinTableIsShared() into a local var - the compiler > won't be able to do that on its own because external function calls > could invalidate the result.

Done in the hot loops.

> That brings me to a related topic: Have you measured whether your > changes cause performance differences?

I have never succeeded in measuring any reproducible difference between master with 0 workers and my patch with 0 workers on various contrived queries and TPCH queries (except the ones where my patch makes certain outer joins faster for known reasons). I suspect it just spends too much time ping ponging in and out of the node for each tuple for tiny differences in coding to show up. But I could be testing for the wrong things...
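(To spell out the hoisting pattern from just above: it's only the usual trick of caching an invariant predicate outside the hot loop, which is all "done in the hot loops" amounts to. A generic sketch, not a hunk from the patch -- the names below are placeholders, not real executor symbols.)

#include <stdbool.h>

struct hash_table;          /* opaque, for illustration only */

/* Placeholder declarations standing in for the real executor routines. */
extern bool table_is_shared(struct hash_table *ht);
extern bool load_next_tuple(struct hash_table *ht);

void
build_loop(struct hash_table *ht)
{
    /*
     * Hoist the invariant check out of the hot loop.  The compiler can't
     * do this by itself, because as far as it knows each external call in
     * the loop body could change the answer.
     */
    bool        shared = table_is_shared(ht);

    while (load_next_tuple(ht))
    {
        if (shared)
        {
            /* shared-table accounting, e.g. bump the partial tuple count */
        }
        else
        {
            /* private-table accounting, e.g. bump the total tuple count */
        }
    }
}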
> + finish_loading(hashtable); > > I find the sudden switch to a different naming scheme in the same file a > bit jarring. Ok. I have now changed all of the static functions in nodeHash.c from foo_bar to ExecHashFooBar. > + if (HashJoinTableIsShared(hashtable)) > + BarrierDetach(&hashtable->shared->shrink_barrier); > + > + if (HashJoinTableIsShared(hashtable)) > + { > > Consecutive if blocks with the same condition... Fixed. > > + bool elected_to_resize; > + > + /* > + * Wait for all backends to finish building. If only one > worker is > + * running the building phase because of a non-partial inner > plan, the > + * other workers will pile up here waiting. If multiple > worker are > + * building, they should finish close to each other in time. > + */ > > That comment is outdated, isn't it? Yes, fixed. > /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ > - if (hashtable->nbuckets != hashtable->nbuckets_optimal) > - ExecHashIncreaseNumBuckets(hashtable); > + ExecHashUpdate(hashtable); > + ExecHashIncreaseNumBuckets(hashtable); > > So this now doesn't actually increase the number of buckets anymore. Well that function always returned if found there were already enough buckets, so either the test at call site or in the function was redundant. I have renamed it to ExecHashIncreaseNumBucketsIfNeeded() to make that clearer. > + reinsert: > + /* If the table was resized, insert tuples into the new buckets. */ > + ExecHashUpdate(hashtable); > + ExecHashReinsertAll(hashtable); > > ReinsertAll just happens to do nothing if we didn't have to > resize... Not entirely obvious, sure reads as if it were unconditional. > Also, it's not actually "All" when batching is in use, no? Renamed to ExecHashReinsertHashtableIfNeeded. > + post_resize: > + if (HashJoinTableIsShared(hashtable)) > + { > + Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING); > + BarrierWait(barrier, WAIT_EVENT_HASH_RESIZING); > + Assert(BarrierPhase(barrier) == PHJ_PHASE_REINSERTING); > + } > + > + reinsert: > + /* If the table was resized, insert tuples into the new buckets. */ > + ExecHashUpdate(hashtable); > + ExecHashReinsertAll(hashtable); > > Hm. So even non-resizing backends reach this - but they happen to not > do anything because there's no work queued up, right? That's, uh, not > obvious. Added comments to that effect. > For me the code here would be a good bit easier to read if we had a > MultiExecHash and MultiExecParallelHash. Half of MultiExecHash is just > if(IsShared) blocks, and copying would avoid potential slowdowns. Hmm. Yeah I have struggled with this question in several places. For example I have ExecHashLoadPrivateTuple and ExecHashLoadSharedTuple because the intertwangled version was unbearable. But in MultiExecHash's case, I feel there is some value in showing that the basic hash build steps are the same. The core loop, where the main action really happens, is unchanged. > + /* > + * Set up for skew optimization, if possible and there's a > need for > + * more than one batch. (In a one-batch join, there's no > point in > + * it.) > + */ > + if (nbatch > 1) > + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); > > So there's no equivalent to the skew optimization for parallel query > yet... It doesn't sound like that should be particulalry hard on first > blush? Making the skew table shared, setting up buckets for MVCs, build and probing it is easy. It's work_mem exhaustion and shrinking and related jiggery pokery that'll be tricky, but I'll shortly be looking at that with vigour and vim. 
That there may be one or two empty relation optimisations that I haven't got yet because they involve a bit of extra communication. > static void > -ExecHashIncreaseNumBatches(HashJoinTable hashtable) > +ExecHashIncreaseNumBatches(HashJoinTable hashtable, int nbatch) > > So this doesn't actually increase the number of batches anymore... At > the very least this should mention that the main work is done in > ExecHashShrink. Yeah. Done. > +/* > + * Process the queue of chunks whose tuples need to be redistributed into the > + * correct batches until it is empty. In the best case this will shrink the > + * hash table, keeping about half of the tuples in memory and sending the > rest > + * to a future batch. > + */ > +static void > +ExecHashShrink(HashJoinTable hashtable) > > Should mention this really only is meaningful after > ExecHashIncreaseNumBatches has run. Updated. > +{ > + long ninmemory; > + long nfreed; > + dsa_pointer chunk_shared; > + HashMemoryChunk chunk; > > - /* If know we need to resize nbuckets, we can do it while rebatching. > */ > - if (hashtable->nbuckets_optimal != hashtable->nbuckets) > + if (HashJoinTableIsShared(hashtable)) > { > - /* we never decrease the number of buckets */ > - Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); > + /* > + * Since a newly launched participant could arrive while > shrinking is > + * already underway, we need to be able to jump to the > correct place > + * in this function. > + */ > + switch > (PHJ_SHRINK_PHASE(BarrierPhase(&hashtable->shared->shrink_barrier))) > + { > + case PHJ_SHRINK_PHASE_BEGINNING: /* likely case */ > + break; > + case PHJ_SHRINK_PHASE_CLEARING: > + goto clearing; > + case PHJ_SHRINK_PHASE_WORKING: > + goto working; > + case PHJ_SHRINK_PHASE_DECIDING: > + goto deciding; > + } > > Hm, so we jump into different nesting levels here :/ I rewrote this without goto. Mea culpa. > ok, ENOTIME for today... Thanks! Was enough to keep me busy for some time... > diff --git a/src/backend/executor/nodeHashjoin.c > b/src/backend/executor/nodeHashjoin.c > index f2c885afbe..87d8f3766e 100644 > --- a/src/backend/executor/nodeHashjoin.c > +++ b/src/backend/executor/nodeHashjoin.c > @@ -6,10 +6,78 @@ > * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group > * Portions Copyright (c) 1994, Regents of the University of California > * > - * > * IDENTIFICATION > * src/backend/executor/nodeHashjoin.c > * > + * NOTES: > + * > + * PARALLELISM > + * > + * Hash joins can participate in parallel queries in two ways: in > + * non-parallel-aware mode, where each backend builds an identical hash table > + * and then probes it with a partial outer relation, or parallel-aware mode > + * where there is a shared hash table that all participants help to build. A > + * parallel-aware hash join can save time and space by dividing the work up > + * and sharing the result, but has extra communication overheads. > > There's a third, right? The hashjoin, and everything below it, could > also not be parallel, but above it could be some parallel aware node > (e.g. a parallel aware HJ). Yeah that's the same thing: it's not aware of parallelism. Its outer plan may be partial or not, and it doesn't even know. That's the distinction I'm trying to make clear: actually doing something special for parallelism. I've update the text slightly to say that the outer plan may be partial or not in a hash join that is under Gather. > + * In both cases, hash joins use a private state machine to track progress > + * through the hash join algorithm. 
> > That's not really parallel specific, right? Perhaps just say that > parallel HJs use the normal state machine? Updated. > + * In a parallel-aware hash join, there is also a shared 'phase' which > + * co-operating backends use to synchronize their local state machine and > + * program counter with the multi-process join. The phase is managed by a > + * 'barrier' IPC primitive. > > Hm. I wonder if 'phase' shouldn't just be name > sharedHashJoinState. Might be a bit easier to understand than a > different terminology. Hmm. Well it is a lot like a state machine but it might be more confusing to have both local and shared 'state'. I think 'phases' of parallel computation are quite intuitive. I'm rather attached to this terminology... > + * The phases are as follows: > + * > + * PHJ_PHASE_BEGINNING -- initial phase, before any participant acts > + * PHJ_PHASE_CREATING -- one participant creates the shmem hash > table > + * PHJ_PHASE_BUILDING -- all participants build the hash table > + * PHJ_PHASE_RESIZING -- one participant decides whether to > expand buckets > + * PHJ_PHASE_REINSERTING -- all participants reinsert tuples if necessary > + * PHJ_PHASE_PROBING -- all participants probe the hash table > + * PHJ_PHASE_UNMATCHED -- all participants scan for unmatched tuples > > I think somewhere here - and probably around the sites it's happening - > should mention that state transitions are done kinda implicitly via > BarrierWait progressing to the numerically next phase. That's not > entirely obvious (and actually limits what the barrier mechanism can be > used for...). Yeah. Added comments. On Wed, Mar 29, 2017 at 9:31 AM, Andres Freund <and...@anarazel.de> wrote: > - ExecHashJoinSaveTuple(tuple, > - hashvalue, > - > &hashtable->innerBatchFile[batchno]); > + if (HashJoinTableIsShared(hashtable)) > + sts_puttuple(hashtable->shared_inner_batches, > batchno, &hashvalue, > + tuple); > + else > + ExecHashJoinSaveTuple(tuple, > + hashvalue, > + > &hashtable->innerBatchFile[batchno]); > } > } > > Why isn't this done inside of ExecHashJoinSaveTuple? I had it that way earlier but the arguments got ugly. I suppose it could take an SOMETHING_INNER/SOMETHING_OUTER enum and a partition number. I wonder if SharedTuplestore should be able to handle the private case too... > @@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable) > > + /* Rewind the shared read heads for this batch, inner > and outer. */ > + > sts_prepare_parallel_read(hashtable->shared_inner_batches, > + > curbatch); > + > sts_prepare_parallel_read(hashtable->shared_outer_batches, > + > curbatch); > > It feels somewhat wrong to do this in here, rather than on the callsites. The private hash table code does the moral equivalent directly below: it uses BufFileSeek to rewind the current inner and outer batch to the start. > + } > + > + /* > + * Each participant needs to make sure that data it has > written for > + * this partition is now read-only and visible to other > participants. > + */ > + sts_end_write(hashtable->shared_inner_batches, curbatch); > + sts_end_write(hashtable->shared_outer_batches, curbatch); > + > + /* > + * Wait again, so that all workers see the new hash table and > can > + * safely read from batch files from any participant because > they have > + * all ended writing. 
> + */ > + Assert(BarrierPhase(&hashtable->shared->barrier) == > + PHJ_PHASE_RESETTING_BATCH(curbatch)); > + BarrierWait(&hashtable->shared->barrier, > WAIT_EVENT_HASH_RESETTING); > + Assert(BarrierPhase(&hashtable->shared->barrier) == > + PHJ_PHASE_LOADING_BATCH(curbatch)); > + ExecHashUpdate(hashtable); > + > + /* Forget the current chunks. */ > + hashtable->current_chunk = NULL; > + return; > + } > > /* > * Release all the hash buckets and tuples acquired in the prior > pass, and > @@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable) > oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); > > /* Reallocate and reinitialize the hash bucket headers. */ > - hashtable->buckets = (HashJoinTuple *) > - palloc0(nbuckets * sizeof(HashJoinTuple)); > + hashtable->buckets = (HashJoinBucketHead *) > + palloc0(nbuckets * sizeof(HashJoinBucketHead)); > > - hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple); > + hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead); > > /* Cannot be more than our previous peak; we had this size before. */ > Assert(hashtable->spaceUsed <= hashtable->spacePeak); > @@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable) > > /* Forget the chunks (the memory was freed by the context reset > above). */ > hashtable->chunks = NULL; > + > + /* Rewind the shared read heads for this batch, inner and outer. */ > + if (hashtable->innerBatchFile[curbatch] != NULL) > + { > + if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, > SEEK_SET)) > + ereport(ERROR, > + (errcode_for_file_access(), > + errmsg("could not rewind hash-join > temporary file: %m"))); > + } > + if (hashtable->outerBatchFile[curbatch] != NULL) > + { > + if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, > SEEK_SET)) > + ereport(ERROR, > + (errcode_for_file_access(), > + errmsg("could not rewind hash-join > temporary file: %m"))); > + } > } > > /* > @@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable) > void > ExecHashTableResetMatchFlags(HashJoinTable hashtable) > { > + dsa_pointer chunk_shared = InvalidDsaPointer; > HashMemoryChunk chunk; > HashJoinTuple tuple; > int i; > > /* Reset all flags in the main table ... */ > - chunk = hashtable->chunks; > + if (HashJoinTableIsShared(hashtable)) > + { > + /* This only runs in the leader during rescan initialization. > */ > + Assert(!IsParallelWorker()); > + hashtable->shared->chunk_work_queue = > hashtable->shared->chunks; > + chunk = pop_chunk_queue(hashtable, &chunk_shared); > + } > + else > + chunk = hashtable->chunks; > > Hm - doesn't pop_chunk_queue empty the work queue? Well first it puts the main chunks onto the work queue, and then it pops them off one by one clearing flags until there is nothing left on the work queue. But this is only running in one backend. It's not very exciting. Do you see a bug here? > +/* > + * Load a tuple into shared dense storage, like 'load_private_tuple'. This > + * version is for shared hash tables. > + */ > +static HashJoinTuple > +load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple, > + dsa_pointer *shared, bool respect_work_mem) > +{ > > Hm. Are there issues with "blessed" records being stored in shared > memory? I seem to recall you talking about it, but I see nothing > addressing the issue here? (later) Ah, I see - you just prohibit > paralleism in that case - might be worth pointing to. Note added. I had difficulty testing that. I couldn't create anonymous ROW(...) values without the project moving above the hash table. 
Andrew Gierth showed me a way to prevent that with OFFSET 0 but that disabled parallelism. I tested that code by writing extra test code to dump the output of tlist_references_transient_type() on the tlists of various test paths not in a parallel query. Ideas welcome, as I feel like this belongs in a regression test. > + /* Check if some other participant has increased nbatch. */ > + if (hashtable->shared->nbatch > hashtable->nbatch) > + { > + Assert(respect_work_mem); > + ExecHashIncreaseNumBatches(hashtable, > hashtable->shared->nbatch); > + } > + > + /* Check if we need to help shrinking. */ > + if (hashtable->shared->shrink_needed && respect_work_mem) > + { > + hashtable->current_chunk = NULL; > + LWLockRelease(&hashtable->shared->chunk_lock); > + return NULL; > + } > + > + /* Oversized tuples get their own chunk. */ > + if (size > HASH_CHUNK_THRESHOLD) > + chunk_size = size + HASH_CHUNK_HEADER_SIZE; > + else > + chunk_size = HASH_CHUNK_SIZE; > + > + /* If appropriate, check if work_mem would be exceeded by a new > chunk. */ > + if (respect_work_mem && > + hashtable->shared->grow_enabled && > + hashtable->shared->nbatch <= > MAX_BATCHES_BEFORE_INCREASES_STOP && > + (hashtable->shared->size + > + chunk_size) > (work_mem * 1024L * > + > hashtable->shared->planned_participants)) > + { > + /* > + * It would be exceeded. Let's increase the number of > batches, so we > + * can try to shrink the hash table. > + */ > + hashtable->shared->nbatch *= 2; > + ExecHashIncreaseNumBatches(hashtable, > hashtable->shared->nbatch); > + hashtable->shared->chunk_work_queue = > hashtable->shared->chunks; > + hashtable->shared->chunks = InvalidDsaPointer; > + hashtable->shared->shrink_needed = true; > + hashtable->current_chunk = NULL; > + LWLockRelease(&hashtable->shared->chunk_lock); > + > + /* The caller needs to shrink the hash table. */ > + return NULL; > + } > > Hm - we could end up calling ExecHashIncreaseNumBatches twice here? > Probably harmless. Yes. In the code higher up we could observe that someone else has increased the number of batches: here we are just updating our local hashtable->nbatch. Then further down we could decide that it needs to be done again because we work out that this allocation will push us over the work_mem limit. Really that function just *sets* the number of batches. It's really the code beginning hashtable->shared->nbatch *= 2 that is really increasing the number of batches and setting up the state for all participants to shrink the hash table and free up some memory. > > /* ---------------------------------------------------------------- > * ExecHashJoin > @@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node) > /* no chance to not build the hash > table */ > node->hj_FirstOuterTupleSlot = NULL; > } > + else if (hashNode->shared_table_data != NULL) > + { > + /* > + * The empty-outer optimization is > not implemented for > + * shared hash tables yet. > + */ > + node->hj_FirstOuterTupleSlot = NULL; > > Hm, why is this checking for the shared-ness of the join in a different > manner? The usual manner is HashJoinTableIsShare(hashtable) but you see Assert(hashtable == NULL) a few lines earlier; this is the HJ_BUILD_HASHTABLE state where it hasn't been constructed yet. When ExecHashTableCreate (a bit further down) constructs it it'll assign hashtable->shared = state->shared_table_data (to point to a bit of DSM memory). 
The reason the usual test is based on the HashJoinTable pointer usually called 'hashtable' is because that is passed around almost everywhere so it's convenient to use that. > + if (HashJoinTableIsShared(hashtable)) > + { > + /* > + * An important optimization: > if this is a > + * single-batch join and not > an outer join, there is > + * no reason to synchronize > again when we've finished > + * probing. > + */ > + > Assert(BarrierPhase(&hashtable->shared->barrier) == > + > PHJ_PHASE_PROBING_BATCH(hashtable->curbatch)); > + if (hashtable->nbatch == 1 && > !HJ_FILL_INNER(node)) > + return NULL; /* > end of join */ > + > + /* > + * Check if we are a leader > that can't go further than > + * probing the first batch, > to avoid risk of deadlock > + * against workers. > + */ > + if > (!LeaderGateCanContinue(&hashtable->shared->leader_gate)) > + { > + /* > + * Other backends > will need to handle all future > + * batches written by > me. We don't detach until > + * after we've > finished writing to all batches so > + * that they are > flushed, otherwise another > + * participant might > try to read them too soon. > + */ > + > sts_end_write_all_partitions(hashNode->shared_inner_batches); > + > sts_end_write_all_partitions(hashNode->shared_outer_batches); > + > BarrierDetach(&hashtable->shared->barrier); > + > hashtable->detached_early = true; > + return NULL; > + } > + > + /* > + * We can't start searching > for unmatched tuples until > + * all participants have > finished probing, so we > + * synchronize here. > + */ > + > Assert(BarrierPhase(&hashtable->shared->barrier) == > + > PHJ_PHASE_PROBING_BATCH(hashtable->curbatch)); > + if > (BarrierWait(&hashtable->shared->barrier, > + > WAIT_EVENT_HASHJOIN_PROBING)) > + { > + /* Serial phase: > prepare for unmatched. */ > + if > (HJ_FILL_INNER(node)) > + { > + > hashtable->shared->chunk_work_queue = > + > hashtable->shared->chunks; > + > hashtable->shared->chunks = InvalidDsaPointer; > + } > + } > > Couldn't we skip that if this isn't an outer join? Not sure if the > complication would be worth it... Yes, well we don't even get this far in the very common case of a single batch inner join (see note above that about an "important optimization"). If it's outer you need this, and if there are multiple batches it hardly matters if you have to go through this extra step. But you're right that there are a few missed opportunities here and there. > +void > +ExecShutdownHashJoin(HashJoinState *node) > +{ > + /* > + * By the time ExecEndHashJoin runs in a work, shared memory has been > > s/work/worker/ Fixed. > + * destroyed. So this is our last chance to do any shared memory > cleanup. > + */ > + if (node->hj_HashTable) > + ExecHashTableDetach(node->hj_HashTable); > +} > > + There is no extra charge > + * for probing the hash table for outer path row, on the basis that > + * read-only access to a shared hash table shouldn't be any more > + * expensive. > + */ > > Hm, that's debatable. !shared will mostly be on the local numa node, > shared probably not. Agreed, NUMA surely changes the situation for probing. I wonder if it deserves a separate GUC. I'm actually quite hesitant to try to model things like that because it seems like a can of worms. I will try to come up with some numbers backed up with data though. Watch this space. > * Get hash table size that executor would use for inner relation. 
> * > + * Shared hash tables are allowed to use the work_mem of all > participants > + * combined to make up for the fact that there is only one copy > shared by > + * all. > > Hm. I don't quite understand that reasoning. Our model for memory usage limits is that every instance of an executor node is allowed to allocate up to work_mem. If I run a parallel hash join in 9.6 with 3 workers and I have set work_mem to 10MB, then the system will attempt to stay under 10MB in each participant, using up to 40MB across the 4 processes. The goal of Parallel Shared Hash is to divide the work of building the hash table up over the 4 backends, and combine the work_mem of the 4 backends to create a shared hash table. The total amount of memory used is the same, but we make much better use of it. Make sense? > * XXX for the moment, always assume that skew optimization will be > * performed. As long as SKEW_WORK_MEM_PERCENT is small, it's not > worth > * trying to determine that for sure. > > If we don't do skew for parallelism, should we skip that bit? I am looking into the skew optimisation. Will report back on that soon, and also try to get some data relevant to costing. -- Thomas Munro http://www.enterprisedb.com
Attachment: parallel-shared-hash-v11.tgz