Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )
Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

Patch Set 17:

(15 comments)

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h@48
PS17, Line 48: using kudu::rpc::RpcContext;
Don't use 'using' in header files.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@986
PS17, Line 986: {
              :   return;
              : }
You can leave the formatting as-is.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1041
PS17, Line 1041: void Coordinator::SetupSidecarForBloomFilter(PublishFilterParamsPB* rpc_params,
It's unfortunate that this logic is duplicated between here and BloomFilter::ToProtobuf, because duplicating code makes bugs more likely. For example, I think you have a bug here in the case where AddOutboundSidecar fails, since you don't disable the bloom filter, even though that bug is already fixed in BloomFilter::ToProtobuf.

There are a couple of possible solutions to this:
1. Move this to be a static function in BloomFilter, e.g. BloomFilter::AddSidecar(), and call it from BloomFilter::ToProtobuf().
2. Instead of having Coordinator::FilterState store a BloomFilterPB and a 'string directory', actually reconstruct a BloomFilter object, i.e. around line 1110 below, by calling BloomFilter::Init() with sidecar_slice.data(). Then we could just call the existing BloomFilter::ToProtobuf().
3. Some other idea I didn't think of.

Option 2 seems cleaner overall to me, since it lets us better hide the details of how bloom filters work inside the BloomFilter class instead of leaking things like the existence of the 'directory' string into FilterState.
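For comparison, option 1 could look roughly like the minimal sketch below. It uses hypothetical stand-in types (BloomFilterPBSketch, RpcControllerSketch) and a hypothetical helper name (AddDirectorySidecar) rather than the real protobuf and KRPC classes, whose signatures differ. What it illustrates is the point of the suggestion: the disable-on-failure handling that BloomFilter::ToProtobuf already has would live in one shared helper instead of two copies.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins: the real BloomFilterPB and kudu::rpc::RpcController
// have different APIs (the real AddOutboundSidecar() returns a Status).
struct BloomFilterPBSketch {
  bool always_true = false;
  bool always_false = false;
  bool has_directory_sidecar_idx = false;
  int directory_sidecar_idx = -1;
};

struct RpcControllerSketch {
  bool fail_add = false;  // Lets a caller force the failure path.
  std::vector<std::string> sidecars;

  // Returns false on failure; on success stores the sidecar's index in 'idx'.
  bool AddOutboundSidecar(std::string data, int* idx) {
    if (fail_add) return false;
    sidecars.push_back(std::move(data));
    *idx = static_cast<int>(sidecars.size()) - 1;
    return true;
  }
};

// Option 1: a single helper (hypothetically BloomFilter::AddSidecar()) that
// both BloomFilter::ToProtobuf() and the coordinator would call, so the
// failure handling exists in exactly one place.
void AddDirectorySidecar(const std::string& directory,
                         RpcControllerSketch* controller,
                         BloomFilterPBSketch* pb) {
  assert(controller != nullptr && pb != nullptr);
  int idx = -1;
  if (!controller->AddOutboundSidecar(directory, &idx)) {
    // Disable the filter instead of sending one whose directory is missing.
    pb->always_true = true;
    pb->always_false = false;
    pb->has_directory_sidecar_idx = false;
    return;
  }
  pb->directory_sidecar_idx = idx;
  pb->has_directory_sidecar_idx = true;
}
```

In this shape, has_directory_sidecar_idx is set only when a directory was actually attached. Assuming the helper is only called for filters that are neither always-true nor always-false, that is also why checking it on the receiving side should be equivalent to checking both flags.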
However, option 2 is slightly less efficient, because it adds a conversion from BloomFilterPB to a BloomFilter object and back, with the required setup (e.g. in BloomFilter::Init()). It isn't a lot less efficient, though, since I think it can be done without adding any copies of the directory. It is probably also more work for you in terms of code changes (e.g. BloomFilter::Or would have to be slightly rewritten to take BloomFilter objects instead). And option 1 is consistent with the way things already work. So I'll leave it up to you to decide.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1089
PS17, Line 1089: !params->bloom_filter().always_true() &&
               :     !params->bloom_filter().always_false())
It might be cleaner to check has_directory_sidecar_idx() here, which should be equivalent.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1096
PS17, Line 1096: return;
You need to handle this error, i.e. call Disable().


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@52
PS17, Line 52: class UniqueIdPB;
I don't think this is used anywhere.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@142
PS17, Line 142: void SendFilterToCoordinatorCompleteCb(const kudu::rpc::RpcController* rpc_controller);
I think this can be private. Let's also rename it UpdateFilterCompleteCb, to keep it consistent with the name of the RPC.
http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@154
PS17, Line 154: num_krpcs_
Consider renaming this to num_inflight_rpcs_.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@133
PS17, Line 133: DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
While thinking about the locking protocol in Close(), I noticed that we don't check 'closed_' here like we do at the start of PublishGlobalFilter().

I spent a little time looking at the code, and I think it's correct as is. RuntimeFilterBank::Close() is only called from RuntimeState::ReleaseResources(), which is only called from FragmentInstanceState::Close(), which can't be called until after Open() has run on exec_tree_. UpdateFilterFromLocal() is only called during Open(), via PartitionedHashJoinNode::Open() -> BlockingJoinNode::ProcessBuildInputAndOpenProbe(), which runs BlockingJoinNode::ProcessBuildInputAsync() -> BlockingJoinNode::SendBuildInputToSink() -> PhjBuilder::FlushFinal() on a separate thread and blocks until it completes.

It would be nice if you could confirm that my understanding is correct, and then add a DCHECK(!closed_); and a brief comment (e.g. "This is only called from ExecNode::Open().").
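A minimal sketch of how the Close() protocol discussed in this file could fit together. It assumes std::mutex and std::condition_variable in place of Impala's SpinLock, and every name in it (FilterBankSketch, proxy_ok, filter_lock_, the thread standing in for the KRPC completion callback) is hypothetical. It shows the suggested DCHECK (as an assert), the counter being incremented only once the RPC will actually be sent (the Line 187 comment), a private completion callback that decrements it, and Close() waiting in its own {} block before taking any other lock (the Line 317 comment).

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical, simplified stand-in for RuntimeFilterBank. A std::thread plays
// the role of the asynchronous KRPC completion callback. Not Impala's API.
class FilterBankSketch {
 public:
  ~FilterBankSketch() {
    // Reap any callback threads if Close() was never called.
    for (std::thread& t : callbacks_) {
      if (t.joinable()) t.join();
    }
  }

  // Assumed to be called only from ExecNode::Open(), i.e. before Close().
  // 'proxy_ok' models the result of get_proxy_status.ok().
  bool UpdateFilterFromLocal(bool proxy_ok) {
    assert(!closed_);  // Plays the role of DCHECK(!closed_).
    if (!proxy_ok) return false;  // No RPC is sent, so don't count one.
    {
      // Count the RPC only once we know it will actually be sent.
      std::lock_guard<std::mutex> l(num_inflight_rpcs_lock_);
      ++num_inflight_rpcs_;
    }
    callbacks_.emplace_back([this] {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      UpdateFilterCompleteCb();
    });
    return true;
  }

  void Close() {
    // Wait for all inflight RPCs to complete before closing the filters.
    // Scoped so this lock is released before 'filter_lock_' is taken.
    {
      std::unique_lock<std::mutex> l(num_inflight_rpcs_lock_);
      rpcs_done_cv_.wait(l, [this] { return num_inflight_rpcs_ == 0; });
    }
    // Every callback has run; reap the helper threads (sketch-only detail).
    for (std::thread& t : callbacks_) t.join();
    callbacks_.clear();
    std::lock_guard<std::mutex> l(filter_lock_);
    closed_ = true;  // The real class would close its filters here.
  }

  int num_inflight_rpcs() {
    std::lock_guard<std::mutex> l(num_inflight_rpcs_lock_);
    return num_inflight_rpcs_;
  }
  bool closed() const { return closed_; }

 private:
  // Private completion callback, named after the UpdateFilter RPC.
  void UpdateFilterCompleteCb() {
    std::lock_guard<std::mutex> l(num_inflight_rpcs_lock_);
    --num_inflight_rpcs_;
    rpcs_done_cv_.notify_all();
  }

  std::mutex num_inflight_rpcs_lock_;
  std::condition_variable rpcs_done_cv_;
  int num_inflight_rpcs_ = 0;
  std::mutex filter_lock_;  // Would protect the filters in the real class.
  bool closed_ = false;
  std::vector<std::thread> callbacks_;
};
```

Because the counter is only incremented after the proxy check succeeds, the early-return path leaves it untouched, so Close() can never wait on a callback that will never run. The predicate-based wait() is a choice made for the sketch; the polling wait_for() loop in the patch would also work as long as the callback decrements the counter under the same lock.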
I won't hold up +2ing this patch for the DCHECK, though, since it's not a regression.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@187
PS17, Line 187: {
              :   std::unique_lock<SpinLock> l(num_krpcs_counter_lock_);
              :   num_krpcs_++;
              : }
This needs to be moved down after the check for get_proxy_status.ok(). Otherwise, we could increment the counter, fail to get the proxy, and never send the RPC. The callback would then never be called, the counter would never be decremented, and Close() would hang forever.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@238
PS17, Line 238: return;
I don't think we can just return here; we need to actually handle the error, i.e. do what we've done elsewhere in this function when an error occurs and disable the bloom filter by setting it to BloomFilter::ALWAYS_TRUE_FILTER.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@243
PS17, Line 243: sidecar_slice.ToString()
This does an unnecessary copy: Slice::ToString() copies all of the data, and then BloomFilter::Init() copies all of it again. We can just have BloomFilter::Init() take a 'const char*' and pass sidecar_slice.data() here.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@314
PS17, Line 314: // Use this lock to prevent the BloomFilter's in this Bloom filter bank from
              : // being closed before their respective asynchronous KRPC calls in
              : // RuntimeFilterBank::SendFilterToCoordinator() finishes.
This comment is a little weird; maybe rephrase it along the lines of "Wait for all inflight RPCs to complete before closing the filters."


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@317
PS17, Line 317: std::unique_lock<SpinLock> l1(num_krpcs_counter_lock_);
              : while (num_krpcs_ > 0) {
              :   krpcs_done_cv_.wait_for(l1, std::chrono::milliseconds(50));
              : }
I don't think it technically matters here, but it's always better not to hold two locks at the same time unless necessary (to preclude the possibility of deadlock), and in general to hold locks for as short a time as possible, so I think we can put this in its own {} block.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@324
PS17, Line 324:
Unnecessary extra whitespace.


--
To view, visit http://gerrit.cloudera.org:8080/13882

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 17
Gerrit-Owner: Fang-Yu Rao
Gerrit-Reviewer: Fang-Yu Rao
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Thomas Tauber-Marshall
Gerrit-Comment-Date: Wed, 04 Sep 2019 20:40:41 +0000
Gerrit-HasComments: Yes
