Michael Ho has posted comments on this change.

Change subject: IMPALA-2550: Switch to per-query exec rpc
......................................................................
Patch Set 4:

(41 comments)

Some more comments. Still going through the QueryState code; I need to do more passes over the coordinator code.

http://gerrit.cloudera.org:8080/#/c/6535/4/be/src/exec/data-sink.cc
File be/src/exec/data-sink.cc:

PS4, Line 66: nit: wrong indentation

http://gerrit.cloudera.org:8080/#/c/6535/4/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

PS4, Line 58: //done_(false) {
delete

PS4, Line 104: rpc_params->fragment_ctxs.back().fragment.idx
             : != params.fragment_exec_params.fragment.idx) {
The assumption is that all fragment instances belonging to the same fragment are grouped together in params.fragment_exec_params. Any chance we can add a DCHECK for that, either here or in Init()?

PS4, Line 112: params.fragment_exec_params.fragment.idx
May as well factor this out into a local variable, as it's used above too. The code would be easier to parse that way.

PS4, Line 170: lock_guard<mutex> l(lock_);
Is it necessary to hold the lock while creating the BackendConnection below? Is it safe to release the lock while making the RPC? Is there some sort of race if we only acquire the lock after making the RPC, in order to update rpc_sent_ and rpc_latency_?

PS4, Line 173: client_connect_status;
Is this unused, or did you intend to use it instead of status_? We could skip holding the lock here and only merge client_connect_status into status_ if there is an error.

PS4, Line 184: const string ERR_TEMPLATE =
             :     "ExecQueryFInstances rpc query_id=$0 failed: $1";
Shouldn't this live in common/thrift/generate_error_codes.py?

PS4, Line 332: lock_guard<mutex> l2(lock_);
How heavily contended is this lock? Will this become a bottleneck if we have a lot of fragment instances reporting status?

PS4, Line 346: nit: indentation is off.

Line 365: if (status_.ok()) {
Is this block empty now? It only contains a comment, right?

PS4, Line 379: //UpdateAverageProfile(exec_state);
Not needed? Done in UpdateExecStats(), right?
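The locking pattern suggested in the Line 170/173 comments above could look roughly like the sketch below. This is only illustrative (the Status, BackendState, and DoRpcUnlocked names are hypothetical stand-ins, not Impala's actual types): the RPC is made without holding lock_, and the lock is taken only afterwards to publish rpc_sent_/rpc_latency_ and merge any error into status_.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <string>

// Hypothetical stand-in for Impala's Status class.
struct Status {
  bool ok() const { return msg.empty(); }
  std::string msg;
};

struct BackendState {
  std::mutex lock_;
  Status status_;
  bool rpc_sent_ = false;
  int64_t rpc_latency_ = 0;

  // Stub for the ExecQueryFInstances RPC; pretend it succeeds.
  Status DoRpcUnlocked() { return Status{}; }

  // Make the RPC without holding lock_; only take the lock afterwards
  // to publish the result and merge any RPC error into status_.
  void Exec() {
    Status rpc_status = DoRpcUnlocked();  // no lock held during the RPC
    std::lock_guard<std::mutex> l(lock_);
    rpc_sent_ = true;
    rpc_latency_ = 1;  // would be measured around the RPC in real code
    if (!rpc_status.ok() && status_.ok()) status_ = rpc_status;
  }
};
```

The point of the sketch is that no other thread observes rpc_sent_ == true before status_ reflects the RPC outcome, because both are updated under one lock acquisition.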
PS4, Line 391: num_remaining_instances_ == 0 || !status_.ok();
Why not call IsDone() here?

PS4, Line 393: //if (*done) stopwatch_.Stop();
delete

PS4, Line 444: 3
Magic constant? #define or constexpr?

PS4, Line 513: NULL
nullptr

PS4, Line 549: //DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances,
             : //node_exec_summary.exec_stats.size());
?

http://gerrit.cloudera.org:8080/#/c/6535/4/be/src/runtime/coordinator-backend-state.h
File be/src/runtime/coordinator-backend-state.h:

PS4, Line 45: .
Can you please also add a comment stating that multiple fragments running on the same backend share a single BackendState? So the hierarchy is: BackendState -> Fragment -> Fragment Instance.

PS4, Line 97: boost::mutex* lock() { return &lock_; }
Is it possible to not expose the lock? It'd be easier to reason about the code if all uses of 'lock_' happened within class functions.

PS4, Line 100: //const vector<InstanceStats>& instance_stats() const { return instance_stats_; }
             : //MonotonicStopWatch* stopwatch() { return &stopwatch_; }
delete

PS4, Line 109: //InstanceStats* GetInstanceStats(const TUniqueId& instance_id);
delete

Line 111: return num_remaining_instances_ == 0 || !status_.ok();
nit: one line?

PS4, Line 113: &error_log_;
How does this work if multiple callers modify the map at the same time?

PS4, Line 114: //const std::vector<RuntimeProfile*>& instance_profiles() const {
             : //return instance_profiles_;
             : //}
delete

PS4, Line 133: #if 0
delete

PS4, Line 143: /
delete

PS4, Line 181: std::vector<InstanceStats> instance_stats_;
             :
             : /// map from instance idx to InstanceStats
             : std::unordered_map<int, InstanceStats*> instance_stats_map_;
Why the two separate data structures? Why can't this be a map from instance idx to InstanceStats directly? Not cache friendly?

PS4, Line 223: //bool done_;
Still needed?
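The Line 181 comment above (vector plus pointer map) could be collapsed into a single container, sketched below with a hypothetical, simplified InstanceStats. A std::map keyed by instance idx gives stable element addresses, so no separate vector or pointer indirection is needed.

```cpp
#include <cassert>
#include <map>

// Hypothetical, stripped-down InstanceStats; Impala's real type is richer.
struct InstanceStats {
  int fragment_idx = 0;
  bool done = false;
};

// Single map from instance idx to stats, replacing the parallel
// vector<InstanceStats> + unordered_map<int, InstanceStats*> pair.
using InstanceStatsMap = std::map<int, InstanceStats>;

// Lookup helper: returns nullptr if the instance idx is unknown.
// std::map never invalidates references on insert, so the returned
// pointer stays valid while the entry exists.
InstanceStats* GetInstanceStats(InstanceStatsMap& m, int instance_idx) {
  auto it = m.find(instance_idx);
  return it == m.end() ? nullptr : &it->second;
}
```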
http://gerrit.cloudera.org:8080/#/c/6535/4/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

Line 113: torn_down_(false) {}
Should the constructor also initialize coord_state_idx_ to -1, in case there is no coordinator fragment?

PS4, Line 234: new FragmentStats(avg_profile_name, root_profile_name, num_instances, obj_pool())
Should this be added to obj_pool() too? fragment_stats_ only contains pointers.

Line 397: //CountingBarrier* exec_complete_barrier = exec_complete_barrier_.get();
Delete?

PS4, Line 400: [backend_state, this, &debug_options]() {
             :   backend_state->Exec(query_ctx_, debug_options, filter_routing_table_,
             :       exec_complete_barrier_.get());
             : })
IMHO, it seems easier to read if this is defined as a function, like the existing code.

http://gerrit.cloudera.org:8080/#/c/6535/4/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS4, Line 246: Filled in
             : /// StartBackendExec().
Filled in by InitBackendStates()?

PS4, Line 270: = nullptr;
Do we have a guideline on whether pointers like this should be initialized in the constructor or in the class declaration?

http://gerrit.cloudera.org:8080/#/c/6535/4/be/src/runtime/fragment-instance-state.cc
File be/src/runtime/fragment-instance-state.cc:

Line 139: new RuntimeState(query_state_, fragment_ctx_, instance_ctx_, ExecEnv::GetInstance()));
long line

http://gerrit.cloudera.org:8080/#/c/6535/4/be/src/runtime/query-exec-mgr.cc
File be/src/runtime/query-exec-mgr.cc:

PS4, Line 56: // start new thread for query startup
This is done so the RPC won't block waiting for all FInstances to start, which allows fragments to be started on multiple backends in parallel.
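The Line 56 comment above describes a hand-off pattern that can be sketched as below. This is not Impala's actual QueryExecMgr code; the names and the std::thread usage are illustrative. The RPC handler spawns a startup thread and returns immediately, so the coordinator can fire ExecQueryFInstances at many backends without each one serializing behind fragment-instance startup.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Illustrative stand-in for Impala's QueryExecMgr.
struct QueryExecMgr {
  std::atomic<bool> started{false};

  // Stand-in for the work of starting all FInstances of the query.
  void StartQueryHelper() { started = true; }

  // RPC-handler side: hand startup off to a new thread and return at
  // once, so the ExecQueryFInstances RPC itself does not block.
  std::thread StartQuery() {
    return std::thread([this] { StartQueryHelper(); });
  }
};
```

The caller (here, the test) joins the returned thread; in a real server the thread would be detached into a thread pool or tracked by the query's lifecycle instead.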
http://gerrit.cloudera.org:8080/#/c/6535/4/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

Line 56: void ImpalaInternalService::CancelQueryFInstances(TCancelQueryFInstancesResult& return_val,
nit: long line

http://gerrit.cloudera.org:8080/#/c/6535/4/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

PS4, Line 399: fragment
"it parent fragments". It's unique across all fragments, as it's a query-wide instance index, right? I was misled into believing that each fragment has its own index space.

PS4, Line 399: accross
across

PS4, Line 401: per_
Why per_? The comment calls it fragment_instance_idx.

Line 411: // Id of this instance in its role as a sender.
Used by DataStreamSender?

PS4, Line 433: coord
Would backend_state_idx or coord_backend_state_idx be better?

--
To view, visit http://gerrit.cloudera.org:8080/6535
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I20769e420711737b6b385c744cef4851cee3facd
Gerrit-PatchSet: 4
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Dan Hecht <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-HasComments: Yes
