Tim Armstrong has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/15926 )

Change subject: IMPALA-9655: Dynamic intra-node load balancing for HDFS scans
......................................................................


Patch Set 1:

(25 comments)

This is really nice!

http://gerrit.cloudera.org:8080/#/c/15926/1//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/15926/1//COMMIT_MSG@39
PS1, Line 39: Testing:
Can we add some more test coverage for mt_dop for edge cases like Avro/Parquet 
files split across blocks? I think we have non-mt_dop tests for those.


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h
File be/src/exec/hdfs-scan-node-base.h:

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@174
PS1, Line 174:   /// The following public methods are only used by MT scan 
nodes.
Can we add DCHECKs to these functions to make sure that they're only used by 
the MT scan node?


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@176
PS1, Line 176:
nit: mismatched quotes


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@237
PS1, Line 237:   AtomicInt32 remaining_scan_range_submissions_ = { 0 };
nit: would usually init these as

  remaining_scan_range_submissions_{0}


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@240
PS1, Line 240: use_mt_scan_node
nit: needs trailing _


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@242
PS1, Line 242:   /// The following are only used by MT scan nodes.
nit: might want to make this comment more prominent


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@246
PS1, Line 246: ;
.


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@247
PS1, Line 247: last_scan_range_submitted_
Not sure I understand why this is needed, since you can infer it from 
'remaining_scan_range_submissions_'. I guess to make the condition variable 
pattern more obvious instead of doing something weird like setting the atomic, 
then acquiring the lock?


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@317
PS1, Line 317:   ScanRangeSharedState shared_state_;
I'm a little skeptical about introducing mutable state into the PlanNode 
structure - the PlanNode has the right cardinality, but it would be nice to 
keep execution state out of it.


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@671
PS1, Line 671: seperate
nit: separate


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@766
PS1, Line 766:   inline ProgressUpdater& progress() { return 
shared_state_->progress(); }
Consider manually inlining this function at its callsites - they might actually 
be easier to understand if it's explicit that they're updating shared_state_


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.h@830
PS1, Line 830:   ///  Fetches the next range in queue to read.
nit: extra space after ///. I think the comment could be fleshed out a little, 
e.g. why it's overridden by subclasses, any invariants


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc
File be/src/exec/hdfs-scan-node-base.cc:

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@242
PS1, Line 242:   for (auto ctx : instance_ctxs) {
It's kinda weird that we split up the scan ranges between instances then merge 
them back together again. I think this is fine since we still need to do that 
for other scan nodes and we don't want to invent a different place to push hdfs 
scan ranges.

We should probably rip out the LPT algorithm in 
Scheduler::AssignRangesToInstances() and just split them evenly using a simpler 
algorithm, to keep the code simpler and faster. Could be a follow-up patch.


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@272
PS1, Line 272: NULL
nit: nullptr while you're here


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@315
PS1, Line 315: Setup
nit: Set up


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@321
PS1, Line 321:   // Distribute files evenly among all instances.
This is just to distribute the work for issuing initial scan ranges, right? 
Comment could be clearer


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@327
PS1, Line 327:   DCHECK_GT(instance_ctxs.size(), 0);
This code feels more complex than it needs to be - it has a bunch of edge 
cases. AFAICT it's correct and we have plenty of test coverage, but it would be 
nice to simplify it. Could we instead construct min(file_descs.size(), 
instance_ctxs.size()) vectors, then round-robin assign the files?

Or if you wanted to keep the current distribution, I'd find something like this 
a little easier to follow.

  int num_lists = min(file_descs.size(), instance_ctxs.size());
  auto fd_it = file_descs.begin();
  for (int i = 0; i < num_lists; ++i) {
    vector<HdfsFileDesc*>* curr_file_list = &shared_state_.file_assignment_per_instance_[
        instance_ctxs[i]->fragment_instance_id];
    for (int j = 0; j < files_per_instance + (i < remainder); ++j) {
      curr_file_list->push_back(fd_it->second);
      ++fd_it;
    }
  }
  DCHECK(fd_it == file_descs.end());


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@354
PS1, Line 354:     RawValue::Write(eval->GetValue(NULL), template_tuple, 
slot_desc, NULL);
nit: nullptr


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@1226
PS1, Line 1226:   while (remaining_scan_range_submissions_.Load() > 0) {
Doesn't this block any instances from processing scan ranges until all have 
issued their ranges? This doesn't seem to match the function comment exactly. I 
didn't see a reason why we wouldn't want fragment instances to start processing 
ranges ASAP. Maybe we can restructure a bit.

I think it's also best to avoid using both last_scan_range_submitted_ and 
remaining_scan_range_submissions_ in the loop, since they're essentially 
synonyms. We could also get rid of last_scan_range_submitted_, I think - I left 
a comment in the header.

It would also be nice if we didn't acquire scan_range_submission_lock_ on every 
call.

  while (true) {
    *scan_range = scan_range_queue_.Dequeue();
    if (*scan_range != nullptr) return Status::OK();
    {
      unique_lock<mutex> l(scan_range_submission_lock_);
      while (scan_range_queue_.empty() && !last_scan_range_submitted_
          && !state->is_cancelled()) {
        range_submission_cv_.Wait(l);
      }
      // No more work to do.
      if (scan_range_queue_.empty() && last_scan_range_submitted_) break;
    }
    if (state->is_cancelled()) return Status::CANCELLED;
  }


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@1234
PS1, Line 1234:     if(state->is_cancelled()) return Status::CANCELLED;
nit: missing space before ( here and below


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@1238
PS1, Line 1238:   if(state->is_cancelled()) return Status::CANCELLED;
Is this check actually necessary? It's needed in the loop because otherwise we 
may get stuck indefinitely, but it seems benign otherwise to start processing 
the scan range and notice cancellation later.


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-base.cc@1244
PS1, Line 1244:   state->AddCancellationCV(&scan_range_submission_lock_, 
&range_submission_cv_);
This is a little janky because we're signalling a query-level CV as part of the 
finstance-level state cancellation. I think this is probably fine though.


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-mt.cc
File be/src/exec/hdfs-scan-node-mt.cc:

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-mt.cc@140
PS1, Line 140:     shared_state_->EnqueueScanRange(ranges);
Seems weird to pass in the bool on one branch and not the other. Could make 
more sense to just call EnqueueScanRange once with a local variable at_front.

    shared_state_->EnqueueScanRange(ranges, at_front);


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node-mt.cc@142
PS1, Line 142:     shared_state_->EnqueueScanRange(ranges, true /*at front*/);
DCHECK that it's TAIL on the else branch


http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node.cc
File be/src/exec/hdfs-scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/15926/1/be/src/exec/hdfs-scan-node.cc@73
PS1, Line 73: HdfsScanNode::HdfsScanNode(ObjectPool* pool, const 
HdfsScanPlanNode& pnode,
Can we add a DCHECK or anything to make sure that there is only one instance of 
the legacy scan node on a backend? Not sure, but the interaction with 
shared_state_ might depend on that.

I don't think it's possible now but there used to be some weird things where we 
instantiated the legacy scan node for different file formats.



--
To view, visit http://gerrit.cloudera.org:8080/15926
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9a101d0d98dff6e3779f85bc466e4c0bdb38094b
Gerrit-Change-Number: 15926
Gerrit-PatchSet: 1
Gerrit-Owner: Bikramjeet Vig <[email protected]>
Gerrit-Reviewer: Bikramjeet Vig <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Joe McDonnell <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Mon, 18 May 2020 22:03:26 +0000
Gerrit-HasComments: Yes
