[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Internal Jenkins has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 22: Verified+1

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 22
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Internal Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Internal Jenkins has submitted this change and it was merged.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

The main outcome of this patch is to split PhjBuilder out of PartitionedHashJoinNode. PhjBuilder manages the build partitions for the join and implements the DataSink interface. A lot of this work is fairly mechanical refactoring: dividing the state between the two classes and duplicating it where appropriate.

One major change is that probe partitions need to be separated from build partitions. This required some significant algorithmic changes to memory management for probe partition buffers, which was previously entangled with the build partitions: small buffers were allocated for the probe partitions even for non-spilling joins, and the join could spill additional partitions during probing if the probe partitions needed to switch from small to I/O buffers.

The changes made were:
- Probe partitions are only initialized after the build is partitioned, and only for spilled build partitions.
- Probe partitions never use small buffers: once the initial write buffer is allocated, appending to the probe partition never fails.
- All probe partition allocation is done after partitioning the build and before processing the probe input, during the same phase as hash table building. (The exception is NAAJ partitions, which are allocated upfront.)

The probe partition changes necessitated a change in BufferedTupleStream: allocation of write blocks is now explicit via the PrepareForWrite() API.

Testing:
Ran exhaustive build and local stress test.

Memory Usage:
Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20. No regressions on TPC-DS. TPC-H either stayed the same or improved in min memory requirement without spilling, but the min memory requirement with spilling regressed in some cases. I investigated each of the significant regressions on TPC-H and determined that they were all due to exec nodes racing for spillable or non-spillable memory. None of them were cases where exec nodes got their minimum reservation and failed to execute the spilling algorithm correctly.

Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Reviewed-on: http://gerrit.cloudera.org:8080/3873
Reviewed-by: Tim Armstrong
Reviewed-by: Dan Hecht
Tested-by: Internal Jenkins
---
M .clang-format
M be/src/codegen/gen_ir_descriptions.py
M be/src/codegen/impala-ir.cc
M be/src/exec/CMakeLists.txt
M be/src/exec/analytic-eval-node.cc
M be/src/exec/blocking-join-node.cc
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.cc
M be/src/exec/hash-join-node.cc
M be/src/exec/nested-loop-join-builder.cc
M be/src/exec/nested-loop-join-builder.h
M be/src/exec/nested-loop-join-node.cc
M be/src/exec/partitioned-aggregation-node.cc
A be/src/exec/partitioned-hash-join-builder-ir.cc
A be/src/exec/partitioned-hash-join-builder.cc
A be/src/exec/partitioned-hash-join-builder.h
M be/src/exec/partitioned-hash-join-node-ir.cc
M be/src/exec/partitioned-hash-join-node.cc
M be/src/exec/partitioned-hash-join-node.h
M be/src/exec/partitioned-hash-join-node.inline.h
M be/src/runtime/buffered-block-mgr.cc
M be/src/runtime/buffered-tuple-stream-test.cc
M be/src/runtime/buffered-tuple-stream.cc
M be/src/runtime/buffered-tuple-stream.h
M be/src/util/runtime-profile.cc
M be/src/util/runtime-profile.h
M tests/stress/concurrent_select.py
27 files changed, 2,264 insertions(+), 1,490 deletions(-)

Approvals:
  Internal Jenkins: Verified
  Dan Hecht: Looks good to me, approved
  Tim Armstrong: Looks good to me, approved

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 23
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Internal Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
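The probe-partition rules listed in the commit message above can be illustrated with a small standalone sketch. This is not the code from the patch: `BuildPartition`, `ProbeStream`, `InitProbePartitions` and the integer buffer budget are hypothetical stand-ins, and `ProbeStream::PrepareForWrite()` only loosely mirrors the idea of the explicit `PrepareForWrite()` step mentioned for BufferedTupleStream. It assumes the policy is "one probe stream per spilled build partition, with its write buffer allocated up front".

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a build partition; not Impala's real class.
struct BuildPartition {
  bool is_spilled = false;
};

// Hypothetical stand-in for a probe-side tuple stream.
struct ProbeStream {
  bool write_buffer_ready = false;
  std::size_t rows = 0;

  // Explicit allocation step: takes one buffer from a pretend budget.
  // Returns false if the budget is exhausted.
  bool PrepareForWrite(std::size_t* buffers_left) {
    if (*buffers_left == 0) return false;
    --*buffers_left;
    write_buffer_ready = true;
    return true;
  }

  // Once the write buffer exists, appending in this sketch cannot fail.
  void AddRow() {
    assert(write_buffer_ready);
    ++rows;
  }
};

// Creates probe streams only for spilled build partitions. The output has one
// slot per build partition; slots for in-memory partitions stay null.
// Returns false if a write buffer could not be allocated.
bool InitProbePartitions(const std::vector<BuildPartition>& build,
                         std::size_t buffers_available,
                         std::vector<std::unique_ptr<ProbeStream>>* probe) {
  probe->clear();
  probe->resize(build.size());
  for (std::size_t i = 0; i < build.size(); ++i) {
    if (!build[i].is_spilled) continue;  // No probe partition needed.
    std::unique_ptr<ProbeStream> stream(new ProbeStream());
    if (!stream->PrepareForWrite(&buffers_available)) return false;
    (*probe)[i] = std::move(stream);
  }
  return true;
}
```

In this sketch all allocation failures surface in `InitProbePartitions()`, before any probe row is processed, which is the property the commit message describes: the probe phase itself never has to spill more partitions to obtain a buffer.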
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Dan Hecht has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 22: Code-Review+2

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 22
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 22: Code-Review+2

Rebase, carry +2

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 22
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 20:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/3873/20/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

Line 606: }
> Done
Oops, I added the wrong DCHECK. Fixed now.

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 20
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Hello Michael Ho, Alex Behm, Dan Hecht,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/3873

to look at the new patch set (#22).

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

27 files changed, 2,264 insertions(+), 1,490 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/73/3873/22

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 22
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Hello Michael Ho, Alex Behm, Dan Hecht,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/3873

to look at the new patch set (#21).

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

27 files changed, 2,264 insertions(+), 1,490 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/73/3873/21

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 21
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 20:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/3873/20/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

Line 606: }
> DCHECK(build_->null_aware_partition() == NULL);
Done

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 20
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Michael Ho has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 20: Code-Review+1

(1 comment)

http://gerrit.cloudera.org:8080/#/c/3873/20/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

Line 606: }
DCHECK(build_->null_aware_partition() == NULL);

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 20
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 14:

(11 comments)

http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/blocking-join-node.cc
File be/src/exec/blocking-join-node.cc:

PS14, Line 297: SCOPED_TIMER(build_timer_);
> I am mostly concerned about preserving "BuildPartitionTime" and "BuildTime"
Ah, you're right, the older code for some reason didn't include the partitioning time in BuildTime. This doesn't make a lot of sense to me, since build_timer_ is a BlockingJoinNode-level concept, where we shouldn't care about the implementation details of the build phase. With the old code, I don't think someone would guess that meaning of "BuildTime" without looking at the code.

I think the new decomposition is more intuitive: BuildTime covers the whole initial build phase, and BuildRowsPartitionTime and HashTablesBuildTime cover the time spent in those operations.

I guess it could cause some confusion when comparing profiles from different versions, but I think it would be better to clean things up here instead of trying to preserve the profile counters exactly. The profile layout is changing anyway with the patch, so it's probably a good time to do it.

http://gerrit.cloudera.org:8080/#/c/3873/19/be/src/exec/partitioned-hash-join-builder.cc
File be/src/exec/partitioned-hash-join-builder.cc:

PS19, Line 377: ;
> if
Done

http://gerrit.cloudera.org:8080/#/c/3873/19/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

Line 77: const vector& eq_join_conjuncts =
> It helps to add a comment like:
I added a different comment with the same intent, just pointing out that allocating PhjBuilder like this is a temporary measure.

PS19, Line 479: while (true) {
              :   DCHECK(status.ok());
              :   DCHECK_NE(state_, PARTITIONING_BUILD) << "Should not be in GetNext()";
              :   RETURN_IF_CANCELLED(state);
              :   RETURN_IF_ERROR(QueryMaintenance(state));
              :
              :   if (!output_build_partitions_.empty()) {
> It seems unfortunate that we cannot remove this given the change at line 58
Exactly, it's necessary for that case. My feeling is that we shouldn't try to refactor this loop piecemeal; rather, we should figure out the right abstraction/organisation (maybe a state machine plus some logical decomposition into functions?) and completely clean it up. Not in this patch though.

Line 554: if (out_batch->AtCapacity() || ReachedLimit()) break;
> DCHECK(input_partition_ == NULL);
Done

PS19, Line 599: if (got_partition) continue; // Probe the spilled partition.
              :
> Not your change but I find this duplicated check with line 501 a bit confus
Done. It should be equivalent, since the null_aware_partition is only non-NULL for NAAJ, and there are DCHECKs in PrepareNullAwarePartition() that both of these partitions are non-NULL (it's not reachable otherwise because of the checks at the top of the loop).

PS19, Line 724:
> I suppose it's good to avoid small buffers on all probe partitions for cons
Yes, potentially: it could increase it by up to 16MB, but I think it's worth accepting that increase for a rarely-used join type in order to avoid the possibility of spilling while probing.

Line 740: state, this, builder_->null_aware_partition(), std::move(probe_rows)));
> Is there an error path in which probe_rows == NULL ? Looks like line 725 im
Should be OK to remove it.

PS19, Line 751:
> Same question as above.
See response above.

PS19, Line 968: ore
> nit: newly created
Done

http://gerrit.cloudera.org:8080/#/c/3873/19/be/src/util/runtime-profile.h
File be/src/util/runtime-profile.h:

PS19, Line 129: AddChildFirst
> Feel free to ignore but PrependChild() seems more appropriate.
Done

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 14
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: Yes
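The timer decomposition discussed in the reply above (an outer BuildTime spanning the whole initial build phase, with BuildRowsPartitionTime and HashTablesBuildTime nested inside it) can be sketched with plain RAII timers. This is only an illustration under stated assumptions: `Counter`, `ScopedTimerSketch`, `BuildProfile`, `BusyWork` and `RunBuildPhase` are hypothetical names and do not reproduce Impala's SCOPED_TIMER macro or RuntimeProfile API.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical nanosecond counter; not Impala's RuntimeProfile::Counter.
struct Counter {
  std::int64_t ns = 0;
};

// Adds the elapsed wall-clock time of its scope to a counter on destruction.
class ScopedTimerSketch {
 public:
  explicit ScopedTimerSketch(Counter* counter)
      : counter_(counter), start_(std::chrono::steady_clock::now()) {}
  ~ScopedTimerSketch() {
    const auto elapsed = std::chrono::steady_clock::now() - start_;
    counter_->ns +=
        std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
  }
  ScopedTimerSketch(const ScopedTimerSketch&) = delete;
  ScopedTimerSketch& operator=(const ScopedTimerSketch&) = delete;

 private:
  Counter* counter_;
  std::chrono::steady_clock::time_point start_;
};

// Counter names follow the discussion; the struct itself is an assumption.
struct BuildProfile {
  Counter build_time;                 // Whole initial build phase.
  Counter build_rows_partition_time;  // Partitioning the build rows.
  Counter hash_tables_build_time;     // Building the hash tables.
};

// Placeholder work so that the timers measure something.
std::int64_t BusyWork(int n) {
  volatile std::int64_t sum = 0;
  for (int i = 0; i < n; ++i) sum = sum + i;
  return sum;
}

void RunBuildPhase(BuildProfile* profile) {
  ScopedTimerSketch total(&profile->build_time);
  {
    ScopedTimerSketch t(&profile->build_rows_partition_time);
    BusyWork(100000);  // Stand-in for partitioning the build input.
  }
  {
    ScopedTimerSketch t(&profile->hash_tables_build_time);
    BusyWork(100000);  // Stand-in for building hash tables.
  }
}
```

Because the two inner scopes are disjoint and both lie inside the outer scope, the outer counter in this sketch is always at least the sum of the inner ones, which is what makes the outer counter readable without knowing how the build phase is implemented.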
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Hello Alex Behm, Dan Hecht,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/3873

to look at the new patch set (#20).

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

27 files changed, 2,263 insertions(+), 1,490 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/73/3873/20

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 20
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Michael Ho has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 19:

(12 comments)

http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/blocking-join-node.cc
File be/src/exec/blocking-join-node.cc:

PS14, Line 297: SCOPED_TIMER(build_timer_);
> It's not clear to me that the original timers were well thought out. It did
I am mostly concerned about preserving "BuildPartitionTime" and "BuildTime", which exist in the profile now. The former seems to measure the total time to partition all input rows into their partitions, while the latter measures the total time to insert the non-spilled partitions into their corresponding hash tables.

http://gerrit.cloudera.org:8080/#/c/3873/19/be/src/exec/partitioned-hash-join-builder.cc
File be/src/exec/partitioned-hash-join-builder.cc:

PS19, Line 377: is
if

http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

PS14, Line 87:
> They are redundant currently, but the idea is that PhjBuilder and PhjNode w
OK. To be able to probe the same hash table in parallel, it's necessary to have the thread-private build_expr_ctx_.

http://gerrit.cloudera.org:8080/#/c/3873/19/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

Line 77: builder_.reset(
It helps to add a comment like:
TODO: For now, the build_side_expr_ may be duplicated, but the builder_ needs to have the build_side_expr_ once the builder is separated out into a different fragment.

PS19, Line 479: if (!output_build_partitions_.empty()) {
              :   DCHECK(NeedToProcessUnmatchedBuildRows());
              :
              :   // Flush the remaining unmatched build rows of any partitions we are done
              :   // processing before moving onto the next partition.
              :   OutputUnmatchedBuild(out_batch);
              :   if (!output_build_partitions_.empty()) break;
It seems unfortunate that we cannot remove this given the change at line 588. I suppose this is needed if multiple full output batches are needed to hold all the unmatched build rows. The duplicated code block at line 487 almost looks like we should use a goto statement. Do you think there is a good way to refactor the code a bit?

Line 554: if (state_ == PARTITIONING_PROBE) {
DCHECK(input_partition_ == NULL);

PS19, Line 599: if (builder_->null_aware_partition() == NULL) {
              :   DCHECK(null_aware_probe_partition_ == NULL);
Not your change, but I find this duplicated check with line 501 a bit confusing here. Essentially, this is a slightly different check, as it covers cases for all joins, while the one at line 501 seems to cover the case in which the null aware partition has been processed completely, if I understand it correctly.

I wonder if you can move this check into the if statement at line 599 and make it

  if (build_->null_aware_partition() != NULL) continue;

and move *eos = true; to the common path shared by all join types.

PS19, Line 724: false
I suppose it's good to avoid small buffers on all probe partitions for consistency, but does this increase the memory usage of NAAJ?

Line 740: if (probe_rows != NULL) {
Is there an error path in which probe_rows == NULL? Looks like line 725 implies probe_rows != NULL.

PS19, Line 751: false
Same question as above.

PS19, Line 968: new
nit: newly created

http://gerrit.cloudera.org:8080/#/c/3873/19/be/src/util/runtime-profile.h
File be/src/util/runtime-profile.h:

PS19, Line 129: AddChildFirst
Feel free to ignore but PrependChild() seems more appropriate.

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 19
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Dan Hecht has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 19: Code-Review+2

This looks fine to me for the first step but please check with Michael if he has any more comments.

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 19
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 19: Code-Review+1

carry +1

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 19
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Hello Alex Behm, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/3873 to look at the new patch set (#19). Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder The main outcome of this patch is to split out PhjBuilder from PartitionedHashJoinNode, which manages the build partitions for the join and implements the DataSink interface. A lot of this work is fairly mechanical refactoring: dividing the state between the two classes and duplicating it where appropriate. One major change is that probe partitions need to be separated from build partitions. This required some significant algorithmic changes to memory management for probe partition buffers: memory management for probe partitions was entangled with the build partitions: small buffers were allocated for the probe partitions even for non-spilling joins and the join could spill additional partitions during probing if the probe partitions needed to switch from small to I/O buffers. The changes made were: - Probe partitions are only initialized after the build is partitioned, and only for spilled build partitions. - Probe partitions never use small buffers: once the initial write buffer is allocated, appending to the probe partition never fails. - All probe partition allocation is done after partitioning the build and before processing the probe input during the same phase as hash table building. (Aside from NAAJ partitions which are allocated upfront). The probe partition changes necessitated a change in BufferedTupleStream: allocation of write blocks is now explicit via the PrepareForWrite() API. Testing: Ran exhaustive build and local stress test. Memory Usage: Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20. No regressions on TPC-DS. 
TPC-H either stayed the same or improved in min memory requirement without spilling, but the min memory requirement with spilling regressed in some cases. I investigated each of the significant regressions on TPC-H and determined that they were all due to exec nodes racing for spillable or non-spillable memory. None of them were cases where exec nodes got their minimum reservation and failed to execute the spilling algorithm correctly. Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb --- M .clang-format M be/src/codegen/gen_ir_descriptions.py M be/src/codegen/impala-ir.cc M be/src/exec/CMakeLists.txt M be/src/exec/analytic-eval-node.cc M be/src/exec/blocking-join-node.cc M be/src/exec/blocking-join-node.h M be/src/exec/data-sink.cc M be/src/exec/hash-join-node.cc M be/src/exec/nested-loop-join-builder.cc M be/src/exec/nested-loop-join-builder.h M be/src/exec/nested-loop-join-node.cc M be/src/exec/partitioned-aggregation-node.cc A be/src/exec/partitioned-hash-join-builder-ir.cc A be/src/exec/partitioned-hash-join-builder.cc A be/src/exec/partitioned-hash-join-builder.h M be/src/exec/partitioned-hash-join-node-ir.cc M be/src/exec/partitioned-hash-join-node.cc M be/src/exec/partitioned-hash-join-node.h M be/src/exec/partitioned-hash-join-node.inline.h M be/src/runtime/buffered-block-mgr.cc M be/src/runtime/buffered-tuple-stream-test.cc M be/src/runtime/buffered-tuple-stream.cc M be/src/runtime/buffered-tuple-stream.h M be/src/util/runtime-profile.cc M be/src/util/runtime-profile.h M tests/stress/concurrent_select.py 27 files changed, 2,262 insertions(+), 1,491 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/73/3873/19 -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: newpatchset Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 19 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong 
Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong
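The BufferedTupleStream change described in the commit message above (write-block allocation becomes an explicit PrepareForWrite() step, after which appending to a probe partition never fails) can be illustrated with a toy model. This is only a minimal sketch under stated assumptions, not Impala's code: `ToyBufferPool`, `ToyStream`, `AddRow()` and `num_rows()` are hypothetical names, and flushing a full buffer stands in for writing an unpinned block to disk. Only the name PrepareForWrite() and the "fallible once, then infallible" contract come from the message.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a block manager with a fixed number of buffers.
class ToyBufferPool {
 public:
  explicit ToyBufferPool(int num_buffers) : free_(num_buffers) {}
  // The only fallible step: returns false when no buffer is left.
  bool TryAcquire() {
    if (free_ == 0) return false;
    --free_;
    return true;
  }
  void Release() { ++free_; }
  int free() const { return free_; }

 private:
  int free_;
};

// Hypothetical unpinned stream: it holds at most one write buffer and
// flushes it whenever it fills up, so it never needs a second allocation.
class ToyStream {
 public:
  ToyStream(ToyBufferPool* pool, std::size_t rows_per_buffer)
      : pool_(pool), rows_per_buffer_(rows_per_buffer) {}
  ToyStream(const ToyStream&) = delete;
  ToyStream& operator=(const ToyStream&) = delete;
  ~ToyStream() {
    if (has_write_buffer_) pool_->Release();
  }

  // Explicit allocation of the initial write buffer; may fail.
  bool PrepareForWrite() {
    assert(!has_write_buffer_);
    has_write_buffer_ = pool_->TryAcquire();
    return has_write_buffer_;
  }

  // Cannot fail once PrepareForWrite() has succeeded: a full buffer is
  // flushed and reused instead of being replaced by a new allocation.
  void AddRow(int row) {
    assert(has_write_buffer_);
    if (buffer_.size() == rows_per_buffer_) {
      flushed_rows_ += buffer_.size();
      buffer_.clear();
    }
    buffer_.push_back(row);
  }

  std::size_t num_rows() const { return flushed_rows_ + buffer_.size(); }

 private:
  ToyBufferPool* pool_;
  std::size_t rows_per_buffer_;
  bool has_write_buffer_ = false;
  std::vector<int> buffer_;
  std::size_t flushed_rows_ = 0;
};
```

In this model all memory pressure surfaces at PrepareForWrite(), which is the property the commit message relies on when it moves probe partition allocation into the hash table building phase.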
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change.
Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
..
Patch Set 18: (18 comments)

http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-builder.cc
File be/src/exec/partitioned-hash-join-builder.cc:

PS18, Line 326: we'd like all partitions to
> ... all partitions ..
Done

PS18, Line 327: be
> are
Done

Line 353: // building hash tables because allocating probe buffers may cause some more partitions
> duplicated sentences
Done

PS18, Line 797: filters_.size() > 0)
> this doesn't seem to exactly match the logic at line 166 (that doesn't care
I simplified the code around line 166 by calculating bool build_filters; outside of the branches. Should be equivalent and easier to reason about.

http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-builder.h
File be/src/exec/partitioned-hash-join-builder.h:

Line 276: /// Returns non-ok status if we couldn't spill a partition.
> nit: weird line breaking.
Done

PS18, Line 280: should have all of their build rows
> what is this trying to say? that we're done partitioning the build?
Yeah, clarified in terms of FlushFinal().

PS18, Line 286: no buffers in either the build or probe stream.
> clarify whether this means neither has buffers or at least one does not hav
Done

PS18, Line 287: in_mem
> are these states actually represented in code? (if not, why the underscore?
Done

PS18, Line 291: probe_buffers_for_spilled_partitions_
> what's that?
Ah, stale comment from earlier version of the code. Updated it.

PS18, Line 301: probe_buffers_for_spilled_partitions_
> same
Ah, this method doesn't even exist any more. Removed it and updated the CloseAndDeletePartitions() comment (since it does that work now).

PS18, Line 349: PartitionedHashJoinNode
> who owns it?
Clarified. Also added a TODO to indicate this will need to be updated for the spilling broadcast join case when we come to design a solution for that.

PS18, Line 427: so that the
              : /// initial probe buffer is allocated
> this doesn't really explain why the builder has to do it. Would be good to
Done

Line 432: /// phase of the algorithm without spilling more partitions.
> this hints at it, so maybe move this up and reword to be more explicit.
Done

http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-node-ir.cc
File be/src/exec/partitioned-hash-join-node-ir.cc:

PS18, Line 285: *status
> if we can now do this, can we also just make NextProbeRow() and others retu
IIRC I started to do that refactoring at one point but quickly realised that it didn't work. NextProbeRow() returns false both when status is an error and when it does not have a next row to move to.

Line 293: if (LIKELY(hash_tbl != NULL)) {
> is probe_partition NULL in this case?
Yes it is. I added additional DCHECKs to PrepareForProbe() to validate the consistency of the partition state. I think adding additional DCHECKs here would obscure the logic somewhat. I thought it was more helpful below since the logic relied more directly on the correlation between the different partitions.

http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

Line 47: "the memory limit may help this query to complete successfully.";
> weird line breakages
Done

http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-node.h
File be/src/exec/partitioned-hash-join-node.h:

Line 404: /// builder_->hash_partitions. This is non-empty only in the PARTITIONING_PROBE or
> underscore, right?
I guess the accessor is hash_partitions() so changed it to that.

PS18, Line 406: a
> and?
Done

--
To view, visit http://gerrit.cloudera.org:8080/3873
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 18
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: Yes
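The reply about NextProbeRow() in the message above turns on one point: a `false` return covers two different outcomes, so the function cannot simply be changed to return a Status. A minimal sketch of that calling convention follows. It is an illustration only; `Status` here is a simplified hypothetical struct (not Impala's class), and `ToyProbe`, `NextRow()` and `Drain()` are made-up names.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified, hypothetical status type for illustration.
struct Status {
  bool ok = true;
  std::string msg;
};

class ToyProbe {
 public:
  // 'fail_at' is the row index at which a simulated error occurs (-1: never).
  ToyProbe(std::vector<int> rows, int fail_at)
      : rows_(rows), fail_at_(fail_at) {}

  // Returns true and sets *row if it advanced to a row.
  // Returns false in two distinct cases:
  //   - the input is exhausted (*status is left untouched), or
  //   - an error occurred (*status is set to an error).
  bool NextRow(int* row, Status* status) {
    if (pos_ == fail_at_) {
      status->ok = false;
      status->msg = "simulated failure";
      return false;
    }
    if (pos_ == static_cast<int>(rows_.size())) return false;
    *row = rows_[pos_++];
    return true;
  }

 private:
  std::vector<int> rows_;
  int fail_at_;
  int pos_ = 0;
};

// Consumes rows until NextRow() returns false and reports how many were
// read. The caller must inspect *status afterwards to tell "end of input"
// apart from "error".
int Drain(ToyProbe* probe, Status* status) {
  int count = 0;
  int row = 0;
  while (probe->NextRow(&row, status)) ++count;
  return count;
}
```

A signature that returned only Status would need a separate out-parameter for "no more rows", which is presumably why the refactoring was abandoned.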
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Dan Hecht has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 18: (18 comments) http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-builder.cc File be/src/exec/partitioned-hash-join-builder.cc: PS18, Line 326: we'd like all partitions to ... all partitions .. PS18, Line 327: be are (to match above edit) Line 353: // building hash tables because allocating probe buffers may cause some more partitions duplicated sentences PS18, Line 797: filters_.size() > 0) this doesn't seem to exactly match the logic at line 166 (that doesn't care about filters_.size(). http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: Line 276: /// Returns non-ok status if we couldn't spill a partition. nit: weird line breaking. PS18, Line 280: should have all of their build rows what is this trying to say? that we're done partitioning the build? PS18, Line 286: no buffers in either the build or probe stream. clarify whether this means neither has buffers or at least one does not have buffers (the latter being what the current wording seems to imply). PS18, Line 287: in_mem are these states actually represented in code? (if not, why the underscore?). also, how about pointing at that for #2, no probe streams are needed. PS18, Line 291: probe_buffers_for_spilled_partitions_ what's that? PS18, Line 301: probe_buffers_for_spilled_partitions_ same PS18, Line 349: PartitionedHashJoinNode who owns it? PS18, Line 427: so that the : /// initial probe buffer is allocated this doesn't really explain why the builder has to do it. Would be good to explain that. i.e. this can lead to more spilling Line 432: /// phase of the algorithm without spilling more partitions. this hints at it, so maybe move this up and reword to be more explicit. 
http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-node-ir.cc File be/src/exec/partitioned-hash-join-node-ir.cc: PS18, Line 285: *status if we can now do this, can we also just make NextProbeRow() and others return Status directly? Line 293: if (LIKELY(hash_tbl != NULL)) { is probe_partition NULL in this case? http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-node.cc File be/src/exec/partitioned-hash-join-node.cc: Line 47: "the memory limit may help this query to complete successfully."; weird line breakages http://gerrit.cloudera.org:8080/#/c/3873/18/be/src/exec/partitioned-hash-join-node.h File be/src/exec/partitioned-hash-join-node.h: Line 404: /// builder_->hash_partitions. This is non-empty only in the PARTITIONING_PROBE or underscore, right? PS18, Line 406: a and? -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 18 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Dan Hecht has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 14: (1 comment) http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: PS14, Line 90: Acquire ownership > I find release confusing, makes me think that the streams are being "releas It is being released from the object (the builder). But, I'm fine with Transfer too. -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 14 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Hello Alex Behm, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/3873 to look at the new patch set (#18). Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder The main outcome of this patch is to split out PhjBuilder from PartitionedHashJoinNode, which manages the build partitions for the join and implements the DataSink interface. A lot of this work is fairly mechanical refactoring: dividing the state between the two classes and duplicating it where appropriate. One major change is that probe partitions need to be separated from build partitions. This required some significant algorithmic changes to memory management for probe partition buffers: memory management for probe partitions was entangled with the build partitions: small buffers were allocated for the probe partitions even for non-spilling joins and the join could spill additional partitions during probing if the probe partitions needed to switch from small to I/O buffers. The changes made were: - Probe partitions are only initialized after the build is partitioned, and only for spilled build partitions. - Probe partitions never use small buffers: once the initial write buffer is allocated, appending to the probe partition never fails. - All probe partition allocation is done after partitioning the build and before processing the probe input during the same phase as hash table building. (Aside from NAAJ partitions which are allocated upfront). The probe partition changes necessitated a change in BufferedTupleStream: allocation of write blocks is now explicit via the PrepareForWrite() API. Testing: Ran exhaustive build and local stress test. Memory Usage: Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20. No regressions on TPC-DS. 
TPC-H either stayed the same or improved in min memory requirement without spilling, but the min memory requirement with spilling regressed in some cases. I investigated each of the significant regressions on TPC-H and determined that they were all due to exec nodes racing for spillable or non-spillable memory. None of them were cases where exec nodes got their minimum reservation and failed to execute the spilling algorithm correctly. Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb --- M .clang-format M be/src/codegen/gen_ir_descriptions.py M be/src/codegen/impala-ir.cc M be/src/exec/CMakeLists.txt M be/src/exec/analytic-eval-node.cc M be/src/exec/blocking-join-node.cc M be/src/exec/blocking-join-node.h M be/src/exec/data-sink.cc M be/src/exec/hash-join-node.cc M be/src/exec/nested-loop-join-builder.cc M be/src/exec/nested-loop-join-builder.h M be/src/exec/nested-loop-join-node.cc M be/src/exec/partitioned-aggregation-node.cc A be/src/exec/partitioned-hash-join-builder-ir.cc A be/src/exec/partitioned-hash-join-builder.cc A be/src/exec/partitioned-hash-join-builder.h M be/src/exec/partitioned-hash-join-node-ir.cc M be/src/exec/partitioned-hash-join-node.cc M be/src/exec/partitioned-hash-join-node.h M be/src/exec/partitioned-hash-join-node.inline.h M be/src/runtime/buffered-block-mgr.cc M be/src/runtime/buffered-tuple-stream-test.cc M be/src/runtime/buffered-tuple-stream.cc M be/src/runtime/buffered-tuple-stream.h M be/src/util/runtime-profile.cc M be/src/util/runtime-profile.h M tests/stress/concurrent_select.py 27 files changed, 2,242 insertions(+), 1,491 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/73/3873/18 -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: newpatchset Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 18 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong 
Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 18: Code-Review+1 Carry +1 -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 18 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 17: (6 comments) http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: PS14, Line 90: > I guess one term used elsewhere is "release", right? (e.g. C++ unique_ptr). I find release confusing, makes me think that the streams are being "released" from the join as a whole. I changed it to Transfer() since that seems more neutral about the directionality. http://gerrit.cloudera.org:8080/#/c/3873/15/be/src/exec/partitioned-hash-join-node.cc File be/src/exec/partitioned-hash-join-node.cc: Line 372: { RETURN_IF_ERROR(build_partition->BuildHashTable(&built)); } > extraneous braces Done Line 395: } > same Done http://gerrit.cloudera.org:8080/#/c/3873/15/be/src/exec/partitioned-hash-join-node.h File be/src/exec/partitioned-hash-join-node.h: PS15, Line 81: build > probe Done PS15, Line 405: > nit: extra space Done http://gerrit.cloudera.org:8080/#/c/3873/15/be/src/util/runtime-profile.cc File be/src/util/runtime-profile.cc: Line 391: } > alternatively: initialize insert_pos to begin and then DCHECK_NE(insert_pos I think I'll stay with this approach. The alternative seems a little subtle (it depends on us adding + 1 to it above) -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 17 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: Yes
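The naming discussion above (Acquire vs. release vs. Transfer) is about a method that hands the builder's probe streams to its caller. A sketch of that hand-off using std::unique_ptr follows. It is an assumption-laden illustration: `ToyStream`, `ToyBuilder`, `AddProbeStream()` and `num_probe_streams()` are hypothetical, and the element type of the returned vector is a guess, since the thread does not show the full signature.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a tuple stream.
struct ToyStream {
  explicit ToyStream(int id) : id(id) {}
  int id;
};

class ToyBuilder {
 public:
  void AddProbeStream(int id) {
    probe_streams_.push_back(std::make_unique<ToyStream>(id));
  }

  // Transfers ownership of all probe streams to the caller. The return
  // type makes the direction of the transfer explicit: afterwards the
  // builder owns nothing.
  std::vector<std::unique_ptr<ToyStream>> TransferProbeStreams() {
    std::vector<std::unique_ptr<ToyStream>> result = std::move(probe_streams_);
    // A moved-from vector is valid but unspecified; make "empty" explicit.
    probe_streams_.clear();
    return result;
  }

  std::size_t num_probe_streams() const { return probe_streams_.size(); }

 private:
  std::vector<std::unique_ptr<ToyStream>> probe_streams_;
};
```

Returning the vector by value means the caller cannot accidentally keep using streams that the builder still thinks it owns, whichever verb ends up in the method name.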
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Dan Hecht has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 17: (10 comments) These are the comments from re-reviewing the addressed comments. Will finish off the rest of builder.h and then the .cc files next. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: PS14, Line 90: > It passes from the class to the caller. I guess I don't find this ambiguous I guess one term used elsewhere is "release", right? (e.g. C++ unique_ptr). I think it's better than "acquire" given the direction of resource transfer, so that's one option. The reworded comment helps too. PS14, Line 95: streams are empty but prepared for : /// writing with a write buffer allocated. : std::vector> AcquireProbeStreams(); > The hash join node uses this to determine whether it's already cleaned up t Okay for now as an intermediate step, but let's definitely clean it up as work continues. PS14, Line 195: is destroyed i > Overall these bool arguments are really confusing, and the correctness of t Yeah, agree this bool was very confusing. Will take a look at the new stuff. PS14, Line 234: > Yes, since we share the reservation and hand off the probe buffers. Thanks, the new comment clarifies. From reading it I couldn't tell if there was a "* 2" missing because of the comment. http://gerrit.cloudera.org:8080/#/c/3873/15/be/src/exec/partitioned-hash-join-node.cc File be/src/exec/partitioned-hash-join-node.cc: Line 372: { RETURN_IF_ERROR(build_partition->BuildHashTable(&built)); } extraneous braces Line 395: } same http://gerrit.cloudera.org:8080/#/c/3873/15/be/src/exec/partitioned-hash-join-node.h File be/src/exec/partitioned-hash-join-node.h: PS15, Line 81: build probe Line 103: /// Thanks, I think this will make the code much more approachable. 
PS15, Line 405: nit: extra space http://gerrit.cloudera.org:8080/#/c/3873/15/be/src/util/runtime-profile.cc File be/src/util/runtime-profile.cc: Line 391: } alternatively: initialize insert_pos to begin and then DCHECK_NE(insert_pos, children_.begin()); Okay to ignore. -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 17 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Dan Hecht has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 14: > (2 comments) > > Before I responded to individual comments I wanted to point out > that the coupling is now one way - the builder never refers to > PhjNode aside from metadata provided via the constructor (you can > verify there are no references to partitioned-hash-join-node.h or > PartitionedHashJoin node in the builder code). Yup, that's good. I thought there was a back reference on my first pass through, but I was thinking of the reference from partition back to the builder/sink. Since it is the sink, it seems a little weird (once the builder is used by the join node in the other fragment), but okay for this first step. Splitting out the sink and builder is something we should revisit later. > > At this point it should only require minor changes to this hash > join code to create the builder in a separate fragment *before* the > join node, provided you have a 1:1 mapping from builder to node. > 1:n broadcast joins with no spilling would require some more > changes, but is a lot closer. > Yup, agree. > I chose this stopping point because there was a clear physical > separation between the classes, even if the spilling algorithm has > some logical coupling and assumes a 1:1 mapping from node to > builder. > > Reducing the logical coupling would require significant changes to > the spilling algorithm that is driven from PartitionedHashJoin::GetNext(), > which I think should be deferred until there is some consensus > about what spilling for 1:n broadcast joins with multithreading > might look like. I'm okay with this point more or less as an incremental first step, as well. I just wanted to understand better where we are going longer term and what the immediate requirements are. 
As mentioned, looking at whether the sink should be just a sink, and then the "builder" is something that's driven by both the sink and also the join node. But I'm fine with waiting on that since this is a good step in teasing things apart. -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 14 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 17: The memory usage experiment results look good (no major changes) -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 17 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 14: (16 comments) http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: PS14, Line 45: build row partitions > is this the same as the "hash partitions" referred to in the next paragraph really the streams, reworded. Line 86: > is it expected that the renaming public functions would only be used by PHJ Done PS14, Line 90: Acquire ownership > Doesn't the ownership pass from this class to the exec node? In which case It passes from the class to the caller. I guess I don't find this ambiguous given the return type and can't think of a non-awkward alternative. PS14, Line 95: Called after probing of the partitions : /// is done. The partitions are not closed or destroyed, since they may be spilled or : /// may contain unmatched build rows for certain join modes (e.g. right outer join). > The fact that we need some much explanation kinda makes this seem like it m The hash join node uses this to determine whether it's already cleaned up the hash partitions, and to keep the build/probe hash partition vectors in sync. Agree this isn't the interface we ultimately want but I wanted to avoid additional restructuring of GetNext()/CleanupHashPartitions(). Line 106: } > nit: missing blank line Done PS14, Line 109: When this call returns > Is this stuff by this method or the caller? i.e. if this method closes inp Done PS14, Line 112: so that the probe phase has enough buffers preallocated : /// execute successfully. > slightly garbled. seems like a word is missing or something. Done PS14, Line 117: /// Iterates over all the partitions in 'hash_partitions_' and returns the largest : /// number of build rows. 
> rather than say how and referring to private member, and leaving it to the Done Line 123: bool HashTableStoresNulls() const; > Both classes already have their own hash table context. Also, I think this makes sense to have here since the builder manages the hash tables. PS14, Line 182: partition side of this partition > not sure what this is saying. maybe just "for this build partition"? Done Line 188: /// false. > maybe clarify that an error is not returned in that case. or clarify what Done PS14, Line 195: unpin_all_build > why "build"? shouldn't it be unpin_all_blocks? or even better would be to Overall these bool arguments are really confusing, and the correctness of the spilling depends a lot on using the right value in the right place. I ended up changing this significantly to use an enum all the way down to the underlying BufferedTupleStream method, and adding this to SpillPartition(). It turns out there was a bug in BuildHashTablesAndPrepareProbeStreams() where it should have been unpinning all of the blocks in the partition it chose to spill but wasn't. I'm rerunning the memory experiment just to confirm this doesn't change anything. PS14, Line 234: build or probe > does this care about the probe side partitioning? Yes, since we share the reservation and hand off the probe buffers. PS14, Line 244: all hash partitions for partitioning level > this is kinda misleading. maybe qualify with ... for the current input, or Done PS14, Line 252: in > into Done PS14, Line 256: in memory > what does this mean? is it trying to say it fits in the current (pinned) bl Reworded it - I think it makes more sense to think of it as whether appending to the stream succeeds (which may advance the block) or whether we have to start spilling things. 
-- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 14 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: Yes
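The reply on `unpin_all_build` above describes replacing confusing bool arguments with an enum passed all the way down to the stream. A minimal sketch of that idea follows; the enumerator names, `ToyPinnedStream` and its methods are assumptions made for illustration and are not taken from the patch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical enum replacing a bare bool such as 'unpin_all_build'.
enum class UnpinMode {
  UNPIN_ALL,                 // Unpin every block, including the write block.
  UNPIN_ALL_EXCEPT_CURRENT,  // Keep the current (last) block pinned.
};

// Toy stream that only tracks which of its blocks are pinned.
class ToyPinnedStream {
 public:
  explicit ToyPinnedStream(int num_blocks) : pinned_(num_blocks, true) {}

  void Unpin(UnpinMode mode) {
    for (std::size_t i = 0; i < pinned_.size(); ++i) {
      const bool is_current = (i + 1 == pinned_.size());
      if (is_current && mode == UnpinMode::UNPIN_ALL_EXCEPT_CURRENT) continue;
      pinned_[i] = false;
    }
  }

  int num_pinned() const {
    return static_cast<int>(std::count(pinned_.begin(), pinned_.end(), true));
  }

 private:
  std::vector<bool> pinned_;
};
```

A call site such as `stream.Unpin(UnpinMode::UNPIN_ALL)` states its intent, whereas `stream.Unpin(true)` forces the reader to look up the parameter, which is the kind of ambiguity behind the bug mentioned in the reply.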
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Hello Alex Behm, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/3873 to look at the new patch set (#17). Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder The main outcome of this patch is to split out PhjBuilder from PartitionedHashJoinNode, which manages the build partitions for the join and implements the DataSink interface. A lot of this work is fairly mechanical refactoring: dividing the state between the two classes and duplicating it where appropriate. One major change is that probe partitions need to be separated from build partitions. This required some significant algorithmic changes to memory management for probe partition buffers: memory management for probe partitions was entangled with the build partitions: small buffers were allocated for the probe partitions even for non-spilling joins and the join could spill additional partitions during probing if the probe partitions needed to switch from small to I/O buffers. The changes made were: - Probe partitions are only initialized after the build is partitioned, and only for spilled build partitions. - Probe partitions never use small buffers: once the initial write buffer is allocated, appending to the probe partition never fails. - All probe partition allocation is done after partitioning the build and before processing the probe input during the same phase as hash table building. (Aside from NAAJ partitions which are allocated upfront). The probe partition changes necessitated a change in BufferedTupleStream: allocation of write blocks is now explicit via the PrepareForWrite() API. Testing: Ran exhaustive build and local stress test. Memory Usage: Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20. No regressions on TPC-DS. 
TPC-H either stayed the same or improved in min memory requirement without spilling, but the min memory requirement with spilling regressed in some cases. I investigated each of the significant regressions on TPC-H and determined that they were all due to exec nodes racing for spillable or non-spillable memory. None of them were cases where exec nodes got their minimum reservation and failed to execute the spilling algorithm correctly. Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb --- M .clang-format M be/src/codegen/gen_ir_descriptions.py M be/src/codegen/impala-ir.cc M be/src/exec/CMakeLists.txt M be/src/exec/analytic-eval-node.cc M be/src/exec/blocking-join-node.cc M be/src/exec/blocking-join-node.h M be/src/exec/data-sink.cc M be/src/exec/hash-join-node.cc M be/src/exec/nested-loop-join-builder.cc M be/src/exec/nested-loop-join-builder.h M be/src/exec/nested-loop-join-node.cc M be/src/exec/partitioned-aggregation-node.cc A be/src/exec/partitioned-hash-join-builder-ir.cc A be/src/exec/partitioned-hash-join-builder.cc A be/src/exec/partitioned-hash-join-builder.h M be/src/exec/partitioned-hash-join-node-ir.cc M be/src/exec/partitioned-hash-join-node.cc M be/src/exec/partitioned-hash-join-node.h M be/src/exec/partitioned-hash-join-node.inline.h M be/src/runtime/buffered-block-mgr.cc M be/src/runtime/buffered-tuple-stream-test.cc M be/src/runtime/buffered-tuple-stream.cc M be/src/runtime/buffered-tuple-stream.h M be/src/util/runtime-profile.cc M be/src/util/runtime-profile.h M tests/stress/concurrent_select.py 27 files changed, 2,244 insertions(+), 1,491 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/73/3873/17 -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: newpatchset Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 17 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong 
Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 14: (2 comments) Before I responded to individual comments I wanted to point out that the coupling is now one way - the builder never refers to PhjNode aside from metadata provided via the constructor (you can verify there are no references to partitioned-hash-join-node.h or PartitionedHashJoin node in the builder code). At this point it should only require minor changes to this hash join code to create the builder in a separate fragment *before* the join node, provided you have a 1:1 mapping from builder to node. 1:n broadcast joins with no spilling would require some more changes, but is a lot closer. I chose this stopping point because there was a clear physical separation between the classes, even if the spilling algorithm has some logical coupling and assumes a 1:1 mapping from node to builder. Reducing the logical coupling would require significant changes to the spilling algorithm that is driven from PartitionedHashJoin::GetNext(), which I think should be deferred until there is some consensus about what spilling for 1:n broadcast joins with multithreading might look like. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: Line 123: bool HashTableStoresNulls() const; > why is this here rather than the join node? especially given that the join Both classes already have their own hash table context. Line 128: inline const std::vector& is_not_distinct_from() const { > and then should this be here? is it to be closer to being able to break the There isn't a parent link - all the necessary metadata about the join is passed in when the builder is initialised. 
-- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 14 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: Yes
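The one-way coupling described in the Patch Set 14 reply above can be illustrated with a minimal sketch. It is not the Impala code: `ToyBuilder`, `ToyJoinNode`, `JoinMetadata` and `JoinOp` are hypothetical names, and the rule used in `HashTableStoresNulls()` is an assumption chosen only so the example has something to compute. The point is that the builder copies its metadata at construction and holds no pointer back to the node, while the node may refer to the builder.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical join kinds, for illustration only.
enum class JoinOp { kInner, kRightOuter, kFullOuter };

// Hypothetical bundle of join metadata handed to the builder up front.
struct JoinMetadata {
  JoinOp join_op;
  std::vector<bool> is_not_distinct_from;  // one flag per equi-join conjunct
};

// Toy builder: keeps its own copy of the metadata and has no parent link.
class ToyBuilder {
 public:
  explicit ToyBuilder(JoinMetadata metadata) : metadata_(std::move(metadata)) {}

  // Answered purely from the builder's own state (assumed rule).
  bool HashTableStoresNulls() const {
    if (metadata_.join_op == JoinOp::kRightOuter ||
        metadata_.join_op == JoinOp::kFullOuter) {
      return true;
    }
    for (bool flag : metadata_.is_not_distinct_from) {
      if (flag) return true;
    }
    return false;
  }

  const std::vector<bool>& is_not_distinct_from() const {
    return metadata_.is_not_distinct_from;
  }

 private:
  const JoinMetadata metadata_;
};

// Toy join node: the only direction of reference is node -> builder.
class ToyJoinNode {
 public:
  explicit ToyJoinNode(const ToyBuilder* builder) : builder_(builder) {
    assert(builder_ != nullptr);
  }

  bool BuilderStoresNulls() const { return builder_->HashTableStoresNulls(); }

 private:
  const ToyBuilder* builder_;
};
```

Because the builder never dereferences the node, it could in principle be constructed and run before any node exists, which is the property the reply relies on for a separate build fragment.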
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Dan Hecht has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 14: (19 comments) Flushing out some more comments, and will move onto the later patch set. One thing that's not clear to me yet is what the right separation between node and sink is -- they are currently very tightly coupled, which means they don't really benefit from being separated into separate classes. Is the goal of this change solely to be able to expose the DataSink interface, or is it to provide some separation as well? Do we see them becoming less coupled over time? http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: PS14, Line 45: build row partitions is this the same as the "hash partitions" referred to in the next paragraph? let's be consistent in terminology. Line 86: is it expected that the remaining public functions would only be used by PHJN? If so, would be good to state that. PS14, Line 90: Acquire ownership Doesn't the ownership pass from this class to the exec node? In which case this method wouldn't acquire ownership. So let's come up with better terminology and naming. PS14, Line 95: Called after probing of the partitions : /// is done. The partitions are not closed or destroyed, since they may be spilled or : /// may contain unmatched build rows for certain join modes (e.g. right outer join). The fact that we need so much explanation kinda makes this seem like it might not be the right abstraction. Could we instead just clear the vector when creating the next set of partitions? It doesn't look like we delete partitions until either Close() or Reset() anyway, so why do we need to proactively clear the hash_partitions_ vector before starting the next iteration? Line 106: } nit: missing blank line PS14, Line 109: When this call returns Is this stuff done by this method or the caller? i.e.
if this method closes input_partition, then say that. If it doesn't then either say "after" or don't say it at all (why is it relevant to this method documentation?). PS14, Line 112: so that the probe phase has enough buffers preallocated : /// execute successfully. slightly garbled. seems like a word is missing or something. PS14, Line 117: /// Iterates over all the partitions in 'hash_partitions_' and returns the largest : /// number of build rows. rather than say how and referring to private member, and leaving it to the reader to assume that "largest number of build rows" is talking about a per-partition thing, let's say something like: Returns the maximum number of build rows of a hash partition (or whatever terminology you choose in the class comment). Line 123: bool HashTableStoresNulls() const; why is this here rather than the join node? especially given that the join node owns the HashTableCtx. on a related note, I'm not really sure how the HashTable context is going to work out once this is driven like a real sink. Will we still have a parent_ link to the exec node living in a different fragment? Line 128: inline const std::vector& is_not_distinct_from() const { and then should this be here? is it to be closer to being able to break the parent_ link at some point and make the builder able to build the hash table without any info from parent_? PS14, Line 182: partition side of this partition not sure what this is saying. maybe just "for this build partition"? Line 188: /// false. maybe clarify that an error is not returned in that case. or clarify what cases are errors. etc. PS14, Line 195: unpin_all_build why "build"? shouldn't it be unpin_all_blocks? or even better would be to name it according to what the algorithm cares about. which I think is whether or not we might have more build rows to append, right? PS14, Line 234: build or probe does this care about the probe side partitioning? Line 237: /// 'null_aware_probe_partition_' and 'null_probe_rows_'.
Also explain what this is computing, rather than just what the computation is. PS14, Line 244: all hash partitions for partitioning level this is kinda misleading. maybe qualify with ... for the current input, or the "current set of hash partitions for partition level...". i.e. this isn't all the hash partitions that are created for a particular level. PS14, Line 252: in into PS14, Line 256: in memory what does this mean? is it trying to say it fits in the current (pinned) block? http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-node.h File be/src/exec/partitioned-hash-join-node.h: PS14, Line 410: we then iterate over all the probe rows > Clarified that it's the partition's. Most of the text was just copied verba Thanks. From the perspective of the reviewer, the comment is al
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Hello Alex Behm, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/3873 to look at the new patch set (#16). Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder The main outcome of this patch is to split out PhjBuilder from PartitionedHashJoinNode, which manages the build partitions for the join and implements the DataSink interface. A lot of this work is fairly mechanical refactoring: dividing the state between the two classes and duplicating it where appropriate. One major change is that probe partitions need to be separated from build partitions. This required some significant algorithmic changes to memory management for probe partition buffers: memory management for probe partitions was entangled with the build partitions: small buffers were allocated for the probe partitions even for non-spilling joins and the join could spill additional partitions during probing if the probe partitions needed to switch from small to I/O buffers. The changes made were: - Probe partitions are only initialized after the build is partitioned, and only for spilled build partitions. - Probe partitions never use small buffers: once the initial write buffer is allocated, appending to the probe partition never fails. - All probe partition allocation is done after partitioning the build and before processing the probe input during the same phase as hash table building. (Aside from NAAJ partitions which are allocated upfront). The probe partition changes necessitated a change in BufferedTupleStream: allocation of write blocks is now explicit via the PrepareForWrite() API. Testing: Ran exhaustive build and local stress test. Memory Usage: Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20. No regressions on TPC-DS. 
TPC-H either stayed the same or improved in min memory requirement without spilling, but the min memory requirement with spilling regressed in some cases. I investigated each of the significant regressions on TPC-H and determined that they were all due to exec nodes racing for spillable or non-spillable memory. None of them were cases where exec nodes got their minimum reservation and failed to execute the spilling algorithm correctly. Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb --- M .clang-format M be/src/codegen/gen_ir_descriptions.py M be/src/codegen/impala-ir.cc M be/src/exec/CMakeLists.txt M be/src/exec/analytic-eval-node.cc M be/src/exec/blocking-join-node.cc M be/src/exec/blocking-join-node.h M be/src/exec/data-sink.cc M be/src/exec/hash-join-node.cc M be/src/exec/nested-loop-join-builder.cc M be/src/exec/nested-loop-join-builder.h M be/src/exec/nested-loop-join-node.cc M be/src/exec/partitioned-aggregation-node.cc A be/src/exec/partitioned-hash-join-builder-ir.cc A be/src/exec/partitioned-hash-join-builder.cc A be/src/exec/partitioned-hash-join-builder.h M be/src/exec/partitioned-hash-join-node-ir.cc M be/src/exec/partitioned-hash-join-node.cc M be/src/exec/partitioned-hash-join-node.h M be/src/exec/partitioned-hash-join-node.inline.h M be/src/runtime/buffered-block-mgr.cc M be/src/runtime/buffered-tuple-stream-test.cc M be/src/runtime/buffered-tuple-stream.cc M be/src/runtime/buffered-tuple-stream.h M be/src/util/runtime-profile.cc M be/src/util/runtime-profile.h M tests/stress/concurrent_select.py 27 files changed, 2,196 insertions(+), 1,476 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/73/3873/16 -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: newpatchset Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 16 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong 
Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong
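The commit message above says that write-block allocation in BufferedTupleStream became explicit via PrepareForWrite(), and that appending to a probe partition never fails once the initial write buffer is allocated. A toy model of that contract is sketched below. Only the name PrepareForWrite() comes from the commit message; the signature, the `ToyBufferPool` and `ToyStream` classes, and the simplification that one buffer is enough for any number of rows are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical pool with a fixed number of write buffers.
class ToyBufferPool {
 public:
  explicit ToyBufferPool(int num_buffers) : free_(num_buffers) {}

  // Hands out one buffer if any are left.
  bool TryAcquire() {
    if (free_ == 0) return false;
    --free_;
    return true;
  }

  int free_buffers() const { return free_; }

 private:
  int free_;
};

// Toy stream: the only step that can fail is the explicit PrepareForWrite().
class ToyStream {
 public:
  explicit ToyStream(ToyBufferPool* pool) : pool_(pool) {}

  // Explicitly acquires the write buffer. Returns false if none is available,
  // so the caller can react (for example, by spilling) before any row is added.
  bool PrepareForWrite() {
    if (!has_write_buffer_) has_write_buffer_ = pool_->TryAcquire();
    return has_write_buffer_;
  }

  // Cannot fail in this model: the buffer was secured up front and is assumed
  // to be flushed and reused when full.
  void AddRow(int row) {
    assert(has_write_buffer_);
    rows_.push_back(row);
  }

  std::size_t num_rows() const { return rows_.size(); }

 private:
  ToyBufferPool* pool_;
  bool has_write_buffer_ = false;
  std::vector<int> rows_;
};
```

Moving the failure point into one explicit call is what lets all probe partition allocation happen in a single phase, before any probe input is processed.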
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 16: Rebased onto master -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 16 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Hello Alex Behm, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/3873 to look at the new patch set (#15). Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder The main outcome of this patch is to split out PhjBuilder from PartitionedHashJoinNode, which manages the build partitions for the join and implements the DataSink interface. A lot of this work is fairly mechanical refactoring: dividing the state between the two classes and duplicating it where appropriate. One major change is that probe partitions need to be separated from build partitions. This required some significant algorithmic changes to memory management for probe partition buffers: memory management for probe partitions was entangled with the build partitions: small buffers were allocated for the probe partitions even for non-spilling joins and the join could spill additional partitions during probing if the probe partitions needed to switch from small to I/O buffers. The changes made were: - Probe partitions are only initialized after the build is partitioned, and only for spilled build partitions. - Probe partitions never use small buffers: once the initial write buffer is allocated, appending to the probe partition never fails. - All probe partition allocation is done after partitioning the build and before processing the probe input during the same phase as hash table building. (Aside from NAAJ partitions which are allocated upfront). The probe partition changes necessitated a change in BufferedTupleStream: allocation of write blocks is now explicit via the PrepareForWrite() API. Testing: Ran exhaustive build and local stress test. Memory Usage: Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20. No regressions on TPC-DS. 
TPC-H either stayed the same or improved in min memory requirement without spilling, but the min memory requirement with spilling regressed in some cases. I investigated each of the significant regressions on TPC-H and determined that they were all due to exec nodes racing for spillable or non-spillable memory. None of them were cases where exec nodes got their minimum reservation and failed to execute the spilling algorithm correctly. Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb --- M .clang-format M be/src/codegen/gen_ir_descriptions.py M be/src/codegen/impala-ir.cc M be/src/exec/CMakeLists.txt M be/src/exec/analytic-eval-node.cc M be/src/exec/blocking-join-node.cc M be/src/exec/blocking-join-node.h M be/src/exec/data-sink.cc M be/src/exec/hash-join-node.cc M be/src/exec/nested-loop-join-builder.cc M be/src/exec/nested-loop-join-builder.h M be/src/exec/nested-loop-join-node.cc M be/src/exec/partitioned-aggregation-node.cc A be/src/exec/partitioned-hash-join-builder-ir.cc A be/src/exec/partitioned-hash-join-builder.cc A be/src/exec/partitioned-hash-join-builder.h M be/src/exec/partitioned-hash-join-node-ir.cc M be/src/exec/partitioned-hash-join-node.cc M be/src/exec/partitioned-hash-join-node.h M be/src/exec/partitioned-hash-join-node.inline.h M be/src/runtime/buffered-block-mgr.cc M be/src/runtime/buffered-tuple-stream-test.cc M be/src/runtime/buffered-tuple-stream.cc M be/src/runtime/buffered-tuple-stream.h M be/src/util/runtime-profile.cc M be/src/util/runtime-profile.h M tests/stress/concurrent_select.py 27 files changed, 2,196 insertions(+), 1,478 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/73/3873/15 -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: newpatchset Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 15 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong 
Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Tim Armstrong has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 14: (53 comments) http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/analytic-eval-node.cc File be/src/exec/analytic-eval-node.cc: Line 207: state, Substitute(PREPARE_FAILED_ERROR_MSG, "write")); > old code and agg uses state_->block_mgr()->MemLimitTooLowError(). Any reas Historical reasons, it looks like: the PrepareForRead() version below diverged from the PAGG/PHJ versions. Switched to the block_mgr version for consistency. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/blocking-join-node.cc File be/src/exec/blocking-join-node.cc: PS14, Line 297: SCOPED_TIMER(build_timer_); > We used to have two different timers in PHJ for measuring the time to hash It's not clear to me that the original timers were well thought out. It didn't look like there was a timer for how long it took to build the hash table - that was lumped into 'build_timer_'. I added two new timers with clearer meanings that track the total hash table build time and the time spent partitioning rows. Also moved the remaining build-related timers into the Builder. Now 'build_timer_' is the initial partitioning and hash table build, and 'repartition_timer_' is the time spent repartitioning. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.cc File be/src/exec/partitioned-hash-join-builder.cc: Line 179: largest_fraction = max(largest_fraction, > DCHECK_GT(num_build_rows, 0); I don't think that's valid: the code casts to double to avoid crashing on the divide-by-zero. I changed it to skip the calculation if num_build_rows == 0, which avoids that problem. PS14, Line 318: a hash table > hash tables Done. Both alternatives seem grammatically correct to me though. Line 341: // building hash tables to avoid building hash tables for partitions we'll spill anyway. > That's an interesting comment. 
If I understand it correctly, it means that Done with some slight tweaks. This is a lot simpler than the previous behaviour, where spilling can happen even during probing. I didn't measure how often, but it's not at all improbable, since the probe buffers are 8MB each and we could allocate up to 16 of them. Line 401: RETURN_IF_ERROR(SpillPartition()); > May help to document that failure to find any partition to spill (e.g. all I added to the comment up the top of the loop to make the termination more explicit. PS14, Line 529: PhjBuilder::HashTableStoresNulls() > This seems to belong to PartitionedHashJoinNode conceptually. I feel like it makes sense here though since the builder owns the hash tables. Plumbing-wise it's also easier since PhjBuilder needs this info and starts executing before PartitionedHashJoinNode. Line 646: do { > Please see comments in BlockingJoinNode, it would be great to retain timer Done PS14, Line 782: process_build_batch_fn_level0 > 'process_build_batch_fn_level0_' I just copied this verbatim - it seems to be referring to the local variable though. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: Line 425: > nit: unnecessary blank line ? Done http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-node.cc File be/src/exec/partitioned-hash-join-node.cc: PS14, Line 151: : : > This may be important information to retain. Not sure why I removed it. May have had a good reason but I can't recall it. PS14, Line 87: Expr::CreateExprTree(pool_, eq_join_conjunct.right > Does this overlap with CreateExprTree() in PhjBuilder::Init() ? Same questi They are redundant currently, but the idea is that PhjBuilder and PhjNode will exist more independently in different plan fragments, so I wanted to encapsulate the state where possible. The expr contexts need to be independent to eventually support many probe sides : 1 build side for broadcast joins. 
I think documenting this explicitly may be confusing because it is sort-of explaining the current state of things relative to the previous way of doing things. Whereas I think with the separate build side, the default assumption is that builder data structures are private and not shared with the exec node. It may make sense to share Expr trees globally between fragment instances as part of the multithreading changes, but I don't think it's worth trying to share them here until we have the right infrastructure. Line 213: for (unique_ptr& partition : spilled_partitions_) > missing { } Ah, missed that clang-format wrapped it. PS14, Line 494: to output > outputting Done PS14, Line 585: hash_partitions_ > 'hash_partitions_' Done Line 594: continue; > Is there a reason why we cannot call OutputUnmatchedBuild() directly at thi I don't
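The divide-by-zero guard mentioned in the reply to Line 179 above (skip the calculation when there are no build rows) can be sketched as follows. This is a standalone illustration under assumed inputs, not the code in partitioned-hash-join-builder.cc: the function name `LargestPartitionFraction` and its vector-of-row-counts parameter are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Returns the largest share of build rows held by any single partition,
// or 0.0 when there are no build rows at all (hypothetical helper).
double LargestPartitionFraction(const std::vector<int64_t>& partition_rows) {
  int64_t num_build_rows = 0;
  for (int64_t rows : partition_rows) {
    assert(rows >= 0);
    num_build_rows += rows;
  }
  // Guard: with zero rows the ratio is undefined, so skip the division.
  if (num_build_rows == 0) return 0.0;

  double largest_fraction = 0.0;
  for (int64_t rows : partition_rows) {
    largest_fraction =
        std::max(largest_fraction,
                 static_cast<double>(rows) / static_cast<double>(num_build_rows));
  }
  return largest_fraction;
}
```

An explicit early return keeps an empty build side a legal input, whereas a DCHECK_GT on the row count would turn it into a debug-build failure.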
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Dan Hecht has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 14: (25 comments) Here's a first set of comments, mostly focused on phjn.h. I need to go back through phjn.cc, builder.h/cc. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/analytic-eval-node.cc File be/src/exec/analytic-eval-node.cc: Line 207: state, Substitute(PREPARE_FAILED_ERROR_MSG, "write")); old code and agg uses state_->block_mgr()->MemLimitTooLowError(). Any reason this uses MemLimitExceeded() directly instead? Just wondering about the reasoning. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-node.h File be/src/exec/partitioned-hash-join-node.h: Line 79: /// Logically, the algorithm has the following modes:. I think this comment could be a lot more useful if we explain how the phases work together to give the full algorithm. Probably that could replace the state transition graph which I find confusing. PS14, Line 81: partition them into hash_partitions_. The input can this comment is kind of disjoint since it talks about what the phase does, then where the input comes from, and then explains more about what the phase does. Let's format each phase consistently, maybe describe inputs first, then actions, then outputs? PS14, Line 86: PROCESSING why do we use the PROCESSING term here rather than PARTITIONING, which would be more consistent? PS14, Line 87: spilled single spilled (or remove single from #3 since having it in one place but not the other makes the reader wonder if there's a difference). Line 89: /// table in memory, we perform the join, otherwise we spill the probe row. this last sentence is confusing since it mostly restates what the first says and adds only a small amount of new info. Let's combine them. PS14, Line 90: PROBING_SPILLED_PARTITION it would be good to be more upfront about when this phase is used and how, for a spilled partition, we either do this phase or the REPARTITIONING_PROBE phase. 
PS14, Line 90: construct what does this phase construct? isn't the hash table already constructed? PS14, Line 91: walking what does this mean? processing? also, from these comments it's not clear how these stages relate. You could read this to mean that #2 is a fallback if we can't do #3, or that #3 is the probing that "perform the join" of #2. PS14, Line 97: * what does this star mean to say? We can go from REPARTITIONING_PROBE back to PROBING_SPILLED_PARTITION, right? PS14, Line 143: buffer write buffer? PS14, Line 264: the its PS14, Line 267: the that PS14, Line 291: Walks What does this mean? Iterates over? PS14, Line 291: hash partitions is this talking about the probe hash partitions or the builder's? PS14, Line 384: builder_->hash_partitions is this referring to the entries in the builder_->hash_partition_ array? PS14, Line 385: This is not used when processing a single partition. what does this mean? PS14, Line 410: we then iterate over all the probe rows I can't tell if this means literally all the probe rows, or all the rows in this stream. clarify this paragraph. PS14, Line 432: and this makes it sound like there are two conditions required to create a probe partition, but I think you mean this is implied by the first part. "because"? PS14, Line 436: preallocated what does this mean? PS14, Line 437: and double and. And is this saying the constructor prepares it or the caller should have already? is this right: ... should be an unpinned stream that has been prepared for writing with ... PS14, Line 443: should will PS14, Line 464: meaning : /// it has to call Close() on it) but transferred to the parent exec node this doesn't seem right. don't we either Close() it or transfer it (not both), in PartitionedHashJoinNode::ProbePartition::Close()? 
http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/util/runtime-profile.cc File be/src/util/runtime-profile.cc: PS14, Line 409: DCHECK DCHECK_EQ Line 412: children_.insert(children_.begin(), child_entry); rather than having two special cases (here and lines 395-399), how about making AddChildLocked() take the position iterator? This case passes begin() and the AddChild() case passes either end() or the middle iterator. -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 14 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Tim Armstrong Gerrit-Reviewer: Alex Behm Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: Yes
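The suggestion in the runtime-profile.cc comment above (have AddChildLocked() take the insertion position, so that prepending and appending share one code path) might look roughly like the sketch below. It is a simplified stand-in rather than Impala's RuntimeProfile: `ToyProfile` stores child names as strings, `PrependChild` and the `after` parameter are hypothetical, and the locking implied by the "Locked" suffix is omitted.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Toy profile that keeps an ordered list of child names.
class ToyProfile {
 public:
  using Children = std::vector<std::string>;

  // Appends 'name', or inserts it right after 'after' if that child exists.
  void AddChild(const std::string& name, const std::string* after = nullptr) {
    Children::iterator pos = children_.end();
    if (after != nullptr) {
      Children::iterator it =
          std::find(children_.begin(), children_.end(), *after);
      if (it != children_.end()) pos = it + 1;
    }
    AddChildLocked(name, pos);
  }

  // Inserts 'name' as the first child.
  void PrependChild(const std::string& name) {
    AddChildLocked(name, children_.begin());
  }

  const Children& children() const { return children_; }

 private:
  // Single insertion path: callers choose begin(), end(), or a middle iterator.
  // In real code the caller would hold the children lock at this point.
  void AddChildLocked(const std::string& name, Children::iterator pos) {
    children_.insert(pos, name);
  }

  Children children_;
};
```

With the position passed in, the bookkeeping that accompanies an insert lives in one place instead of being repeated per special case.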
[Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Michael Ho has posted comments on this change. Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder .. Patch Set 14: (31 comments) Some comments and questions. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/blocking-join-node.cc File be/src/exec/blocking-join-node.cc: PS14, Line 297: SCOPED_TIMER(build_timer_); We used to have two different timers in PHJ for measuring the time to hash the input rows into different partitions and the time to build the hash tables. It would be good to be able to retain that fine-grained tracking in PhjBuilder. http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.cc File be/src/exec/partitioned-hash-join-builder.cc: Line 179: largest_fraction = max(largest_fraction, DCHECK_GT(num_build_rows, 0); PS14, Line 318: a hash table hash tables Line 341: // building hash tables to avoid building hash tables for partitions we'll spill anyway. That's an interesting comment. If I understand it correctly, it means that calling InitSpilledPartitionProbeStreams() may cause more partitions to be spilled, as it needs to pin write buffers for the spilled streams, so we should make sure we allocate all the write streams to trigger all the extra spills which may happen before building hash tables. If so, would you mind elaborating a bit in the comment: "Allocate probe buffers for all partitions that are already spilled. Do this before building hash tables as allocating probe buffers may cause some more partitions to be spilled. This avoids wasted work on building hash tables for partitions which end up being spilled eventually." Do you know how often this case happens ? I suppose this complication will be gone eventually once reservation is in place. Line 401: RETURN_IF_ERROR(SpillPartition()); May help to document that failure to find any partition to spill (e.g. all partitions are spilled) will return an error code. At first glance it looks as if we may be in an infinite loop. 
PS14, Line 529: PhjBuilder::HashTableStoresNulls() This seems to belong to PartitionedHashJoinNode conceptually. Line 646: do { Please see comments in BlockingJoinNode, it would be great to retain timers for InsertBatch() and ProcessBuildInput() separately for finer grain tracking. PS14, Line 782: process_build_batch_fn_level0 'process_build_batch_fn_level0_' http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-builder.h File be/src/exec/partitioned-hash-join-builder.h: Line 425: nit: unnecessary blank line ? http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-node.cc File be/src/exec/partitioned-hash-join-node.cc: PS14, Line 151: : : This may be important information to retain. PS14, Line 87: Expr::CreateExprTree(pool_, eq_join_conjunct.right Does this overlap with CreateExprTree() in PhjBuilder::Init() ? Same question for build_expr_ctxs_. If they are not redundant, mind commenting on how they are used differently ? Line 213: for (unique_ptr& partition : spilled_partitions_) missing { } PS14, Line 494: to output outputting PS14, Line 585: hash_partitions_ 'hash_partitions_' Line 594: continue; Is there a reason why we cannot call OutputUnmatchedBuild() directly at this point ? PS14, Line 996: s) next_state ? http://gerrit.cloudera.org:8080/#/c/3873/12/be/src/exec/partitioned-hash-join-node.h File be/src/exec/partitioned-hash-join-node.h: PS12, Line 81: hash_partitions_ 'hash_partitions_' PS12, Line 88: probe_hash_partitions_ 'probe_hash_partitions_' PS12, Line 112: input_partition_ 'input_partition_' http://gerrit.cloudera.org:8080/#/c/3873/14/be/src/exec/partitioned-hash-join-node.h File be/src/exec/partitioned-hash-join-node.h: PS14, Line 437: Is this merged into build_timer_ in BlockingJoinNode now ? It may be helpful for debugging to maintain two separate timers. 
PS14, Line 81: hash_partitions_ 'hash_partitions_' PS14, Line 84: This is the only phase These are the only phases PS14, Line 88: probe_hash_partitions_ 'probe_hash_partitions_' PS14, Line 112: input_partition_ 'input_partition_' PS14, Line 118: input_partition_ 'input_partition_' PS14, Line 298: spilled_partitions_ 'spilled_partitions_' PS14, Line 302: probe_batch_pos_ 'probe_batch_pos_' PS14, Line 306: probe_batch_pos_ 'probe_batch_pos_' PS14, Line 309: input_partition_ 'input_partition_' PS14, Line 408: null_aware_ 'null_aware_' Line 445: /// block cannot be acquired. "delete_on_read" mode is used, so blocks in the stream ... used for the buffered tuple stream, so.. -- To view, visit http://gerrit.cloudera.org:8080/3873 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb Gerrit-PatchSet: 14 Gerrit-Project: Impala-ASF Gerrit