Fang-Yu Rao has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 24:

(18 comments)

Hi Michael and Thomas, please review my revised patch. I have addressed all of 
Michael's comments in the previous iteration. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h
File be/src/runtime/coordinator-filter-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h@124
PS24, Line 124: string
> Please also document what this field stands for.
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1103
PS24, Line 1103:    // Workaround for fact that parameters are const& for 
Protobuf RPCs.
               :     BloomFilterPB* non_const_filter = 
&const_cast<BloomFilterPB&>(params->bloom_filter());
> I don't think this makes sense anymore with the implementation using protob
Thanks! In this regard, I have used a copy constructor to initialize a 
BloomFilterPB instead of calling Swap().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1108
PS24, Line 1108: if (!bloom_filter_.has_log_bufferpool_space()) {
> May be I am missing the details here but why is it not a no-op if the incom
Thanks for pointing out this. There is a reason for this.

After taking a closer look at the code here and performing some tests, I found 
that there is a corner case we have to take care of when params->bloom_filter() 
is always_false. Specifically, I found that we need to initialize 
'bloom_filter_' even if params->bloom_filter() is always_false when 
'bloom_filter_' has never been initialized by any incoming Bloom filter 
previously.

If 'bloom_filter_' has never been initialized, its field of 
'log_bufferpool_space_' would be 0, which in turn will result in an error due 
to a DCHECK in 'buffer-allocator.cc' 
(https://github.com/apache/impala/blob/master/be/src/runtime/bufferpool/buffer-allocator.cc#L278)
 that requires that the buffer length of the directory associated with the 
Bloom filter to be published is greater than some lower bound when attempting 
to call BloomFilter::Init() in RuntimeFilterBank::PublishGlobalFilter().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1110
PS24, Line 1110: bloom_filter_.Swap(non_const_filter);
> See comments above.
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1132
PS24, Line 1132:  bloom_filter_.Swap(non_const_filter);
> See comments above.
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134
PS24, Line 1134: std::string(
               :               reinterpret_cast<const 
char*>(sidecar_slice.data()), sidecar_slice.size());
> Thanks for pointing this out! I have replaced the original statement with '
I actually found out that if we are using 
"std::move(sidecar_slice.ToString())", then we will not be able to build Impala 
with the asan flag due to the following error message, and thus for now I am 
using the previous approach.

"Moving a temporary object prevents copy elision."


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1140
PS24, Line 1140: sidecar_slice.ToString(),
> Seems less expensive if we can pass the slice directly. This avoids doing a
Thanks for pointing this out! I have slightly changed the signature of 
BloomFilter::Or() to avoid this additional copy. Due to this change, I have 
also modified bloom-filter-test.cc and bloom-filter-benchmark.cc where 
BloomFilter::Or() is tested.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1141
PS24, Line 1141: bloom_filter_directory_
> May need to do some const casting but please consider using:
Indeed, because of the change to BloomFilter::Or(), I have to use the following 
statement to pass to BloomFilter::Or() the pointer to the beginning of the 
Bloom filter directory.

reinterpret_cast<uint8_t *>(const_cast<char *>(bloom_filter_directory_.data()))


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h@202
PS24, Line 202: const PublishFilterParamsPB*
> Why not keep it as const PublishFilterParamsPB& ?
Thanks for the comment. I have changed the type of 'params' to "const 
PublishFilterParamsPB&".

To be more specific, in DataStreamService::PublishFilter(), I pass a "const 
PublishFilterParamsPB&" to QueryState::PublishFilter() and have made the 
corresponding changes to the following functions:
1. FragmentInstanceState::PublishFilter(),
2. RuntimeFilterBank::PublishGlobalFilter().

The reason I used "const PublishFilterParamsPB*" is because in the 
automatically generated data_stream_service.service.cc, 
DataStreamServiceIf::DataStreamServiceIf() passes a "const 
PublishFilterParamsPB*" to its member function PublishFilter(), which is 
implemented by DataStreamService::PublishFilter().

QueryState::PublishFilter() is called in DataStreamService::PublishFilter() and 
therefore if the argument is of the type "const PublishFilterParamsPB*", we do 
not have to dereference in DataStreamService::PublishFilter() when calling 
QueryState::PublishFilter().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174
PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new 
UpdateFilterParamsPB);
> Or you can just allocate it on the stack as it isn't too large:
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175
PS24, Line 175:     UpdateFilterResultPB* res = obj_pool_.Add(new 
UpdateFilterResultPB);
              :     RpcController* controller = obj_pool_.Add(new 
RpcController);
> Don't think it's safe to free the memory in the completion callback as the
Thanks! I will keep them as is.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@196
PS24, Line 196: Couldn't send filter to coordinator: "
> Substitute("Failed to get proxy to coordinator $0: $1", host_address, get_p
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@240
PS24, Line 240: LOG(WARNING) << "Cannot get inbound sidecar:
> LOG(ERROR) << "Failed to get bloom filter sidecar" << ...
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320
PS24, Line 320:   // Wait for all inflight rpcs to complete before closing the 
filters.
              :   {
              :     std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
              :     while (num_inflight_rpcs_ > 0) {
              :       krpcs_done_cv_.wait(l1);
              :     }
              :   }
              :
              :   lock_guard<mutex> l2(runtime_filter_lock_);
              :   closed_ = true;
> Actually, it's possible for RuntimeFilterBank::UpdateFilterFromLocal() and
Thanks for the suggestion. I have added a comment accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@119
PS24, Line 119: req
> Per comments elsewhere, we usually pass const value by reference *req.
Thanks for pointing this out! I have changed the signature of 
ImpalaServer::UpdateFilter() accordingly.

Due to the change above, I have also made the corresponding changes to the 
following functions:
1. ClientRequestState::UpdateFilter(),
2. Coordinator::UpdateFilter(), and
3. Coordinator::FilterState::ApplyUpdate().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/impala-server.h@348
PS24, Line 348: const UpdateFilterParamsPB*
> Use const UpdateFilterParamsPB& params. See comments in data-stream-service
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc@218
PS24, Line 218: const string& directory_in
> Please see comments in coordinator.cc. Please consider passing in the slide
Thanks! I have changed the signature of BloomFilter::Or() to the following.

void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in, 
BloomFilterPB* out, uint8_t* directory_out, size_t directory_size);


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc@219
PS24, Line 219: string*
> This may be char* instead.
Done.



--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <[email protected]>
Gerrit-Reviewer: Fang-Yu Rao <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Thomas Tauber-Marshall <[email protected]>
Gerrit-Comment-Date: Mon, 14 Oct 2019 15:17:05 +0000
Gerrit-HasComments: Yes

Reply via email to