Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )
Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

Patch Set 24:

(6 comments)

Hi Michael and Thomas,

I have addressed some of the comments from the previous iteration. There are still two unresolved comments in runtime-filter-bank.cc. Please let me know your thoughts on my replies. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134
PS24, Line 1134: std::string(
               :     reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size());
> Why not std::move(sidecar_slice.ToString()) ?
Thanks for pointing this out! I have replaced the original statement with 'std::move(sidecar_slice.ToString())' to make it more elegant.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174
PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB);
> Can you please double check if the parameters need to be preserved beyond t
Thank you for pointing this out! You are correct. 'params' does not need to outlive the call that issues the asynchronous RPC. I have replaced the original statement with 'std::unique_ptr<UpdateFilterParamsPB> params = std::make_unique<UpdateFilterParamsPB>();' and revised the following code accordingly.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175
PS24, Line 175: UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
              : RpcController* controller = obj_pool_.Add(new RpcController);
> I wonder if we can keep these in thread local storage and initialize them o
Thank you very much for the suggestion!
After briefly looking at the relevant call sequence, I found that starting from QueryState::ExecFInstance() (which calls FragmentInstanceState::Exec()), we reach PhjBuilder::FlushFinal(). PhjBuilder::FlushFinal() calls PhjBuilder::PublishRuntimeFilters(), which in turn calls RuntimeFilterBank::UpdateFilterFromLocal() for each FilterContext associated with this PhjBuilder (refer to https://github.com/apache/impala/blob/master/be/src/exec/partitioned-hash-join-builder.cc#L488-L507). Since RuntimeFilterBank::UpdateFilterFromLocal() now issues an asynchronous RPC, a single thread can have several of these RPCs in flight at the same time, so one thread-local 'UpdateFilterResultPB' and 'RpcController' could be overwritten while an earlier RPC is still using them. It therefore seems we are not able to reuse 'UpdateFilterResultPB' and 'RpcController'. An alternative is to create an instance of UpdateFilterResultPB and RpcController in UpdateFilterFromLocal() and release the memory they occupy in RuntimeFilterBank::UpdateFilterCompleteCb(). I may be missing something, so please let me know if I have misunderstood anything.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320
PS24, Line 320: // Wait for all inflight rpcs to complete before closing the filters.
              : {
              :   std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
              :   while (num_inflight_rpcs_ > 0) {
              :     krpcs_done_cv_.wait(l1);
              :   }
              : }
              :
              : lock_guard<mutex> l2(runtime_filter_lock_);
              : closed_ = true;
> Do you need to set closed_ to true before waiting for all in-flight RPCs to
Thanks very much for the comment. After reading the relevant code paths, I believe the case where a thread sneaks in and tries to issue another RPC after we leave the critical section cannot happen.
According to my current understanding, in the propagation stage of the runtime filter protocol, the thread executing FragmentInstanceState::Exec() first calls FragmentInstanceState::Open(), which (1) calls RuntimeFilterBank::AllocateScratchBloomFilter()/AllocateScratchMinMaxFilter() to allocate memory for the corresponding RuntimeFilter, and (2) calls RuntimeFilterBank::UpdateFilterFromLocal() on the RuntimeFilter just allocated. After FragmentInstanceState::Open() returns, the same thread calls FragmentInstanceState::Close(), which in turn leads to RuntimeFilterBank::Close(). By the time RuntimeFilterBank::Close() is called, the calling thread has already issued all the RPCs it needs to issue (there may still be in-flight RPCs, but no new ones will be issued). Therefore, once Close() has started, no thread will call RuntimeFilterBank::UpdateFilterFromLocal(), issue an RPC, and increment 'num_inflight_rpcs_' on this instance of RuntimeFilterBank. Hence the case where a thread sneaks in and tries to issue an RPC after we leave the critical section cannot happen. We have also added a DCHECK for '!closed_' at the beginning of RuntimeFilterBank::UpdateFilterFromLocal() to empirically show that RuntimeFilterBank::Close() (where 'closed_' is set to true) is not called until the calling thread has issued all the RPCs it needs to issue (https://gerrit.cloudera.org/c/13882/24/be/src/runtime/runtime-filter-bank.cc#139). Please let me know whether the argument above makes sense, since I may have missed something.
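The allocation and shutdown scheme described in the replies above can be sketched as follows. This is a simplified model under stated assumptions, not the code in this change: FakeResultPB, FakeController, InflightRpc and FilterBankSketch are hypothetical stand-ins for UpdateFilterResultPB, RpcController and RuntimeFilterBank; a std::thread plays the role of the RPC layer invoking the completion callback; and std::mutex/std::condition_variable replace the SpinLock-based wait in the quoted code. Each RPC gets its own heap-allocated response and controller that the completion callback frees, and Close() waits for the in-flight count to reach zero before setting closed_. The assert stands in for the DCHECK(!closed_) mentioned above.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-ins for UpdateFilterResultPB and RpcController.
struct FakeResultPB {
  bool ok = false;
};
struct FakeController {
  int filter_id = -1;
};

// Per-RPC state that must stay alive until the completion callback runs.
struct InflightRpc {
  FakeResultPB res;
  FakeController controller;
};

// Hypothetical, simplified model of RuntimeFilterBank's RPC bookkeeping.
class FilterBankSketch {
 public:
  ~FilterBankSketch() { JoinWorkers(); }

  // Models issuing one asynchronous RPC. It may be called repeatedly on the
  // same thread, so each call gets its own response and controller instead of
  // sharing thread-local ones.
  void UpdateFilterFromLocal(int filter_id) {
    {
      std::lock_guard<std::mutex> l(lock_);
      assert(!closed_);  // Plays the role of DCHECK(!closed_).
      ++num_inflight_rpcs_;
    }
    auto rpc = std::make_unique<InflightRpc>();
    rpc->controller.filter_id = filter_id;
    // A worker thread stands in for the RPC layer invoking the callback later;
    // ownership of the per-RPC state moves into the callback.
    workers_.emplace_back([this, rpc = std::move(rpc)]() mutable {
      rpc->res.ok = true;
      UpdateFilterCompleteCb(std::move(rpc));
    });
  }

  // Waits until no RPC is in flight, then marks the bank as closed.
  void Close() {
    {
      std::unique_lock<std::mutex> l(lock_);
      rpcs_done_cv_.wait(l, [this] { return num_inflight_rpcs_ == 0; });
      closed_ = true;
    }
    JoinWorkers();
  }

  int completed_rpcs() {
    std::lock_guard<std::mutex> l(lock_);
    return completed_rpcs_;
  }
  int inflight_rpcs() {
    std::lock_guard<std::mutex> l(lock_);
    return num_inflight_rpcs_;
  }

 private:
  // Takes ownership of the per-RPC state; it is freed when 'rpc' goes out of
  // scope at the end of the callback.
  void UpdateFilterCompleteCb(std::unique_ptr<InflightRpc> rpc) {
    std::lock_guard<std::mutex> l(lock_);
    if (rpc->res.ok) ++completed_rpcs_;
    if (--num_inflight_rpcs_ == 0) rpcs_done_cv_.notify_all();
  }

  void JoinWorkers() {
    for (auto& t : workers_) {
      if (t.joinable()) t.join();
    }
  }

  std::mutex lock_;
  std::condition_variable rpcs_done_cv_;
  int num_inflight_rpcs_ = 0;
  int completed_rpcs_ = 0;
  bool closed_ = false;
  std::vector<std::thread> workers_;
};
```

For example, calling UpdateFilterFromLocal() four times and then Close() on the same thread leaves no RPCs in flight and four completed callbacks. Because every UpdateFilterFromLocal() call happens before Close() on that thread, the assertion never fires, which is the same ordering argument made above.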
http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@137
PS24, Line 137: Substitute("Query State not found for query_id=$0",
              :     PrintId(ProtoToQueryId(req->dst_query_id()))
> This can be refactored into a single string object and reuse them here and
Done

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@141
PS24, Line 141: this->
> no need for this
Thanks for pointing this out! I have removed 'this->' in DataStreamService::PublishFilter() as well as in DataStreamService::UpdateFilter().

--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <[email protected]>
Gerrit-Reviewer: Fang-Yu Rao <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Thomas Tauber-Marshall <[email protected]>
Gerrit-Comment-Date: Fri, 27 Sep 2019 22:05:21 +0000
Gerrit-HasComments: Yes
