Tim Armstrong has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
......................................................................


Patch Set 14:

(53 comments)

http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/analytic-eval-node.cc
File be/src/exec/analytic-eval-node.cc:

Line 207:         state, Substitute(PREPARE_FAILED_ERROR_MSG, "write"));
> old code and agg uses state_->block_mgr()->MemLimitTooLowError().  Any reas
Historical reasons, it looks like: the PrepareForRead() version below diverged 
from the PAGG/PHJ versions.

Switched to the block_mgr version for consistency.


http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/blocking-join-node.cc
File be/src/exec/blocking-join-node.cc:

PS14, Line 297: SCOPED_TIMER(build_timer_);
> We used to have two different timers in PHJ for measuring the time to hash 
It's not clear to me that the original timers were well thought out. It didn't 
look like there was a timer for how long it took to build the hash table - that 
was lumped into 'build_timer_'.

I added two new timers with clearer meanings that track the total hash table 
build time and the time spent partitioning rows. Also moved the remaining 
build-related timers into the Builder.

Now 'build_timer_' is the initial partitioning and hash table build, and 
'repartition_timer_' is the time spent repartitioning.
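
To illustrate the idea of one scoped timer per kind of work, here is a rough 
sketch. It is not the code in the patch: TimerCounter, ScopedTimer and the 
counter names are made-up stand-ins, not Impala's SCOPED_TIMER or RuntimeProfile.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Minimal stand-in for a profile counter; this is not Impala's RuntimeProfile.
struct TimerCounter {
  int64_t total_ns = 0;
  int64_t samples = 0;
};

// Adds its own lifetime to 'counter' when it goes out of scope.
class ScopedTimer {
 public:
  explicit ScopedTimer(TimerCounter* counter)
    : counter_(counter), start_(std::chrono::steady_clock::now()) {
    assert(counter_ != nullptr);
  }
  ~ScopedTimer() {
    auto elapsed = std::chrono::steady_clock::now() - start_;
    counter_->total_ns +=
        std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
    ++counter_->samples;
  }
  ScopedTimer(const ScopedTimer&) = delete;
  ScopedTimer& operator=(const ScopedTimer&) = delete;

 private:
  TimerCounter* counter_;
  std::chrono::steady_clock::time_point start_;
};

// Hypothetical counter names, one per kind of work.
struct BuilderTimers {
  TimerCounter partition_rows;
  TimerCounter build_hash_tables;
  TimerCounter repartition;
};

void PartitionRows(BuilderTimers* timers) {
  ScopedTimer t(&timers->partition_rows);
  // ... partition the build rows ...
}

void BuildHashTables(BuilderTimers* timers) {
  ScopedTimer t(&timers->build_hash_tables);
  // ... build hash tables for the in-memory partitions ...
}
```

Each function only charges its own counter, so the totals can be read 
independently of which phase of the join called it.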


http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.cc
File be/src/exec/partitioned-hash-join-builder.cc:

Line 179:     largest_fraction = max(largest_fraction,
> DCHECK_GT(num_build_rows, 0);
I don't think that DCHECK is valid: the code casts to double so that the 
division doesn't crash when num_build_rows is 0. I changed it to skip the 
calculation when num_build_rows == 0, which avoids the problem altogether.
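
A rough sketch of that kind of guard, for illustration only. This is not the 
code in the patch, and LargestPartitionFraction is a made-up helper name:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: returns the largest share of build rows that landed in
// any one partition, or 0.0 when there are no build rows at all.
double LargestPartitionFraction(const std::vector<int64_t>& partition_rows) {
  int64_t num_build_rows = 0;
  for (int64_t rows : partition_rows) {
    assert(rows >= 0);
    num_build_rows += rows;
  }
  // Skip the division entirely when the build side is empty.
  if (num_build_rows == 0) return 0.0;

  double largest_fraction = 0.0;
  for (int64_t rows : partition_rows) {
    largest_fraction = std::max(
        largest_fraction, static_cast<double>(rows) / num_build_rows);
  }
  return largest_fraction;
}
```

With an empty build side the function returns early, so no DCHECK on 
num_build_rows is needed.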


PS14, Line 318: a hash table
> hash tables
Done. Both alternatives seem grammatically correct to me though.


Line 341:   // building hash tables to avoid building hash tables for partitions we'll spill anyway.
> That's an interesting comment. If I understand it correctly, it means that 
Done with some slight tweaks.

This is a lot simpler than the previous behaviour, where spilling could happen 
even during probing.

I didn't measure how often, but it's not at all improbable, since the probe 
buffers are 8MB each and we could allocate up to 16 of them.
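
For scale, the worst case from those two figures works out as below. The 
constant names are made up for this note and are not from the code:

```cpp
#include <cstdint>

// Figures taken from the comment above; the names are hypothetical.
constexpr int64_t kProbeBufferBytes = 8LL * 1024 * 1024;  // 8MB per buffer
constexpr int kMaxProbeBuffers = 16;                       // up to 16 buffers
constexpr int64_t kWorstCaseProbeBytes = kProbeBufferBytes * kMaxProbeBuffers;

static_assert(kWorstCaseProbeBytes == 128LL * 1024 * 1024,
              "16 buffers of 8MB add up to 128MB");
```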


Line 401:       RETURN_IF_ERROR(SpillPartition());
> May help to document that failure to find any partition to spill (e.g. all 
I added to the comment at the top of the loop to make the termination 
condition more explicit.
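
As a toy illustration of why a spill-until-it-fits loop terminates (this is a 
simplified model, not the logic in the patch; Partition, SpillLargestPartition 
and FitInMemory are hypothetical names, and 'fixed_overhead' is an assumed 
stand-in for memory that spilling cannot free):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct Partition {
  int64_t bytes;
  bool spilled;
};

// Spills the largest in-memory partition. Returns false if every partition is
// already spilled, i.e. there is nothing left to free.
bool SpillLargestPartition(std::vector<Partition>* partitions) {
  Partition* victim = nullptr;
  for (Partition& p : *partitions) {
    if (p.spilled) continue;
    if (victim == nullptr || p.bytes > victim->bytes) victim = &p;
  }
  if (victim == nullptr) return false;
  victim->spilled = true;
  return true;
}

int64_t InMemoryBytes(const std::vector<Partition>& partitions) {
  int64_t total = 0;
  for (const Partition& p : partitions) {
    if (!p.spilled) total += p.bytes;
  }
  return total;
}

// Keeps spilling until the in-memory partitions plus 'fixed_overhead' fit in
// 'mem_limit'. Every iteration either returns or spills one more partition,
// so the loop runs at most partitions->size() + 1 times.
bool FitInMemory(std::vector<Partition>* partitions, int64_t fixed_overhead,
                 int64_t mem_limit) {
  assert(partitions != nullptr);
  while (fixed_overhead + InMemoryBytes(*partitions) > mem_limit) {
    if (!SpillLargestPartition(partitions)) return false;  // Nothing to spill.
  }
  return true;
}
```

The failure case is the one where all partitions are already spilled and the 
remaining memory still does not fit; the loop reports an error instead of 
spinning.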


PS14, Line 529: PhjBuilder::HashTableStoresNulls()
> This seems to belong to PartitionedHashJoinNode conceptually.
I feel like it makes sense here though since the builder owns the hash tables.

Plumbing-wise it's also easier since PhjBuilder needs this info and starts 
executing before PartitionedHashJoinNode.


Line 646:   do {
> Please see comments in BlockingJoinNode,  it would be great to retain timer
Done


PS14, Line 782: process_build_batch_fn_level0
> 'process_build_batch_fn_level0_'
I just copied this verbatim - it seems to be referring to the local variable 
though.


http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h
File be/src/exec/partitioned-hash-join-builder.h:

Line 425: 
> nit: unnecessary blank line ?
Done


http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

PS14, Line 151: 
              : 
              : 
> This may be important information to retain.
Not sure why I removed it. May have had a good reason but I can't recall it.


PS14, Line 87: Expr::CreateExprTree(pool_, eq_join_conjunct.right
> Does this overlap with CreateExprTree() in PhjBuilder::Init() ? Same questi
They are redundant currently, but the idea is that PhjBuilder and PhjNode will 
exist more independently in different plan fragments, so I wanted to 
encapsulate the state where possible.

The expr contexts need to be independent so that we can eventually support 
many probe sides per build side (many:1) for broadcast joins.

I think documenting this explicitly may be confusing because it would be 
explaining the current state of things relative to the previous way of doing 
things, whereas with the separate build side the default assumption is that 
builder data structures are private and not shared with the exec node.

It may make sense to share Expr trees globally between fragment instances as 
part of the multithreading changes, but I don't think it's worth trying to 
share them here until we have the right infrastructure.


Line 213:   for (unique_ptr<ProbePartition>& partition : spilled_partitions_)
> missing { }
Ah, missed that clang-format wrapped it.


PS14, Line 494: to output
> outputting
Done


PS14, Line 585: hash_partitions_
> 'hash_partitions_'
Done


Line 594:       continue;
> Is there a reason why we cannot call OutputUnmatchedBuild() directly at thi
I don't see any obvious reason why not. I think we could do a lot of 
simplification of this loop in general.


PS14, Line 996: s)
> next_state ?
Done


http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-node.h
File be/src/exec/partitioned-hash-join-node.h:

PS14, Line 437: 
> Is this merged into build_timer_ in BlockingJoinNode now ? It may be helpfu
I recreated this timer with a clearer name. I'm not convinced the original set 
of timers was particularly useful for fine-grained debugging, but I think the 
new set should be slightly better.

It's kind of tricky because you can slice it up in lots of different ways: 
different phases of the join algorithm x partitioning/building hash tables.


Line 79:   /// Logically, the algorithm has the following modes:.
> I think this comment could be a lot more useful if we explain how the phase
I think the comment was meant to be read after the class comment. I enhanced 
the class comment and moved this part up next to it.


PS14, Line 81: partition them into hash_partitions_. The input can 
> this comment is kind of disjoint since it talks about what the phase does, 
I reworked it a bit and described the state transitions at the end of each 
description.


PS14, Line 81: hash_partitions_
> 'hash_partitions_'
Done


PS14, Line 84: This is the only phase 
> These are the only phases
Done


PS14, Line 86: PROCESSING
> why do we use PROCESSING term here rather than PARTITIONING, which would be
I changed it. I think if all hash tables are in memory we're arguably not 
partitioning the probe since all we're doing is looking up hash tables. But by 
the same argument we shouldn't call it REPARTITIONING_PROBE. So it seems better 
to be consistent, even if arguably slightly inaccurate.


PS14, Line 87: spilled
> single spilled
Reworded it to be clearer.


PS14, Line 88: probe_hash_partitions_
> 'probe_hash_partitions_'
Done


Line 89:   ///      table in memory, we perform the join, otherwise we spill the probe row.
> this last sentence is confusing since it mostly restates what the first say
Done - really reworded this completely.


PS14, Line 90: construct
> what does this phase construct? isn't the hash table already constructed?
Yeah, the hash table construction is essentially done as part of deciding 
whether to transition into this or REPARTITIONING_BUILD. Should be more 
explicit.


PS14, Line 90: PROBING_SPILLED_PARTITION
> it would be good to be more upfront about when this phase is used and how, 
I think this should be a lot clearer - it's described in the initial high level 
algorithm and also in the state transitions.


PS14, Line 91: walking
> what does this mean? processing? 
I got rid of all instances of 'walk' in the header comments.

Hopefully the transitions will be clearer in the new version of this comment.


PS14, Line 97: *
> what does this star mean to say? We can go from REPARTITIONING_PROBE back t
Yeah, it made sense to me as a regular expression. I got rid of this since the 
transitions are documented elsewhere.


PS14, Line 112: input_partition_
> 'input_partition_'
Done


PS14, Line 118: input_partition_
> 'input_partition_'
Done


PS14, Line 143: buffer
> write buffer?
Done


PS14, Line 264: the
> its
Done


PS14, Line 267: the
> that
Done


PS14, Line 291: Walks
> What does this mean. Iterates over?
Yeah, not my wording - rephrased it.


PS14, Line 291: hash partitions
> is this talking about the probe hash partitions or the builder's?
Both, since they are kept in sync. Updated.


PS14, Line 298: spilled_partitions_
> 'spilled_partitions_'
Done


PS14, Line 302: probe_batch_pos_
> 'probe_batch_pos_'
Done


PS14, Line 306: probe_batch_pos_
> 'probe_batch_pos_'
Done


PS14, Line 309: input_partition_
> 'input_partition_'
Done


PS14, Line 384: builder_->hash_partitions
> is this referring to the entries in the builder_->hash_partition_ array?
Tried to clarify a bit.


PS14, Line 385: This is not used when processing a single partition.
> what does this mean?
Clarified in terms of the states defined above. When we're processing a single 
spilled partition we don't need to repartition the probe.


PS14, Line 408: null_aware_
> 'null_aware_'
Done


PS14, Line 410:  we then iterate over all the probe rows
> I can't tell if this means literally all the probe rows, or all the rows in
Clarified that it's the partition's. Most of the text was just copied verbatim 
from the original comment - I didn't want to start rewriting all the comments 
and making this patch even larger.


PS14, Line 432: and
> this makes it sound like there are two conditions required to create a prob
Reworded


PS14, Line 436: preallocated
> what does this mean?
Reworded to clarify.


PS14, Line 437: and
> double and.  And is this saying the constructor prepares it or the caller s
Done


PS14, Line 443: should
> will
Done


Line 445:     /// block cannot be acquired. "delete_on_read" mode is used, so blocks in the stream
> ... used for the buffered tuple stream, so..
Reworded in a slightly different way.


PS14, Line 464: meaning
              :     /// it has to call Close() on it) but transferred to the parent exec node
> this doesn't seem right. don't we either Close() it or transfer it (not bot
I interpreted the comment as meaning that it has to call Close() on it unless 
it transfers ownership. I just removed the parenthetical statement since it's 
more confusing than clarifying.
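
The "Close() it unless ownership is transferred" reading can be sketched with 
unique_ptr as below. This is an illustration under assumed names (Stream, 
Partition, TransferStream), not the actual classes touched by the patch:

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Toy stand-in for a resource that must be closed by whoever owns it.
class Stream {
 public:
  explicit Stream(int* close_calls) : close_calls_(close_calls) {
    assert(close_calls_ != nullptr);
  }
  void Close() { ++*close_calls_; }

 private:
  int* close_calls_;
};

class Partition {
 public:
  explicit Partition(std::unique_ptr<Stream> stream)
    : stream_(std::move(stream)) {}

  // Hands the stream to the caller, who becomes responsible for Close().
  std::unique_ptr<Stream> TransferStream() { return std::move(stream_); }

  // Closes the stream only if it is still owned by this partition.
  void Close() {
    if (stream_ != nullptr) {
      stream_->Close();
      stream_.reset();
    }
  }

 private:
  std::unique_ptr<Stream> stream_;
};
```

Either the partition closes the stream or the new owner does, never both.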


http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/util/runtime-profile.cc
File be/src/util/runtime-profile.cc:

PS14, Line 409: DCHECK
> DCHECK_EQ
Removed this line as part of refactoring.


Line 412:   children_.insert(children_.begin(), child_entry);
> rather than having two special cases (here and line 395-399, how about maki
Done


-- 
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 14
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Reviewer: Alex Behm <alex.b...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-HasComments: Yes
