Sailesh Mukil has posted comments on this change. (http://gerrit.cloudera.org:8080/11055)

Change subject: IMPALA-3825: Distribute Runtime Filtering Aggregation
......................................................................


Patch Set 3:

(40 comments)

http://gerrit.cloudera.org:8080/#/c/11055/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/11055/3//COMMIT_MSG@14
PS3, Line 14:
Add comments about the following, so that it's easier for reviewers:

- Explain in a paragraph or two how your patch achieves the distribution, i.e.
explain your approach in plain English.
- What new failure modes can this change introduce that didn't exist before?
(Talk about the Exec() RPC race with the UpdateFilter() RPC.)
- How long do we expect the aggregators to be accessible? That is, what is the
lifetime of an aggregator tied to?
- How are you updating the runtime profile?


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@95
PS3, Line 95: x
naming


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@98
PS3, Line 98:     LOG(INFO) << "DebuggingPublishFilter AggregatorAddress " << tfs.aggregator_address;
remove


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@95
PS3, Line 95:   for (auto const& x : filter_routing_table) {
            :     TFilterState tfs;
            :     x.second.ToThrift(&tfs);
            :     LOG(INFO) << "DebuggingPublishFilter AggregatorAddress " << tfs.aggregator_address;
            :     rpc_params->filter_routing_table.insert(
            :         std::pair<int32_t, TFilterState>(x.first, tfs));
            :   }
Add a comment about what you're doing here.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@260
PS3, Line 260: x
Naming: call it 'filter' or something similar.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@262
PS3, Line 262: num_filters_
Is this needed as a member variable? It doesn't seem to be used anywhere.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@690
PS3, Line 690:   if (params.__isset.filter_updates_received) {
Add a comment above this line:

"Update aggregator's filter metrics in the runtime profile"


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@691
PS3, Line 691:     if (backend_state->GetNumReceivedFilters() < params.filter_updates_received) {
Add another comment above this line:
"Make sure not to double count filter updates from the same aggregator."


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@690
PS3, Line 690:   if (params.__isset.filter_updates_received) {
             :     if (backend_state->GetNumReceivedFilters() < params.filter_updates_received) {
             :       filter_updates_received_->Add(
             :           params.filter_updates_received - backend_state->GetNumReceivedFilters());
             :       backend_state->SetNumReceivedFilters(params.filter_updates_received);
             :     }
             :   }
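There's a race here. If two updates for the same backend execute in parallel,
both can read the same value from GetNumReceivedFilters() before either calls
SetNumReceivedFilters(), so filter_updates_received_ ends up with an incorrect
count.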
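
One way to close the race is to do the read, the compare and the update under a
single lock, so a report never computes its delta against a value another report
is about to overwrite. A minimal sketch, assuming a per-backend mutex is
acceptable; 'BackendFilterCount', 'ApplyReport()' and
'SimulateConcurrentReports()' are hypothetical names, not the BackendState API in
this patch, and the atomic 'total' stands in for filter_updates_received_:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the per-backend state: remembers the highest
// filter_updates_received value reported by one backend.
class BackendFilterCount {
 public:
  // Returns how much the query-wide counter should grow. The compare and the
  // update happen under one lock, so two concurrent reports cannot both
  // compute their delta against the same stale value.
  int64_t ApplyReport(int64_t reported) {
    std::lock_guard<std::mutex> l(lock_);
    if (reported <= num_received_) return 0;  // already counted
    int64_t delta = reported - num_received_;
    num_received_ = reported;
    return delta;
  }

 private:
  std::mutex lock_;
  int64_t num_received_ = 0;
};

// Simulates several threads delivering status reports 1..max_reported for the
// same backend and returns the final value of the query-wide counter.
int64_t SimulateConcurrentReports(int num_threads, int64_t max_reported) {
  BackendFilterCount backend;
  std::atomic<int64_t> total{0};  // stands in for filter_updates_received_
  std::vector<std::thread> threads;
  for (int t = 0; t < num_threads; ++t) {
    threads.emplace_back([&backend, &total, max_reported]() {
      for (int64_t r = 1; r <= max_reported; ++r) {
        total += backend.ApplyReport(r);
      }
    });
  }
  for (auto& th : threads) th.join();
  return total.load();
}
```

If BackendState already has a lock protecting its other fields, reusing it here
would avoid adding a new one.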


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@884
PS3, Line 884:       LOG(INFO)
             :           << "DebuggingPublishFilter Coordinator sending filter to fragment with id "
             :           << fragment_idx;
remove


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h
File be/src/runtime/filter-state.h:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@141
PS3, Line 141: FilterTarget
const ref


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@150
PS3, Line 150: /// Need to cast the int type of this class to int32_t of thrift
Still needed?


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@151
PS3, Line 151: set
Why not unordered?


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@152
PS3, Line 152: int i
Use int32_t here and remove the cast in the next line. Also, rename it to 'idx'.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@167
PS3, Line 167: TFilterTarget
const ref


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@171
PS3, Line 171: boost
std


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@130
PS3, Line 130:   bool disabled() const {
             :     if (is_bloom_filter()) {
             :       return bloom_filter_.always_true;
             :     } else {
             :       DCHECK(is_min_max_filter());
             :       return min_max_filter_.always_true;
             :     }
             :   }
             :
             :   void ToThrift(TFilterState* f) const {
             :     std::vector<TFilterTarget> t_targets;
             :     for (FilterTarget filter_target : targets_) {
             :       TFilterTarget thrift_filter_target;
             :       filter_target.ToThrift(&thrift_filter_target);
             :       t_targets.push_back(thrift_filter_target);
             :     }
             :     f->__set_targets(t_targets);
             :     f->__set_desc(desc_);
             :     f->__set_src(src_);
             :     f->__set_pending_count(pending_count_);
             :     /// Need to cast the int type of this class to int32_t of thrift
             :     std::set<int32_t> src_fragment_instance_idxs;
             :     for (int i : src_fragment_instance_idxs_) {
             :       src_fragment_instance_idxs.insert((int32_t)i);
             :     }
             :     f->__set_src_fragment_instance_idxs(src_fragment_instance_idxs);
             :     f->__set_bloom_filter(bloom_filter_);
             :     f->__set_min_max_filter(min_max_filter_);
             :     f->__set_first_arrival_time(first_arrival_time_);
             :     f->__set_completion_time(completion_time_);
             :     f->__set_aggregator_address(aggregator_address_);
             :   }
             :
             :   static FilterState FromThrift(const TFilterState& f) {
             :     FilterState fs(f.desc, f.src, f.aggregator_address, f.pending_count,
             :         f.first_arrival_time, f.completion_time, f.bloom_filter, f.min_max_filter);
             :     std::vector<FilterTarget> targets;
             :     for (TFilterTarget filter_target : f.targets) {
             :       targets.push_back(FilterTarget::FromThrift(filter_target));
             :     }
             :     fs.set_targets(targets);
             :     boost::unordered_set<int> src_fragment_instance_idxs;
             :     for (int idx : f.src_fragment_instance_idxs) {
             :       src_fragment_instance_idxs.insert(idx);
             :     }
             :     fs.set_src_fragment_instance_idxs(src_fragment_instance_idxs);
             :
             :     return fs;
             :   }
These can be moved to the .cc file.
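
Taken together, the container and parameter comments above (const refs, int32_t
without casts, std::unordered_set instead of boost) might look roughly like this.
It is a sketch with hypothetical, heavily simplified stand-ins for
TFilterTarget/TFilterState and the C++ classes: the real Thrift structs have more
fields and are populated via the __set_*() helpers. The in-memory member uses
std::unordered_set, while the Thrift-side field stays a std::set as in the quoted
code. Per the comment, the real definitions would live in filter-state.cc:

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <unordered_set>
#include <vector>

// Hypothetical stand-ins for the Thrift-generated structs.
struct TFilterTargetSketch {
  int32_t node_id = 0;
};
struct TFilterStateSketch {
  std::vector<TFilterTargetSketch> targets;
  std::set<int32_t> src_fragment_instance_idxs;
};

class FilterTargetSketch {
 public:
  explicit FilterTargetSketch(int32_t node_id) : node_id_(node_id) {}
  void ToThrift(TFilterTargetSketch* t) const { t->node_id = node_id_; }
  static FilterTargetSketch FromThrift(const TFilterTargetSketch& t) {
    return FilterTargetSketch(t.node_id);
  }
  int32_t node_id() const { return node_id_; }

 private:
  int32_t node_id_;
};

class FilterStateSketch {
 public:
  void ToThrift(TFilterStateSketch* f) const {
    f->targets.clear();
    // Iterate by const ref: no per-element copy of the target.
    for (const FilterTargetSketch& target : targets_) {
      TFilterTargetSketch t_target;
      target.ToThrift(&t_target);
      f->targets.push_back(t_target);
    }
    // Element types already match (int32_t), so the range constructor
    // converts between set types without a hand-written loop or cast.
    f->src_fragment_instance_idxs = std::set<int32_t>(
        src_fragment_instance_idxs_.begin(), src_fragment_instance_idxs_.end());
  }

  static FilterStateSketch FromThrift(const TFilterStateSketch& f) {
    FilterStateSketch fs;
    for (const TFilterTargetSketch& t_target : f.targets) {
      fs.targets_.push_back(FilterTargetSketch::FromThrift(t_target));
    }
    fs.src_fragment_instance_idxs_ = std::unordered_set<int32_t>(
        f.src_fragment_instance_idxs.begin(), f.src_fragment_instance_idxs.end());
    return fs;
  }

  // Public only to keep the sketch short.
  std::vector<FilterTargetSketch> targets_;
  std::unordered_set<int32_t> src_fragment_instance_idxs_;
};
```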


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@191
PS3, Line 191:   TPlanNodeId src_;
             :   std::vector<FilterTarget> targets_;
Add a comment for both of these. The comments were also missing from
CoordinatorFilterState, but they should have been added there too.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@198
PS3, Line 198: boost::unordered_set
We can use the std version for this


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-exec-mgr.cc
File be/src/runtime/query-exec-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-exec-mgr.cc@58
PS3, Line 58:   if (params.filter_routing_table.size() != 0) {
Add a comment above this:
"Initialize the filter routing table given by the coordinator."


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h@181
PS3, Line 181: const std::map<int32_t, TFilterState>
Add the parameter name as well.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h@206
PS3, Line 206:   std::map<int32_t, FilterState> filter_routing_table_;
             :
             :   SpinLock filter_lock_;
             :
             :   std::map<TNetworkAddress, std::set<int32_t>> backend_list_;
Add comments for these.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@457
PS3, Line 457: const std::map<int32_t, TFilterState>
Take this as a const ref to avoid copying the map every time this function is
called.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@458
PS3, Line 458: std::pair<int, TFilterState> it
Using 'auto' for iterators is reasonable.
https://google.github.io/styleguide/cppguide.html#auto

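Both comments above are about hidden copies. Passing the map by value copies
every entry on each call, and the loop variable 'std::pair<int, TFilterState> it'
is a by-value copy of each entry. Since that type isn't even the map's
value_type ('std::pair<const int32_t, TFilterState>'), adding a const '&' alone
would still create a converted temporary; 'const auto&' binds directly to the
entry. A small self-contained illustration, with 'CopyCounted' as a hypothetical
stand-in for TFilterState:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <utility>

// Hypothetical stand-in for TFilterState that counts how often it is copied.
struct CopyCounted {
  static int copies;
  CopyCounted() = default;
  CopyCounted(const CopyCounted&) { ++copies; }
};
int CopyCounted::copies = 0;

using RoutingTable = std::map<int32_t, CopyCounted>;

// Mirrors the quoted code: the map is passed by value, and each entry is
// copied again into a std::pair<int, CopyCounted> on every iteration.
size_t CountCopying(const RoutingTable table) {
  size_t n = 0;
  for (std::pair<int, CopyCounted> it : table) {
    (void)it;
    ++n;
  }
  return n;
}

// Suggested form: const ref parameter and 'const auto&' loop variable, so no
// entry is copied.
size_t CountNoCopy(const RoutingTable& table) {
  size_t n = 0;
  for (const auto& entry : table) {
    (void)entry;
    ++n;
  }
  return n;
}
```

With three entries, CountNoCopy() makes no copies, while CountCopying() makes at
least six (three for the parameter, three for the loop variable).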

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@459
PS3, Line 459: s
Avoid names that aren't self-explanatory.
E.g. this could be called 'filter'.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@466
PS3, Line 466: std::map<int32_t, FilterState>::iterator
auto


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@551
PS3, Line 551: t
'backend'


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@134
PS3, Line 134: Couldn't send filter to aggregator
Couldn't open a connection to the aggregator node


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@137
PS3, Line 137:   for (int i = 0; i < MAX_FSEND_RETRY; ++i) {
(Just FYI for now) If we decide to make this run asynchronously as part of the 
rpc_pool() ThreadPool (see comment on L205), then we can add some sleeps 
between the retries.
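
For reference, retries with sleeps in between could look like the following.
'RetryWithBackoff()' is a hypothetical helper; doubling the interval after each
failure is just one choice (a fixed interval also works), and MAX_FSEND_RETRY
from the patch would be passed as 'max_retries'. It assumes the send no longer
runs on the fragment instance's own thread, since sleeping there would stall it:

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: calls 'attempt' up to 'max_retries' times, sleeping
// between failed attempts (but not after the last one). Returns true as soon
// as an attempt succeeds, false if all attempts fail.
bool RetryWithBackoff(const std::function<bool()>& attempt, int max_retries,
    std::chrono::milliseconds initial_backoff) {
  std::chrono::milliseconds backoff = initial_backoff;
  for (int i = 0; i < max_retries; ++i) {
    if (attempt()) return true;
    if (i + 1 < max_retries) {
      std::this_thread::sleep_for(backoff);
      backoff *= 2;  // exponential backoff
    }
  }
  return false;
}
```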


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@138
PS3, Line 138: LOG(INFO) << "DebuggingPublishFilter trying to contact aggregator\n";
This isn't needed. It might make the logs too noisy.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@141
PS3, Line 141: LOG(INFO) << res.status.status_code;
Not needed.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@142
PS3, Line 142:     if (res.status.status_code == TErrorCode::OK) {
             :       break;
             :     }
You need to check 'status' too. That is the status of the RPC from the sender's
point of view. If it's not OK, the RPC failed to send for some reason (host not
reachable, connection timed out, etc.).

'res.status' will contain the status set by the remote node in response to this
RPC (e.g. "Aggregator not found", etc.).

So you need to check both here.

We don't do this in SendFilterToCoordinator() since we don't care if the RPC 
fails in that case.
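
The loop's exit condition then needs both checks, something like
"status.ok() && res.status.status_code == TErrorCode::OK". A self-contained
sketch of that logic, with hypothetical simplified types ('RpcStatus',
'RemoteStatusCode') in place of Impala's Status and the Thrift response:

```cpp
#include <cassert>
#include <string>

// Hypothetical simplified types. 'RpcStatus' models the sender-side outcome of
// the RPC ('status' above); 'RemoteStatusCode' models the code the aggregator
// put in its response ('res.status' above).
struct RpcStatus {
  bool ok;
  std::string msg;
};
enum class RemoteStatusCode { OK, AGGREGATOR_NOT_FOUND };

// The filter counts as delivered only if the RPC itself went through AND the
// remote node reported success. The sender-side status is checked first,
// since the response code only means something once the RPC was delivered.
bool FilterDelivered(const RpcStatus& rpc_status, RemoteStatusCode response_code) {
  if (!rpc_status.ok) return false;  // e.g. host unreachable, timeout
  return response_code == RemoteStatusCode::OK;
}
```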


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@147
PS3, Line 147: }
Add a newline before this


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@196
PS3, Line 196: LOG(INFO) << "DebuggingPublishFilter Produced filter with filter id" << filter_id;
Not needed. The logs will get too noisy.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@203
PS3, Line 203: TODO
super-nit: "TODO:"


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@204
PS3, Line 204:       SendFilterToAggregator(
             :           aggregator_address, params, ExecEnv::GetInstance()->impalad_client_cache());
My take is that we can also run this as part of the rpc_pool() threadpool, so 
that it doesn't slow this fragment instance down. Let's make a decision once 
Michael and/or Tim have had a chance to chime in.
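
If the send does move off the fragment instance's thread, the shape would be
roughly: the producer submits the send and returns immediately, and outstanding
sends are waited on later. A sketch only, since the decision is still open;
std::async stands in for the rpc_pool() thread pool (whose interface isn't shown
here), and 'SendFilterBlocking()' and 'AsyncFilterSender' are hypothetical names:

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <vector>

// Hypothetical stand-in for SendFilterToAggregator(): a blocking call that may
// take a while (connection setup, retries). Fails for negative ids.
bool SendFilterBlocking(int filter_id) {
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  return filter_id >= 0;
}

// The producing fragment instance hands the send off and returns right away.
class AsyncFilterSender {
 public:
  void Submit(int filter_id) {
    pending_.push_back(std::async(std::launch::async, SendFilterBlocking, filter_id));
  }

  // Waits for all outstanding sends; returns how many succeeded.
  int WaitAll() {
    int succeeded = 0;
    for (auto& f : pending_) succeeded += f.get() ? 1 : 0;
    pending_.clear();
    return succeeded;
  }

 private:
  std::vector<std::future<bool>> pending_;
};
```

Unlike a real thread pool, std::async does not bound the number of threads it
starts, so this only shows the hand-off, not the threading policy.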


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc
File be/src/service/impala-server.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2221
PS3, Line 2221: s
naming


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2221
PS3, Line 2221: QueryState* s =
              :       ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(params.query_id);
Move inside the 'if' condition.


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2230
PS3, Line 2230:       result.status.__set_status_code(TErrorCode::OK);
              :       result.__isset.status = true;
Better to set this after the call to UpdateFilter


http://gerrit.cloudera.org:8080/#/c/11055/3/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

http://gerrit.cloudera.org:8080/#/c/11055/3/common/thrift/ImpalaInternalService.thrift@822
PS3, Line 822:
nit: whitespace



--
To view, visit http://gerrit.cloudera.org:8080/11055
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I94e183a0353fc46f8d3eccae029d2d52c5cdc40c
Gerrit-Change-Number: 11055
Gerrit-PatchSet: 3
Gerrit-Owner: Rahul Shivu Mahadev <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-Comment-Date: Sun, 29 Jul 2018 22:31:17 +0000
Gerrit-HasComments: Yes
