Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 24:

(6 comments)

Hi Michael and Thomas, I have addressed some of the comments from the previous
iteration. Two comments in runtime-filter-bank.cc are still unresolved; please
let me know your thoughts on my replies. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134
PS24, Line 1134: std::string(
               :               reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size());
> Why not std::move(sidecar_slice.ToString()) ?
Thanks for pointing this out! I have replaced the original statement with
'std::move(sidecar_slice.ToString())', which is more concise.
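
A minimal sketch showing that the two forms copy the same bytes. 'Slice' here
is a hypothetical stand-in written in the spirit of kudu::Slice, not the real
class; 'CopyExplicit' and 'CopyViaToString' are illustrative names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical stand-in for a KRPC sidecar slice (not the real kudu::Slice).
// It views 'size_' bytes starting at 'data_' without owning them.
class Slice {
 public:
  Slice(const uint8_t* data, size_t size) : data_(data), size_(size) {}
  const uint8_t* data() const { return data_; }
  size_t size() const { return size_; }
  // Copies the viewed bytes into a new std::string.
  std::string ToString() const {
    return std::string(reinterpret_cast<const char*>(data_), size_);
  }

 private:
  const uint8_t* data_;
  size_t size_;
};

// Original form: build the string explicitly from the raw pointer and length.
std::string CopyExplicit(const Slice& sidecar_slice) {
  return std::string(
      reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size());
}

// Revised form: let the slice produce the copy itself.
std::string CopyViaToString(const Slice& sidecar_slice) {
  return sidecar_slice.ToString();
}
```

Both forms construct the string from an explicit length, so embedded '\0'
bytes in a serialized filter are preserved. Because ToString() already returns
a temporary, the result is moved (or constructed in place) whether or not it
is wrapped in std::move.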


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174
PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB);
> Can you please double check if the parameters need to be preserved beyond t
Thank you for pointing this out! You are correct. 'params' only needs to stay
alive until the asynchronous RPC has been issued, so it can be freed at that
point instead of being kept in 'obj_pool_'. I have replaced the original
statement with 'std::unique_ptr<UpdateFilterParamsPB> params =
std::make_unique<UpdateFilterParamsPB>();' and revised the code that follows
accordingly.
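
A small model of the lifetime rule behind this change, assuming (as documented
for Kudu's Proxy::AsyncRequest(), which KRPC is built on) that the request is
serialized when the call is issued, while the response must stay valid until
the completion callback runs. 'FakeProxy', 'UpdateFilterAsync' and
'IssueUpdate' are hypothetical, and the two message structs are simplified
stand-ins for the generated protobuf classes.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for the protobuf messages.
struct UpdateFilterParamsPB {
  int filter_id = 0;
  std::string Serialize() const { return std::to_string(filter_id); }
};
struct UpdateFilterResultPB {
  std::string echoed;
};

// Hypothetical async proxy mimicking the assumed contract: the request is
// copied (serialized) immediately; the response pointer is written only when
// the call completes, so the caller must keep it alive until then.
class FakeProxy {
 public:
  void UpdateFilterAsync(const UpdateFilterParamsPB& req,
                         UpdateFilterResultPB* resp, std::function<void()> cb) {
    pending_.push_back({req.Serialize(), resp, std::move(cb)});
  }
  // Simulates the network completing every outstanding call.
  void CompleteAll() {
    for (Pending& p : pending_) {
      p.resp->echoed = p.payload;  // Writes into caller-owned storage.
      p.cb();
    }
    pending_.clear();
  }

 private:
  struct Pending {
    std::string payload;
    UpdateFilterResultPB* resp;
    std::function<void()> cb;
  };
  std::vector<Pending> pending_;
};

// 'params' is scoped to this function and destroyed on return, before the
// call completes; this is safe because its bytes were already copied.
void IssueUpdate(FakeProxy* proxy, int filter_id, UpdateFilterResultPB* resp,
                 std::function<void()> cb) {
  std::unique_ptr<UpdateFilterParamsPB> params =
      std::make_unique<UpdateFilterParamsPB>();
  params->filter_id = filter_id;
  proxy->UpdateFilterAsync(*params, resp, std::move(cb));
}  // 'params' is freed here.
```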


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175
PS24, Line 175:     UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
              :     RpcController* controller = obj_pool_.Add(new RpcController);
> I wonder if we can keep these in thread local storage and initialize them o
Thank you very much for the suggestion!

After briefly looking at the related call sequence, I found that, starting from
QueryState::ExecFInstance() (which calls FragmentInstanceState::Exec()), we
eventually reach PhjBuilder::FlushFinal(). PhjBuilder::FlushFinal() calls
PhjBuilder::PublishRuntimeFilters(), which in turn calls
RuntimeFilterBank::UpdateFilterFromLocal() once for each FilterContext
associated with this PhjBuilder (see
https://github.com/apache/impala/blob/master/be/src/exec/partitioned-hash-join-builder.cc#L488-L507).

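Since RuntimeFilterBank::UpdateFilterFromLocal() now issues an asynchronous RPC
and is called once per FilterContext in that loop, several RPCs issued by the
same thread can be in flight at the same time. Each of them needs its own
'UpdateFilterResultPB' and 'RpcController', so it seems we cannot reuse a
single thread-local instance of each.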

Another possible solution is to create the instances of 'UpdateFilterResultPB'
and 'RpcController' here in UpdateFilterFromLocal() and then release the memory
they occupy in RuntimeFilterBank::UpdateFilterCompleteCb().

I may be missing something; please let me know if I have misunderstood anything.
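
The alternative above can be sketched as follows: each call allocates its own
response and controller, hands ownership to the completion callback, and the
callback frees them. Everything here is a hypothetical stand-in for
illustration ('InflightUpdate', the 'g_pending' queue that simulates deferred
completions, the 'g_live_states' counter, and the simplified message and
controller structs); it is not the actual KRPC or Impala API.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-ins; not the real KRPC/protobuf classes.
struct UpdateFilterResultPB { int status = -1; };
struct RpcController { bool finished = false; };

// Simulated event loop: completion callbacks run later, after the issuing
// function has already returned.
std::vector<std::function<void()>> g_pending;

// Per-call state: every in-flight RPC gets its own response and controller,
// because several calls from the same thread can be outstanding at once.
struct InflightUpdate {
  UpdateFilterResultPB res;
  RpcController controller;
};

int g_live_states = 0;  // Number of InflightUpdate objects not yet freed.

// Stand-in for UpdateFilterCompleteCb(): reclaims ownership and frees the
// per-call state when it returns.
void UpdateFilterCompleteCb(InflightUpdate* raw) {
  std::unique_ptr<InflightUpdate> state(raw);
  assert(state->controller.finished);
  --g_live_states;
}

// Stand-in for UpdateFilterFromLocal(): allocates per-call state and passes
// ownership to the (simulated) asynchronous completion.
void UpdateFilterFromLocal(int filter_id) {
  auto state = std::make_unique<InflightUpdate>();
  ++g_live_states;
  InflightUpdate* raw = state.release();  // Ownership moves to the callback.
  g_pending.push_back([raw, filter_id] {
    raw->res.status = filter_id;      // The "RPC" fills in the response...
    raw->controller.finished = true;
    UpdateFilterCompleteCb(raw);      // ...then the callback frees the state.
  });
}
```

Issuing three updates in a row leaves three independent states alive until
their callbacks run, which is exactly the situation a single thread-local
response/controller pair could not handle.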


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320
PS24, Line 320:   // Wait for all inflight rpcs to complete before closing the filters.
              :   {
              :     std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
              :     while (num_inflight_rpcs_ > 0) {
              :       krpcs_done_cv_.wait(l1);
              :     }
              :   }
              :
              :   lock_guard<mutex> l2(runtime_filter_lock_);
              :   closed_ = true;
> Do you need to set closed_ to true before waiting for all in-flight RPCs to
Thanks very much for the comment.

After reading the related code paths, I think the case where a thread sneaks
in and tries to issue another RPC after we leave the first critical section
cannot happen.

According to my current understanding, in the propagation stage of the runtime
filter protocol, the thread executing FragmentInstanceState::Exec() first calls
FragmentInstanceState::Open(), which (1) calls
RuntimeFilterBank::AllocateScratchBloomFilter()/AllocateScratchMinMaxFilter()
to allocate the memory for the corresponding RuntimeFilter, and (2) calls
RuntimeFilterBank::UpdateFilterFromLocal() on the RuntimeFilter just allocated.
After FragmentInstanceState::Open() returns, the same thread calls
FragmentInstanceState::Close(), which in turn leads to
RuntimeFilterBank::Close().

By the time RuntimeFilterBank::Close() is called, the calling thread has
already issued all the RPCs it needs to make (some RPCs may still be in flight,
but no new ones will be issued). No thread will call
RuntimeFilterBank::UpdateFilterFromLocal() afterwards, and hence none will
increment 'num_inflight_rpcs_' of this RuntimeFilterBank instance. Therefore, a
thread cannot sneak in and issue an RPC after Close() leaves the first critical
section.

I have also added a DCHECK for '!closed_' at the beginning of
RuntimeFilterBank::UpdateFilterFromLocal() to show empirically that
RuntimeFilterBank::Close() (where 'closed_' is set to true) is not called until
the calling thread has issued all the RPCs it needs to issue
(https://gerrit.cloudera.org/c/13882/24/be/src/runtime/runtime-filter-bank.cc#139).

Please let me know whether the argument above makes sense, since I may have
missed something.
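
A simplified model of the argument, assuming std::mutex and
std::condition_variable in place of the SpinLock-based lock in the quoted code.
'FilterBankModel', 'OnRpcIssued', 'OnRpcCompleted' and 'RunFragmentInstance'
are hypothetical names. The model keeps the two separate critical sections of
Close() and marks the unlocked window between them; the assert in
OnRpcIssued() plays the role of the DCHECK described above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model of the in-flight RPC accounting in RuntimeFilterBank.
class FilterBankModel {
 public:
  // Called by the fragment-instance thread before issuing each async RPC.
  void OnRpcIssued() {
    {
      std::lock_guard<std::mutex> l(runtime_filter_lock_);
      assert(!closed_);  // Mirrors DCHECK(!closed_).
    }
    std::lock_guard<std::mutex> l(num_inflight_rpcs_lock_);
    ++num_inflight_rpcs_;
  }
  // Called from the completion callback, possibly on another thread.
  void OnRpcCompleted() {
    std::lock_guard<std::mutex> l(num_inflight_rpcs_lock_);
    if (--num_inflight_rpcs_ == 0) krpcs_done_cv_.notify_all();
  }
  void Close() {
    {
      std::unique_lock<std::mutex> l1(num_inflight_rpcs_lock_);
      krpcs_done_cv_.wait(l1, [this] { return num_inflight_rpcs_ == 0; });
    }
    // Unlocked window: safe only because the thread calling Close() is the
    // thread that issued every RPC, so it cannot issue another one here.
    std::lock_guard<std::mutex> l2(runtime_filter_lock_);
    closed_ = true;
  }
  bool closed() const {
    std::lock_guard<std::mutex> l(runtime_filter_lock_);
    return closed_;
  }
  int num_inflight() const {
    std::lock_guard<std::mutex> l(num_inflight_rpcs_lock_);
    return num_inflight_rpcs_;
  }

 private:
  mutable std::mutex num_inflight_rpcs_lock_;
  mutable std::mutex runtime_filter_lock_;
  std::condition_variable krpcs_done_cv_;
  int num_inflight_rpcs_ = 0;
  bool closed_ = false;
};

// One thread issues every RPC (as Open() does) and only then calls Close();
// completions arrive on other threads.
void RunFragmentInstance(FilterBankModel* bank, int num_rpcs) {
  std::vector<std::thread> completers;
  for (int i = 0; i < num_rpcs; ++i) {
    bank->OnRpcIssued();
    completers.emplace_back([bank] {
      std::this_thread::sleep_for(std::chrono::milliseconds(5));
      bank->OnRpcCompleted();
    });
  }
  bank->Close();  // Blocks until every completion has been counted.
  assert(bank->num_inflight() == 0);
  for (std::thread& t : completers) t.join();
}
```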


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@137
PS24, Line 137:  Substitute("Query State not found for query_id=$0",
              :         PrintId(ProtoToQueryId(req->dst_query_id()))
> This can be refactored into a single string object and reuse them here and
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@141
PS24, Line 141: this->
> no need for this
Thanks for pointing this out! I have removed 'this->' in 
DataStreamService::PublishFilter() as well as in 
DataStreamService::UpdateFilter().



--
To view, visit http://gerrit.cloudera.org:8080/13882

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao
Gerrit-Reviewer: Fang-Yu Rao
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Thomas Tauber-Marshall
Gerrit-Comment-Date: Fri, 27 Sep 2019 22:05:21 +0000
Gerrit-HasComments: Yes
