Tim Armstrong has posted comments on this change. ( http://gerrit.cloudera.org:8080/15926 )
Change subject: IMPALA-9655: Dynamic intra-node load balancing for HDFS scans
......................................................................

Patch Set 1:

(25 comments)

This is really nice!

http://gerrit.cloudera.org:8080/#/c/15926/1//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/15926/1//COMMIT_MSG@39
PS1, Line 39: Testing:
Can we add some more test coverage for mt_dop for edge cases like Avro/Parquet files split across blocks? I think we have non-mt_dop tests for those.

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h
File be/src/exec/hdfs-scan-node-base.h:

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@174
PS1, Line 174: /// The following public methods are only used by MT scan nodes.
Can we add DCHECKs to these functions to make sure that they're only used by the MT scan node?

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@176
PS1, Line 176:
nit: mismatched quotes

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@237
PS1, Line 237: AtomicInt32 remaining_scan_range_submissions_ = { 0 };
nit: we would usually init these as remaining_scan_range_submissions_{0}

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@240
PS1, Line 240: use_mt_scan_node
nit: needs a trailing _

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@242
PS1, Line 242: /// The following are only used by MT scan nodes.
nit: might want to make this comment more prominent

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@246
PS1, Line 246: ;
.

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@247
PS1, Line 247: last_scan_range_submitted_
Not sure I understand why this is needed, since you can infer it from 'remaining_scan_range_submissions_'.
I guess it's to make the condition variable pattern more obvious, instead of doing something weird like setting the atomic and then acquiring the lock?

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@317
PS1, Line 317: ScanRangeSharedState shared_state_;
I'm a little skeptical about introducing mutable state into the PlanNode structure - the PlanNode has the right cardinality, but it would be nice to keep execution state out of it.

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@671
PS1, Line 671: seperate
nit: separate

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@766
PS1, Line 766: inline ProgressUpdater& progress() { return shared_state_->progress(); }
Consider inlining this function at its callsites - they might actually be easier to understand if it's explicit that it's updating shared_state_.

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@830
PS1, Line 830: /// Fetches the next range in queue to read.
nit: extra space after ///. I think the comment could be fleshed out a little, e.g. why it's overridden by subclasses, and any invariants.

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc
File be/src/exec/hdfs-scan-node-base.cc:

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@242
PS1, Line 242: for (auto ctx : instance_ctxs) {
It's kinda weird that we split up the scan ranges between instances and then merge them back together again. I think this is fine, since we still need to do that for other scan nodes and we don't want to invent a different place to push HDFS scan ranges. We should probably rip out the LPT algorithm in Scheduler::AssignRangesToInstances() and just split them evenly using a simpler algorithm, to keep the code simpler and faster. Could be a follow-up patch.
http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@272
PS1, Line 272: NULL
nit: nullptr while you're here

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@315
PS1, Line 315: Setup
nit: Set up

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@321
PS1, Line 321: // Distribute files evenly among all instances.
This is just to distribute the work of issuing the initial scan ranges, right? The comment could be clearer about that.

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@327
PS1, Line 327: DCHECK_GT(instance_ctxs.size(), 0);
This code feels more complex than it needs to be - it has a bunch of edge cases. AFAICT it's correct and we have plenty of test coverage, but it would be nice to simplify it. Could we instead construct min(file_descs.size(), instance_ctxs.size()) vectors and then round-robin assign the files? Or, if you wanted to keep the current distribution, I'd find something like this a little easier to follow:

  int num_lists = min(file_descs.size(), instance_ctxs.size());
  auto fd_it = file_descs.begin();
  for (int i = 0; i < num_lists; ++i) {
    vector<HdfsFileDesc*>* curr_file_list = &shared_state_.file_assignment_per_instance_[
        instance_ctxs[i]->fragment_instance_id];
    for (int j = 0; j < files_per_instance + (i < remainder); ++j) {
      curr_file_list->push_back(fd_it->second);
      ++fd_it;
    }
  }
  DCHECK(fd_it == file_descs.end());

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@354
PS1, Line 354: RawValue::Write(eval->GetValue(NULL), template_tuple, slot_desc, NULL);
nit: nullptr

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@1226
PS1, Line 1226: while (remaining_scan_range_submissions_.Load() > 0) {
Doesn't this block any instances from processing scan ranges until all have issued their ranges? This doesn't seem to match the function comment exactly.
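To make the even-split suggestion above concrete, here's a self-contained sketch of the same loop shape. Impala's HdfsFileDesc, shared_state_, and instance ids aren't available outside the codebase, so plain STL containers stand in for them; this mirrors the structure of the suggestion, not actual Impala code:

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Split 'file_descs' evenly across 'num_instances' lists, giving the first
// 'remainder' lists one extra file each. std::string stands in for
// HdfsFileDesc*; the vector index stands in for the fragment instance id.
std::vector<std::vector<std::string>> AssignFiles(
    const std::map<std::string, std::string>& file_descs, int num_instances) {
  int num_lists = std::min(static_cast<int>(file_descs.size()), num_instances);
  std::vector<std::vector<std::string>> assignments(num_lists);
  if (num_lists == 0) return assignments;
  int files_per_instance = file_descs.size() / num_lists;
  int remainder = file_descs.size() % num_lists;
  auto fd_it = file_descs.begin();
  for (int i = 0; i < num_lists; ++i) {
    for (int j = 0; j < files_per_instance + (i < remainder ? 1 : 0); ++j) {
      assignments[i].push_back(fd_it->second);
      ++fd_it;
    }
  }
  // Every file was assigned exactly once.
  assert(fd_it == file_descs.end());
  return assignments;
}
```

For example, 5 files over 3 instances yields list sizes 2, 2, 1, and more instances than files yields one single-file list per file, which covers the edge cases the original code handles explicitly.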
I didn't see a reason why we wouldn't want fragment instances to start processing ranges ASAP. Maybe we can restructure a bit. I think it's also best to avoid using both last_scan_range_submitted_ and remaining_scan_range_submissions_ in the loop, since they're essentially synonyms. We could also get rid of last_scan_range_submitted_, I think - I left a comment in the header. It would also be nice if we didn't acquire scan_range_submission_lock_ on every call.

  while (true) {
    *scan_range = scan_range_queue_.Dequeue();
    if (*scan_range != nullptr) return Status::OK();
    {
      unique_lock<mutex> l(scan_range_submission_lock_);
      while (scan_range_queue_.empty() && !last_scan_range_submitted_
          && !state->is_cancelled()) {
        range_submission_cv_.Wait(l);
      }
      // No more work to do.
      if (scan_range_queue_.empty() && last_scan_range_submitted_) break;
    }
    if (state->is_cancelled()) return Status::CANCELLED;
  }

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@1234
PS1, Line 1234: if(state->is_cancelled()) return Status::CANCELLED;
nit: missing space before ( here and below

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@1238
PS1, Line 1238: if(state->is_cancelled()) return Status::CANCELLED;
Is this check actually necessary? It's needed in the loop because otherwise we may get stuck indefinitely, but otherwise it seems benign to start processing the scan range and notice cancellation later.

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@1244
PS1, Line 1244: state->AddCancellationCV(&scan_range_submission_lock_, &range_submission_cv_);
This is a little janky because we're signalling a query-level CV as part of the finstance-level state cancellation. I think this is probably fine though.
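As a rough illustration of the wait-loop pattern sketched above, here's a self-contained version using plain std:: primitives instead of Impala's lock/CV wrappers. All names here (RangeQueue, MarkLastSubmitted, etc.) are stand-ins, and cancellation is omitted; the point is the "drain the queue, then wait until more arrives or the producer is done" shape:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Consumers dequeue ranges (ints here) until the queue is drained AND the
// producer has signalled that the last range was submitted.
class RangeQueue {
 public:
  void Enqueue(int range) {
    std::lock_guard<std::mutex> l(lock_);
    queue_.push_back(range);
    cv_.notify_one();
  }

  // Producer calls this once after the final Enqueue().
  void MarkLastSubmitted() {
    std::lock_guard<std::mutex> l(lock_);
    last_submitted_ = true;
    cv_.notify_all();
  }

  // Returns true and sets *range if a range was dequeued; false when the
  // queue is empty and no more submissions are coming.
  bool Dequeue(int* range) {
    std::unique_lock<std::mutex> l(lock_);
    while (queue_.empty() && !last_submitted_) cv_.wait(l);
    if (queue_.empty()) return false;  // No more work to do.
    *range = queue_.front();
    queue_.pop_front();
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  std::deque<int> queue_;
  bool last_submitted_ = false;
};
```

The condition is re-checked under the lock after every wakeup, so spurious wakeups and multiple consumers are handled; a real version would also fold a cancellation flag into the wait predicate, as in the loop above.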
http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-mt.cc
File be/src/exec/hdfs-scan-node-mt.cc:

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-mt.cc@140
PS1, Line 140: shared_state_->EnqueueScanRange(ranges);
Seems weird to pass in the bool on one branch and not the other. Could make more sense to just call EnqueueScanRange once with a local variable at_front:

  shared_state_->EnqueueScanRange(ranges, at_front);

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-mt.cc@142
PS1, Line 142: shared_state_->EnqueueScanRange(ranges, true /*at front*/);
DCHECK that it's TAIL on the else branch.

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node.cc
File be/src/exec/hdfs-scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node.cc@73
PS1, Line 73: HdfsScanNode::HdfsScanNode(ObjectPool* pool, const HdfsScanPlanNode& pnode,
Can we add a DCHECK or anything to make sure that there is only one instance of the legacy scan node on a backend? Not sure, but the interaction with shared_state_ might be depending on that. I don't think it's possible now, but there used to be some weird things where we instantiated the legacy scan node for different file formats.

--
To view, visit http://gerrit.cloudera.org:8080/15926
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9a101d0d98dff6e3779f85bc466e4c0bdb38094b
Gerrit-Change-Number: 15926
Gerrit-PatchSet: 1
Gerrit-Owner: Bikramjeet Vig <[email protected]>
Gerrit-Reviewer: Bikramjeet Vig <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Joe McDonnell <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Mon, 18 May 2020 22:03:26 +0000
Gerrit-HasComments: Yes
