Sailesh Mukil has posted comments on this change. ( http://gerrit.cloudera.org:8080/11055 )
Change subject: IMPALA-3825: Distribute Runtime Filtering Aggregation
......................................................................


Patch Set 3:

(40 comments)

http://gerrit.cloudera.org:8080/#/c/11055/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/11055/3//COMMIT_MSG@14
PS3, Line 14: 
Add comments about the following so that the change is easier for reviewers to follow:
- Explain in a paragraph or two how your patch achieves the distribution, i.e. explain your approach in plain English.
- What new failure modes can this change cause that couldn't happen before? (Talk about the race between the Exec() RPC and the UpdateFilter() RPC.)
- How long do we expect the aggregators to be accessible? I.e., what is the lifetime of an aggregator tied to?
- How are you updating the runtime profile?


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@95
PS3, Line 95: x
naming


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@98
PS3, Line 98: LOG(INFO) << "DebuggingPublishFilter AggregatorAddress " << tfs.aggregator_address;
remove


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@95
PS3, Line 95: for (auto const& x : filter_routing_table) {
             :   TFilterState tfs;
             :   x.second.ToThrift(&tfs);
             :   LOG(INFO) << "DebuggingPublishFilter AggregatorAddress " << tfs.aggregator_address;
             :   rpc_params->filter_routing_table.insert(
             :       std::pair<int32_t, TFilterState>(x.first, tfs));
             : }
Add a comment about what you're doing here.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@260
PS3, Line 260: x
naming. Call it 'filter' or something similar.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@262
PS3, Line 262: num_filters_
Is this needed as a member variable? It doesn't seem to be used anywhere.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@690
PS3, Line 690: if (params.__isset.filter_updates_received) {
Add a comment above this line: "Update aggregator's filter metrics in the runtime profile"


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@691
PS3, Line 691: if (backend_state->GetNumReceivedFilters() < params.filter_updates_received) {
Add another comment above this line: "Make sure not to double count filter updates from the same aggregator."


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@690
PS3, Line 690: if (params.__isset.filter_updates_received) {
             :   if (backend_state->GetNumReceivedFilters() < params.filter_updates_received) {
             :     filter_updates_received_->Add(
             :         params.filter_updates_received - backend_state->GetNumReceivedFilters());
             :     backend_state->SetNumReceivedFilters(params.filter_updates_received);
             :   }
             : }
There's a race here. If two updates for the same backend execute in parallel, both can read the same GetNumReceivedFilters() value before either calls SetNumReceivedFilters(), so you'll end up with an incorrect number of updated filters.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@884
PS3, Line 884: LOG(INFO)
             :     << "DebuggingPublishFilter Coordinator sending filter to fragment with id "
             :     << fragment_idx;
remove


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h
File be/src/runtime/filter-state.h:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@141
PS3, Line 141: FilterTarget
const ref


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@150
PS3, Line 150: /// Need to cast the int type of this class to int32_t of thrift
Still needed?


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@151
PS3, Line 151: set
Why not unordered?
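On the race flagged at coordinator.cc, PS3 Line 690: one option is to make the read, compare, add and write a single critical section per backend. A minimal sketch, assuming hypothetical stand-in types: BackendFilterCount is not Impala's BackendState, and a std::atomic<int64_t> stands in for the filter_updates_received_ profile counter.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for the per-backend bookkeeping behind
// GetNumReceivedFilters()/SetNumReceivedFilters(); not an Impala class.
class BackendFilterCount {
 public:
  // 'cumulative' is the aggregator's running total of filter updates received
  // (what params.filter_updates_received carries). Only the delta since the last
  // counted report is added to 'query_total'. Because the check, the add and the
  // store all happen under 'lock_', two concurrent reports from the same backend
  // cannot both add the same delta, and a stale report cannot lower the count.
  void Update(int64_t cumulative, std::atomic<int64_t>* query_total) {
    std::lock_guard<std::mutex> l(lock_);
    if (cumulative <= num_received_) return;  // stale or duplicate report
    query_total->fetch_add(cumulative - num_received_);
    num_received_ = cumulative;
  }

  int64_t num_received() const {
    std::lock_guard<std::mutex> l(lock_);
    return num_received_;
  }

 private:
  mutable std::mutex lock_;
  int64_t num_received_ = 0;  // per-backend high-water mark
};
```

In this sketch the lock is per backend, so reports from different backends still run in parallel; only the shared query-wide counter has to be safe for concurrent adds.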
http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@152
PS3, Line 152: int i
Use int32_t here and remove the cast in the next line. Also, rename it to 'idx'.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@167
PS3, Line 167: TFilterTarget
const ref


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@171
PS3, Line 171: boost
std


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@130
PS3, Line 130: bool disabled() const {
             :   if (is_bloom_filter()) {
             :     return bloom_filter_.always_true;
             :   } else {
             :     DCHECK(is_min_max_filter());
             :     return min_max_filter_.always_true;
             :   }
             : }
             : 
             : void ToThrift(TFilterState* f) const {
             :   std::vector<TFilterTarget> t_targets;
             :   for (FilterTarget filter_target : targets_) {
             :     TFilterTarget thrift_filter_target;
             :     filter_target.ToThrift(&thrift_filter_target);
             :     t_targets.push_back(thrift_filter_target);
             :   }
             :   f->__set_targets(t_targets);
             :   f->__set_desc(desc_);
             :   f->__set_src(src_);
             :   f->__set_pending_count(pending_count_);
             :   /// Need to cast the int type of this class to int32_t of thrift
             :   std::set<int32_t> src_fragment_instance_idxs;
             :   for (int i : src_fragment_instance_idxs_) {
             :     src_fragment_instance_idxs.insert((int32_t)i);
             :   }
             :   f->__set_src_fragment_instance_idxs(src_fragment_instance_idxs);
             :   f->__set_bloom_filter(bloom_filter_);
             :   f->__set_min_max_filter(min_max_filter_);
             :   f->__set_first_arrival_time(first_arrival_time_);
             :   f->__set_completion_time(completion_time_);
             :   f->__set_aggregator_address(aggregator_address_);
             : }
             : 
             : static FilterState FromThrift(const TFilterState& f) {
             :   FilterState fs(f.desc, f.src, f.aggregator_address, f.pending_count,
             :       f.first_arrival_time, f.completion_time, f.bloom_filter, f.min_max_filter);
             :   std::vector<FilterTarget> targets;
             :   for (TFilterTarget filter_target : f.targets) {
             :     targets.push_back(FilterTarget::FromThrift(filter_target));
             :   }
             :   fs.set_targets(targets);
             :   boost::unordered_set<int> src_fragment_instance_idxs;
             :   for (int idx : f.src_fragment_instance_idxs) {
             :     src_fragment_instance_idxs.insert(idx);
             :   }
             :   fs.set_src_fragment_instance_idxs(src_fragment_instance_idxs);
             : 
             :   return fs;
             : }
These can be moved to the .cc file.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@191
PS3, Line 191: TPlanNodeId src_;
             : std::vector<FilterTarget> targets_;
Add a comment for both of these. The comment wasn't added to CoordinatorFilterState either, but it should have been.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@198
PS3, Line 198: boost::unordered_set
We can use the std version for this.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-exec-mgr.cc
File be/src/runtime/query-exec-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-exec-mgr.cc@58
PS3, Line 58: if (params.filter_routing_table.size() != 0) {
Add a comment above this: "Initialize the filter routing table given by the coordinator."


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h@181
PS3, Line 181: const std::map<int32_t, TFilterState>
Add the parameter name as well.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h@206
PS3, Line 206: std::map<int32_t, FilterState> filter_routing_table_;
             : 
             : SpinLock filter_lock_;
             : 
             : std::map<TNetworkAddress, std::set<int32_t>> backend_list_;
Add comments for these.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@457
PS3, Line 457: const std::map<int32_t, TFilterState>
Take this as a const ref so that we avoid copying the map when calling this function.
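Taken together, the filter-state.h and query-state.cc@457 comments (const refs, int32_t instead of int plus a cast, std::unordered_set instead of boost, conversions defined outside the class body) could look roughly like the sketch below. It assumes hypothetical reduced types: TFilterStateLite and FilterStateLite keep only the index set, and InitFilterRoutingTable is an illustrative name, not the patch's function. It also assumes the Thrift field is generated as std::set<int32_t>, which is what the code at Line 151 suggests.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <unordered_set>

// Hypothetical stand-in for the Thrift-generated TFilterState, reduced to one field.
struct TFilterStateLite {
  std::set<int32_t> src_fragment_instance_idxs;
};

// Hypothetical stand-in for FilterState. Storing int32_t in a std::unordered_set
// removes both the boost dependency and the cast in ToThrift().
class FilterStateLite {
 public:
  // Declared here and defined below, mirroring a move into filter-state.cc.
  void ToThrift(TFilterStateLite* f) const;
  static FilterStateLite FromThrift(const TFilterStateLite& f);

  std::unordered_set<int32_t> src_fragment_instance_idxs_;
};

void FilterStateLite::ToThrift(TFilterStateLite* f) const {
  // Same element type on both sides, so a range copy replaces the casting loop.
  f->src_fragment_instance_idxs = std::set<int32_t>(
      src_fragment_instance_idxs_.begin(), src_fragment_instance_idxs_.end());
}

FilterStateLite FilterStateLite::FromThrift(const TFilterStateLite& f) {
  FilterStateLite fs;
  fs.src_fragment_instance_idxs_.insert(
      f.src_fragment_instance_idxs.begin(), f.src_fragment_instance_idxs.end());
  return fs;
}

// Taking the Thrift map by const reference avoids copying it on every call,
// and 'const auto&' avoids copying each entry while iterating.
std::map<int32_t, FilterStateLite> InitFilterRoutingTable(
    const std::map<int32_t, TFilterStateLite>& t_routing_table) {
  std::map<int32_t, FilterStateLite> routing_table;
  for (const auto& entry : t_routing_table) {
    routing_table.emplace(entry.first, FilterStateLite::FromThrift(entry.second));
  }
  return routing_table;
}
```

The same `const auto&` pattern applies to the FilterTarget and TFilterTarget loops flagged at Lines 141 and 167.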
http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@458
PS3, Line 458: std::pair<int, TFilterState> it
Using 'auto' for iterators is reasonable. https://google.github.io/styleguide/cppguide.html#auto


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@459
PS3, Line 459: s
Avoid names that aren't self-explanatory. E.g. this could be called 'filter'.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@466
PS3, Line 466: std::map<int32_t, FilterState>::iterator
auto


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@551
PS3, Line 551: t
'backend'


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@134
PS3, Line 134: Couldn't send filter to aggregator
Couldn't open a connection to the aggregator node


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@137
PS3, Line 137: for (int i = 0; i < MAX_FSEND_RETRY; ++i) {
(Just FYI for now) If we decide to run this asynchronously as part of the rpc_pool() ThreadPool (see the comment on L205), then we can add some sleeps between the retries.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@138
PS3, Line 138: LOG(INFO) << "DebuggingPublishFilter trying to contact aggregator\n";
This isn't needed. It might make the logs too noisy.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@141
PS3, Line 141: LOG(INFO) << res.status.status_code;
Not needed.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@142
PS3, Line 142: if (res.status.status_code == TErrorCode::OK) {
             :   break;
             : }
You need to check 'status' too. That is the status of the RPC from the sender's point of view. If it's not OK, the RPC failed to send for some reason (host not reachable, connection timed out, etc.). 'res.status' contains the status set by the remote node in response to this RPC (e.g. "Aggregator not found"). So you need to check both here. We don't do this in SendFilterToCoordinator() since we don't care if the RPC fails in that case.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@147
PS3, Line 147: }
Add a newline before this.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@196
PS3, Line 196: LOG(INFO) << "DebuggingPublishFilter Produced filter with filter id" << filter_id;
Not needed. The logs will get too noisy.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@203
PS3, Line 203: TODO
super-nit: "TODO:"


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@204
PS3, Line 204: SendFilterToAggregator(
             :     aggregator_address, params, ExecEnv::GetInstance()->impalad_client_cache());
My take is that we can also run this as part of the rpc_pool() ThreadPool, so that it doesn't slow this fragment instance down. Let's make a decision once Michael and/or Tim have had a chance to chime in.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc
File be/src/service/impala-server.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2221
PS3, Line 2221: s
naming


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2221
PS3, Line 2221: QueryState* s =
             :     ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(params.query_id);
Move this inside the 'if' condition.
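The retry loop from the runtime-filter-bank.cc comments (check both the sender-side RPC status and the status returned by the aggregator, and sleep between attempts if this moves onto a thread pool) can be sketched as below. RpcStatus, SendFn and SendWithRetry are hypothetical stand-ins, not Impala's Status or client-cache API; the linear backoff is an illustrative choice, and max_retries plays the role of MAX_FSEND_RETRY.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <thread>

// Hypothetical stand-in for a status object; not impala::Status.
struct RpcStatus {
  bool ok;
  std::string msg;
};

// Hypothetical send callback: returns the sender-side RPC status and fills in
// '*remote' with the status set by the remote node in its response.
using SendFn = std::function<RpcStatus(RpcStatus* remote)>;

// Returns true once an attempt succeeds both at the RPC level and at the
// remote end; sleeps a little longer after each failed attempt.
bool SendWithRetry(const SendFn& send, int max_retries,
                   std::chrono::milliseconds backoff) {
  for (int i = 0; i < max_retries; ++i) {
    RpcStatus remote{false, ""};
    RpcStatus rpc_status = send(&remote);
    // 'rpc_status': did the RPC reach the aggregator at all (host unreachable,
    // timeout, ...)? 'remote': did the aggregator accept it (e.g. not found)?
    if (rpc_status.ok && remote.ok) return true;
    if (i + 1 < max_retries) std::this_thread::sleep_for(backoff * (i + 1));
  }
  return false;
}
```

Sleeping like this only makes sense off the fragment instance's own thread, which is why it is tied to the rpc_pool() question above.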
http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2230
PS3, Line 2230: result.status.__set_status_code(TErrorCode::OK);
              : result.__isset.status = true;
Better to set this after the call to UpdateFilter().


http://gerrit.cloudera.org:8080/#/c/11055/3/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

http://gerrit.cloudera.org:8080/#/c/11055/3/common/thrift/ImpalaInternalService.thrift@822
PS3, Line 822: 
nit: whitespace



-- 
To view, visit http://gerrit.cloudera.org:8080/11055
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I94e183a0353fc46f8d3eccae029d2d52c5cdc40c
Gerrit-Change-Number: 11055
Gerrit-PatchSet: 3
Gerrit-Owner: Rahul Shivu Mahadev
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Sailesh Mukil
Gerrit-Comment-Date: Sun, 29 Jul 2018 22:31:17 +0000
Gerrit-HasComments: Yes
