Marcel Kornacker has posted comments on this change.

Change subject: IMPALA-2905: Handle coordinator fragment lifecycle like all others
......................................................................

Patch Set 10:

(44 comments)

haven't looked at plan-root-sink.cc yet

http://gerrit.cloudera.org:8080/#/c/4402/10/be/src/exec/plan-root-sink.h
File be/src/exec/plan-root-sink.h:

Line 32: /// Sink which manages the handoff between a 'sender' (a plan fragment) that produces
differentiate fragment/instance

Line 50: /// The sink is thread safe up to a single producer and single consumer.
i'm still nervous about the ping-pong behavior that this implementation requires between the pfe thread and the coordinator thread. that's usually a recipe for poor performance. leave a todo to investigate and fix if necessary by decoupling with a limited-size queue.

http://gerrit.cloudera.org:8080/#/c/4402/10/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

Line 1501
i think you should resolve this todo here

PS10, Line 43: #include "exec/plan-root-sink.h"
             : #include "exec/scan-node.h"
             : #include "gen-cpp/Frontend_types.h"
             :
             : #include "gen-cpp/ImpalaInternalService.h"
             : #include "gen-cpp/ImpalaInternalService_constants.h"
             : #include "gen-cpp/ImpalaInternalService_types.h"
             : #include "gen-cpp/Partitions_types.h"
             : #include "gen-cpp/PlanNodes_types.h"
             : #include "runtime/backend-client.h"
what's with the reordering?

Line 484: fragment_instance_states_.resize(num_fragment_instances);
is there any reason to do this here, instead of in start..()? this makes it easy to break in a subsequent change: someone simply needs to add a return_if_error(...) somewhere between this resize and the start..() call.

Line 488: // The coordinator instance may require a query mem tracker even if there is no
why does this still need to be done here? i wouldn't expect to see any more coord instance specialization here (or is this just a matter of fixing up the comment?)

Line 524: executor_ = root_fragment_instance_->executor();
is there any reason why you need both root_fragment_instance_ and executor_? why not call executor() instead?

Line 611: // remote fragments. This code anticipates the indices of the instance states created
remove references to remote fragments, they're all remote now, unless you mean the non-coord fragments.

Line 1129: RETURN_IF_ERROR(executor_->WaitForPrepare());
you already called this in Exec(). no harm, i guess, but i'm also scratching my head wondering whether it's needed.

Line 1164: "Perhaps Prepare() failed and was not "
i find this formatting counterproductive, everything is bunched up on the right. you can probably do this with one line less. also, if the condition becomes longer but still fits on a single line, this entire statement can end up taking up an arbitrary number of lines.

Line 1179: // TODO: Why do we wait for instances here? Comment below is not clear. It would be
have you tried removing that call? something about lifecycle problems? where did the cancellation go?

Line 1232: lock_guard<SpinLock> l(exec_summary_lock_);
why is this necessary? intuitively i'd assume that initialization doesn't need to deal with concurrency

Line 1284: fragment_profiles_[i].averaged_profile = nullptr;
i think we should get rid of the specialization of the coord instance, incl. the shape of its profile(s). this complicates the code in various other places as well.

Line 1925: // TODO-MT: use FragmentInstanceState::error_log_ instead
resolve?

Line 2131: executor_->root_sink()->CloseConsumer();
would be better not to have to reach into the executor here. why not also introduce a teardown()? (actually, we agreed on the name ReleaseResources() here: https://docs.google.com/document/d/1DLrpowF4YGmMhHrVYsyLHexk5uX1S-nSlrSbk3jjka4/edit )

http://gerrit.cloudera.org:8080/#/c/4402/10/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS10, Line 37: #include "common/global-types.h"
             : #include "common/hdfs.h"
             : #include "common/status.h"
             : #include "gen-cpp/Frontend_types.h"
             : #include "gen-cpp/Types_types.h"
             : #include "runtime/runtime-state.h"
             : #include "scheduling/simple-scheduler.h"
             : #include "service/fragment-exec-state.h"
             : #include "service/fragment-mgr.h"
             : #include "util/histogram-metric.h"
             : #include "util/progress-updater.h"
             : #include "util/runtime-profile.h"
what's this about?

Line 76: /// Query coordinator: handles execution of plan fragments on remote nodes, given
update. also describe result materialization logic somewhere.

Line 111: /// as all plan fragments have started executing at their respective backends.
that doesn't seem quite right, or at least not very precise: Exec() calls WaitForPrepare() on the coord instance.

Line 124: /// Fills 'results' with up to 'num_results' rows produced by the coordinator
num_results -> num_rows?

Line 280: /// them to the client in GetNext().
owner? who creates this? it's a good idea to describe the materialization mechanism here/somewhere, because that's critical to understanding this non-trivial control flow.

Line 395: /// this fragment instance.
set where?

http://gerrit.cloudera.org:8080/#/c/4402/10/be/src/runtime/plan-fragment-executor.cc
File be/src/runtime/plan-fragment-executor.cc:

Line 407
we're purposely logging here before returning.

Line 214: if (sink_->GetName() == PlanRootSink::NAME) {
use TDataSink.type instead

Line 265: SCOPED_TIMER(ADD_TIMER(timings_profile_, "OpenTime"));
why not just add this counter to profile() and make total_time_counter derived?

Line 274: RETURN_IF_ERROR(runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
long line

Line 292: SCOPED_TIMER(ADD_TIMER(timings_profile_, "PlanOpenTimer"));
we already have too many counters that people don't understand, does this really help?

Line 306: // If there's no error, ExecInternal() completed the fragment execution.
differentiate fragment vs. instance

Line 328: RETURN_IF_ERROR(plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_));
done_ apparently doesn't simply mean opened.

http://gerrit.cloudera.org:8080/#/c/4402/10/be/src/runtime/plan-fragment-executor.h
File be/src/runtime/plan-fragment-executor.h:

Line 55: /// if (Prepare().ok()) {
PreparePlan/OpenPlan/ExecPlan or Prepare/Open/Exec? also, i want to get away from calling this 'plan', because plan elsewhere means "the set of fragments that together compute a result set"

Line 112: /// plan tree. report_status_cb will have been called for the final time when Exec()
plan tree -> fragment instance

Line 146: /// Returns fragment instance's sink if this is the root fragment instance.
when is it valid to call this?

Line 154: ExecNode* plan_; // lives in runtime_state_->obj_pool()
consider renaming to exec_tree_ or something like that to start the process of cleaning up the confusing/overlapping/ambiguous terminology.

Line 172: bool done_;
let's pick a different, more meaningful name. we use 'done' all over the place to indicate different things, which makes the code harder to read. opened?

Line 197: RuntimeProfile* timings_profile_;
why do we need this?

Line 288: /// TODO: Move back to Prepare()?
todo: consolidate across all instances of the same fragment

http://gerrit.cloudera.org:8080/#/c/4402/10/be/src/service/impala-internal-service.h
File be/src/service/impala-internal-service.h:

Line 35: ImpalaInternalService(const boost::shared_ptr<ImpalaServer>& impala_server)
why not also move ImpalaServer into the exec env? out of scope, but todo?

http://gerrit.cloudera.org:8080/#/c/4402/10/be/src/service/impala-server.h
File be/src/service/impala-server.h:

Line 75: class QueryResultSet {
please move this into a separate file altogether, there's no need for it to live in this .h file. also consider moving all subclasses into the same file. impala-server.{cc,h} and related are large enough as it is, and i think it would make the class hierarchy easier to follow; right now this is scattered over a number of .cc files. it will also help with breaking include dependencies (we have too many includes of impala-server.h).

Line 81: /// comes from a select query, the row is in the form of expr values (void*). 'scales'
not your fault, i know, but please tighten up the comment, it sounds a bit incoherent/rambling (and 'select stmt' -> QueryStmt, which i think is what this is trying to say).

Line 87: /// Add the TResultRow to this result set. When a row comes from a DDL/metadata
this is a weird class, these functions are really specific to the subclasses of QueryResultSet. it feels like instead it should have an AddRowBatch() and the subclass itself does the appropriate conversion. outside the scope of this change, but unless you disagree, please leave a todo.

http://gerrit.cloudera.org:8080/#/c/4402/10/be/src/service/query-exec-state.cc
File be/src/service/query-exec-state.cc:

Line 936
preserve dchecks?

Line 939
at least preserve these comments.

Line 945
what about this?

Line 748: lock_.unlock();
i find the locking protocol in combination with the several layers of indirection near-impenetrable. why is this lock held here and why can't you continue to hold it?

http://gerrit.cloudera.org:8080/#/c/4402/10/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
File fe/src/main/java/org/apache/impala/planner/PlanRootSink.java:

Line 25:  * A sink which acts as a buffer for results produced by a fragment instance, so that a
not specific enough: this also applies to non-root fragment instances that send to exchange nodes.

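To make the plan-root-sink.h Line 50 comment concrete: a minimal sketch of what "decoupling with a limited-size queue" could look like, so that the producer can run a few items ahead of the consumer instead of alternating with it. This is not Impala code. BoundedQueue, Put(), Get() and Close() are hypothetical names chosen for illustration, and it assumes exactly one producer and one consumer, as the sink's comment states.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded queue (illustration only, not part of Impala).
// The producer (e.g. the fragment instance's execution thread) calls Put();
// the consumer (e.g. the coordinator thread) calls Get().
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);
  }

  // Blocks while the queue is full. Returns false (and drops 'item') if the
  // queue has been closed.
  bool Put(T item) {
    std::unique_lock<std::mutex> l(lock_);
    not_full_.wait(l, [this] { return closed_ || items_.size() < capacity_; });
    if (closed_) return false;
    items_.push_back(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while the queue is empty and still open. Returns false once the
  // queue is closed and fully drained; otherwise moves the oldest item into
  // '*item' and returns true.
  bool Get(T* item) {
    std::unique_lock<std::mutex> l(lock_);
    not_empty_.wait(l, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return false;
    *item = std::move(items_.front());
    items_.pop_front();
    not_full_.notify_one();
    return true;
  }

  // Rejects further Put() calls and wakes any blocked producer or consumer.
  // Items already queued can still be read with Get().
  void Close() {
    {
      std::lock_guard<std::mutex> l(lock_);
      closed_ = true;
    }
    not_full_.notify_all();
    not_empty_.notify_all();
  }

 private:
  const std::size_t capacity_;
  std::mutex lock_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<T> items_;
  bool closed_ = false;
};
```

With a capacity of 1 this degenerates to the same strict handoff; a capacity of a few row batches is what would remove the ping-pong. The right size, and whether the extra buffering is worth the memory, is exactly what the requested todo would need to investigate.
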
--
To view, visit http://gerrit.cloudera.org:8080/4402
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb0064ec2f085fa3a5598ea80894fb489a01e4df
Gerrit-PatchSet: 10
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson
Gerrit-Reviewer: Alex Behm
Gerrit-Reviewer: Henry Robinson
Gerrit-Reviewer: Marcel Kornacker
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: Yes
