Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )
Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 25:

(25 comments)

Hi Michael and Thomas, I have addressed most of Michael's comments in the previous iteration but have not uploaded the revised version since there are still four comments that I do not know exactly how to resolve. Please let me know how I should proceed. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc@556
PS25, Line 556: LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
> return here as there is no point in continuing to print the result status i
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1010
PS25, Line 1010: // >>>>>>> IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
> To be removed ?!
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1022
PS25, Line 1022: std::
> nit: no need for std::
Thanks for pointing this out. I have checked whether or not it is possible to move the declarations of 'rpc_params', 'target_fragment_idxs', and 'bloom_filter_directory' closer to their use but since these three variables are used both inside and outside the critical section below, it seems that we are not able to do that.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1072
PS25, Line 1072: BloomFilterPB& aggregated_filter = state->bloom_filter();
               : aggregated_filter.Swap(rpc_params.mutable_bloom_filter());
> This swapping tricks seems unnecessary now that the bloom_filter_directory
Thanks for the suggestion! I have tried the following.

1.
   BloomFilterPB aggregated_filter;
   aggregated_filter.CopyFrom(state->bloom_filter());
   rpc_params.set_allocated_bloom_filter(&aggregated_filter);

This approach results in the following error at runtime according to AddressSanitizer.

==26704==ERROR: AddressSanitizer: stack-use-after-scope on address 0x7fde29b39530 at pc 0x0000023b5cef bp 0x7fde29b392f0 sp 0x7fde29b392e8

2. rpc_params.set_allocated_bloom_filter(std::move(&(state->bloom_filter())));
3. rpc_params.set_allocated_bloom_filter(&(state->bloom_filter()));

Both the second and the third approaches result in the following error reported by AddressSanitizer.

==19218==ERROR: AddressSanitizer: attempting free on address which was not malloc()-ed: 0x6130001a0ca0 in thread T126

I think this is because the contents we put in 'rpc_params' have to live beyond this function, i.e., Coordinator::UpdateFilter(). In addition, set_allocated_bloom_filter() hands ownership of the pointer to 'rpc_params', which deletes it later, so the object must be heap-allocated and must not also be owned by 'state'. To address this issue, we may need to allocate memory on the heap and perform the copy ourselves. But since 'data_stream_service.pb.cc' (generated by the Protobuf compiler) already provides such functionality, i.e., 'Swap()', I was wondering if it would be easier to just use this provided function.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1074
PS25, Line 1074: bloom_filter_directory
> Is there a reason to assign state->bloom_filter_directory() to this local v
Thanks for the suggestion. In the current implementation, the variable 'state' is derived from the variable 'it', and both are defined inside the critical section. To address your suggestion, we may need to move the following two statements out of the critical section.

1. auto it = filter_routing_table_->id_to_filter.find(params.filter_id()), and
2. FilterState* state = &it->second.

However, it seems this would also move the use of the variable 'it' away from its definition. Please let me know how you would like to proceed. Thanks!
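
To make the constraint behind the Line 1022, 1074 and 1098 replies concrete, here is a minimal sketch of the pattern: locals are declared before the critical section, filled (by swapping) while the lock is held, and read after the lock is released. All names here (FilterStateSketch, PublishParamsSketch, CoordinatorSketch) are hypothetical stand-ins, std::string stands in for BloomFilterPB, and swapping the directory is an assumption for illustration rather than what coordinator.cc does.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the per-filter state; not Impala's FilterState.
struct FilterStateSketch {
  std::string filter_payload;    // stands in for the aggregated BloomFilterPB
  std::string filter_directory;  // stands in for the bloom filter directory bytes
};

// Hypothetical stand-in for the RPC parameters built in UpdateFilter().
struct PublishParamsSketch {
  std::string filter_payload;
};

class CoordinatorSketch {
 public:
  void AddFilter(int32_t id, FilterStateSketch state) {
    std::lock_guard<std::mutex> l(lock_);
    id_to_filter_[id] = std::move(state);
  }

  // Returns the number of directory bytes that would be attached to the RPC,
  // or -1 if the filter id is unknown.
  int64_t Publish(int32_t id, PublishParamsSketch* rpc_params) {
    assert(rpc_params != nullptr);
    // Declared before the critical section because it is read after the
    // lock has been released.
    std::string directory;
    {
      std::lock_guard<std::mutex> l(lock_);
      auto it = id_to_filter_.find(id);
      if (it == id_to_filter_.end()) return -1;
      // 'it' and 'state' are only in scope inside this block.
      FilterStateSketch* state = &it->second;
      // Swap instead of copy: the payload moves into the params without a
      // second allocation, and the state keeps whatever the params held.
      std::swap(state->filter_payload, rpc_params->filter_payload);
      directory.swap(state->filter_directory);
    }
    // Outside the lock 'state' no longer exists, but the locals do.
    return static_cast<int64_t>(directory.size());
  }

 private:
  std::mutex lock_;
  std::unordered_map<int32_t, FilterStateSketch> id_to_filter_;
};
```

In this sketch, replacing 'directory' with 'state->filter_directory' after the closing brace would not compile, which mirrors why 'state->bloom_filter_directory().data()' cannot be used directly at Line 1098 without moving 'it' and 'state' out of the critical section.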

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1098
PS25, Line 1098: reinterpret_cast<const char*>(&(bloom_filter_directory[0])
> state->bloom_filter_directory().data()
Since 'state' is defined inside of the critical section as mentioned above, we might not be able to replace this argument with 'state->bloom_filter_directory().data()'.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1153
PS25, Line 1153: bloom_filter_ = BloomFilterPB(params.bloom_filter());
> bloom_filter_ = params.bloom_filter();
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1155
PS25, Line 1155: std::string(
               : reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size())
> Can keep the original suggestion of sidecar_slice.ToString(). C++ ROV shoul
Thanks for pointing this out. I have changed this accordingly.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@190
PS25, Line 190: std::
> nit: no need for std::
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@204
PS25, Line 204: std::
> nit: no need for std::
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@205
PS25, Line 205: ++num_inflight_rpcs_;
> DCHECK_GE(num_inflight_rpcs_, 0); before increment.
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@323
PS25, Line 323: std::
> nit: no need for std::
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h
File be/src/runtime/timestamp-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@170
PS25, Line 170: // Store the binary representation of this TimestampValue in 'tvalue'.
              : void ToTColumnValue(TColumnValue* tvalue) const {
              :   const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
              :   tvalue->timestamp_val.assign(data, data + Size());
              :   tvalue->__isset.timestamp_val = true;
              : }
> Is this not used anymore ?
Thanks for pointing this out! Indeed, it is not used anymore. I have removed it.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@183
PS25, Line 183: // Returns a new TimestampValue created from the value in 'tvalue'.
              : static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
              :   TimestampValue value;
              :   memcpy(&value, tvalue.timestamp_val.c_str(), Size());
              :   value.Validate();
              :   return value;
              : }
> Is this not used anymore ?
Thanks! I have removed it.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@35
PS25, Line 35: #include "gen-cpp/ImpalaInternalService.h"
> This can be removed.
Thanks for this suggestion. After trying to remove this header file, I got the following 2 error messages.

1.
   In file included from be/src/rpc/thrift-server-test.cc:24:
   be/src/rpc/thrift-client.h:134:18: error: allocation of incomplete type 'impala::ImpalaInternalServiceClient'
     iface_(new InterfaceType(protocol_)),
                ^~~~~~~~~~~~~
   be/src/rpc/thrift-server-test.cc:526:30: note: in instantiation of member function 'impala::ThriftClient<impala::ImpalaInternalServiceClient>::ThriftClient' requested here
     Client* client = new Client("127.0.0.1", port, "", nullptr, false);
                          ^
   be/src/runtime/client-cache-types.h:32:7: note: forward declaration of 'impala::ImpalaInternalServiceClient'
   class ImpalaInternalServiceClient;
         ^

2.
   In file included from be/src/rpc/thrift-server-test.cc:24:
   be/src/rpc/thrift-client.h:162:20: error: allocation of incomplete type 'impala::ImpalaInternalServiceClient'
     iface_.reset(new InterfaceType(protocol_));
                      ^~~~~~~~~~~~~
   be/src/runtime/client-cache-types.h:32:7: note: forward declaration of 'impala::ImpalaInternalServiceClient'
   class ImpalaInternalServiceClient;
         ^

According to the errors, 'thrift-server-test.cc' uses 'ImpalaInternalServiceClient' in the following statement.

   using Client = ThriftClient<ImpalaInternalServiceClient>;
   (https://github.com/apache/impala/blob/master/be/src/rpc/thrift-server-test.cc#L525)

Since 'ImpalaInternalServiceClient' is defined in 'ImpalaInternalService.h', I guess that removing the include is the reason why we have these errors. If this is the case, then I think we would need to at least remove 'thrift-server-test.cc' as well. Therefore my question is whether we could remove this BE test in this patch, or whether we should do it in a follow-up JIRA. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@70
PS25, Line 70: class TReportExecStatusArgs;
             : class TReportExecStatusResult;
> Not your change but these can be removed.
Thanks for pointing this out! I also found that the following three classes above are not used either. I guess we could also remove them, right?

1. class TCatalogUpdate;
2. class TPlanExecRequest;
3. class TPlanExecParams;

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc@75
PS25, Line 75: std::shared_ptr<RpcController
> Why shared_ptr ?
Thanks for pointing this out! It does not have to be a shared_ptr. I have changed 'controller_x' and 'controller_y' to plain RpcController objects. The types of 'controller_x2' and 'controller_y2' further down are also changed accordingly.
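
Coming back to the impala-server.h@35 errors quoted above, here is a minimal sketch of why a forward declaration is not enough once a template constructor allocates the type. ThriftClientSketch and ClientSketch are hypothetical stand-ins for ThriftClient and ImpalaInternalServiceClient, and the constructor signature is simplified for illustration.

```cpp
#include <cassert>
#include <memory>

// Simplified stand-in for ThriftClient<InterfaceType>: the constructor
// allocates the interface type, so that type must be complete wherever
// the constructor is instantiated.
template <typename InterfaceType>
class ThriftClientSketch {
 public:
  explicit ThriftClientSketch(int port) : iface_(new InterfaceType(port)) {}
  int port() const { return iface_->port(); }

 private:
  std::unique_ptr<InterfaceType> iface_;
};

// Forward declaration only, as in client-cache-types.h.
class ClientSketch;

// Naming the specialization is fine while ClientSketch is still incomplete.
using Client = ThriftClientSketch<ClientSketch>;

// The full definition. In the review this is what the generated header
// provides. If it never appeared in this translation unit, constructing a
// Client would be rejected, because 'new InterfaceType' needs a complete type.
class ClientSketch {
 public:
  explicit ClientSketch(int port) : port_(port) {}
  int port() const { return port_; }

 private:
  int port_;
};

// Compiles because ClientSketch is complete at this point.
int PortOfNewClient(int port) {
  Client client(port);
  return client.port();
}
```

Under this reading, any file that constructs a ThriftClient needs the generated header on its include path, directly or transitively, which is consistent with thrift-server-test.cc breaking once impala-server.h stops including it.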

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc@352
PS25, Line 352: std::shared_ptr
> Why shared_ptr ?
Thanks for pointing this out. As above, I have changed 'controller' to a plain RpcController object.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@125
PS25, Line 125: DCHECK_EQ(protobuf->always_false(), false);
> DCHECK(!protobuf->always_false());
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@236
PS25, Line 236: &(directory_in[0])
> directory_in
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@237
PS25, Line 237: &(directory_out[0])
> directory_out
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@239
PS25, Line 239: &(directory_in[0])
> directory_in
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@241
PS25, Line 241: &(directory_in[0]
> directory_in
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@242
PS25, Line 242: &(directory_out[0])
> directory_out
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/common/protobuf/common.proto
File common/protobuf/common.proto:

http://gerrit.cloudera.org:8080/#/c/13882/25/common/protobuf/common.proto@43
PS25, Line 43: // This is a union over all possible return types.
> If we upgrade to proto3 above, we can use the oneof feature in protobuf3 if
Thanks very much for the suggestion. I have added a TODO here accordingly.

--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 25
Gerrit-Owner: Fang-Yu Rao <[email protected]>
Gerrit-Reviewer: Fang-Yu Rao <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Thomas Tauber-Marshall <[email protected]>
Gerrit-Comment-Date: Mon, 28 Oct 2019 15:39:30 +0000
Gerrit-HasComments: Yes
