Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )
Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC ...................................................................... Patch Set 24: (18 comments) Hi Michael and Thomas, please review my revised patch. I have addressed all of Michael's comments in the previous iteration. Thanks! http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h File be/src/runtime/coordinator-filter-state.h: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h@124 PS24, Line 124: string > Please also document what this field stands for. Done http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1103 PS24, Line 1103: // Workaround for fact that parameters are const& for Protobuf RPCs. : BloomFilterPB* non_const_filter = &const_cast<BloomFilterPB&>(params->bloom_filter()); > I don't think this makes sense anymore with the implementation using protob Thanks! In this regard, I have used a copy constructor to initialize a BloomFilterPB instead of calling Swap(). http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1108 PS24, Line 1108: if (!bloom_filter_.has_log_bufferpool_space()) { > May be I am missing the details here but why is it not a no-op if the incom Thanks for pointing out this. There is a reason for this. After taking a closer look at the code here and performing some tests, I found that there is a corner case we have to take care of when params->bloom_filter() is always_false. Specifically, I found that we need to initialize 'bloom_filter_' even if params->bloom_filter() is always_false when 'bloom_filter_' has never been initialized by any incoming Bloom filter previously. If 'bloom_filter_' has never been initialized, its field of 'log_bufferpool_space_' would be 0, which in turn will result in an error due to a DCHECK in 'buffer-allocator.cc' (https://github.com/apache/impala/blob/master/be/src/runtime/bufferpool/buffer-allocator.cc#L278) that requires that the buffer length of the directory associated with the Bloom filter to be published is greater than some lower bound when attempting to call BloomFilter::Init() in RuntimeFilterBank::PublishGlobalFilter(). http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1110 PS24, Line 1110: bloom_filter_.Swap(non_const_filter); > See comments above. Done http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1132 PS24, Line 1132: bloom_filter_.Swap(non_const_filter); > See comments above. Done http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134 PS24, Line 1134: std::string( : reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size()); > Thanks for pointing this out! I have replaced the original statement with ' I actually found out that if we are using "std::move(sidecar_slice.ToString())", then we will not be able to build Impala with the asan flag due to the following error message, and thus for now I am using the previous approach. "Moving a temporary object prevents copy elision." http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1140 PS24, Line 1140: sidecar_slice.ToString(), > Seems less expensive if we can pass the slice directly. This avoids doing a Thanks for pointing this out! I have slightly changed the signature of BloomFilter::Or() to avoid this additional copy. Due to this change, I have also modified bloom-filter-test.cc and bloom-filter-benchmark.cc where BloomFilter::Or() is tested. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1141 PS24, Line 1141: bloom_filter_directory_ > May need to do some const casting but please consider using: Indeed, because of the change to BloomFilter::Or(), I have to use the following statement to pass to BloomFilter::Or() the pointer to the beginning of the Bloom filter directory. reinterpret_cast<uint8_t *>(const_cast<char *>(bloom_filter_directory_.data())) http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h File be/src/runtime/query-state.h: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h@202 PS24, Line 202: const PublishFilterParamsPB* > Why not keep it as const PublishFilterParamsPB& ? Thanks for the comment. I have changed the type of 'params' to "const PublishFilterParamsPB&". To be more specific, in DataStreamService::PublishFilter(), I pass a "const PublishFilterParamsPB&" to QueryState::PublishFilter() and have made the corresponding changes to the following functions: 1. FragmentInstanceState::PublishFilter(), 2. RuntimeFilterBank::PublishGlobalFilter(). The reason I used "const PublishFilterParamsPB*" is because in the automatically generated data_stream_service.service.cc, DataStreamServiceIf::DataStreamServiceIf() passes a "const PublishFilterParamsPB*" to its member function PublishFilter(), which is implemented by DataStreamService::PublishFilter(). QueryState::PublishFilter() is called in DataStreamService::PublishFilter() and therefore if the argument is of the type "const PublishFilterParamsPB*", we do not have to dereference in DataStreamService::PublishFilter() when calling QueryState::PublishFilter(). http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174 PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB); > Or you can just allocate it on the stack as it isn't too large: Done http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175 PS24, Line 175: UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB); : RpcController* controller = obj_pool_.Add(new RpcController); > Don't think it's safe to free the memory in the completion callback as the Thanks! I will keep them as is. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@196 PS24, Line 196: Couldn't send filter to coordinator: " > Substitute("Failed to get proxy to coordinator $0: $1", host_address, get_p Done http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@240 PS24, Line 240: LOG(WARNING) << "Cannot get inbound sidecar: > LOG(ERROR) << "Failed to get bloom filter sidecar" << ... Done http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320 PS24, Line 320: // Wait for all inflight rpcs to complete before closing the filters. : { : std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_); : while (num_inflight_rpcs_ > 0) { : krpcs_done_cv_.wait(l1); : } : } : : lock_guard<mutex> l2(runtime_filter_lock_); : closed_ = true; > Actually, it's possible for RuntimeFilterBank::UpdateFilterFromLocal() and Thanks for the suggestion. I have added a comment accordingly. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc File be/src/service/data-stream-service.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@119 PS24, Line 119: req > Per comments elsewhere, we usually pass const value by reference *req. Thanks for pointing this out! I have changed the signature of ImpalaServer::UpdateFilter() accordingly. Due to the change above, I have also made the corresponding changes to the following functions: 1. ClientRequestState::UpdateFilter(), 2. Coordinator::UpdateFilter(), and 3. Coordinator::FilterState::ApplyUpdate(). http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/impala-server.h File be/src/service/impala-server.h: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/impala-server.h@348 PS24, Line 348: const UpdateFilterParamsPB* > Use const UpdateFilterParamsPB& params. See comments in data-stream-service Done http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc File be/src/util/bloom-filter.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc@218 PS24, Line 218: const string& directory_in > Please see comments in coordinator.cc. Please consider passing in the slide Thanks! I have changed the signature of BloomFilter::Or() to the following. void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in, BloomFilterPB* out, uint8_t* directory_out, size_t directory_size); http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc@219 PS24, Line 219: string* > This may be char* instead. Done. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 24 Gerrit-Owner: Fang-Yu Rao <[email protected]> Gerrit-Reviewer: Fang-Yu Rao <[email protected]> Gerrit-Reviewer: Impala Public Jenkins <[email protected]> Gerrit-Reviewer: Michael Ho <[email protected]> Gerrit-Reviewer: Thomas Tauber-Marshall <[email protected]> Gerrit-Comment-Date: Mon, 14 Oct 2019 15:17:05 +0000 Gerrit-HasComments: Yes
