Fang-Yu Rao has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 25:

(25 comments)

Hi Michael and Thomas, I have addressed most of Michael's comments on the
previous iteration, but I have not uploaded a revised patch set yet because
there are still four comments that I am not sure how to resolve. Please let me
know how I should proceed. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc@556
PS25, Line 556:     LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
> return here as there is no point in continuing to print the result status i
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1010
PS25, Line 1010: // >>>>>>> IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
> To be removed ?!
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1022
PS25, Line 1022: std::
> nit: no need for std::
Thanks for pointing this out.

I have checked whether the declarations of 'rpc_params',
'target_fragment_idxs', and 'bloom_filter_directory' can be moved closer to
their use. Since all three variables are used both inside and outside the
critical section below, it seems that we cannot do that.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1072
PS25, Line 1072:       BloomFilterPB& aggregated_filter = state->bloom_filter();
               :       aggregated_filter.Swap(rpc_params.mutable_bloom_filter());
> This swapping tricks seems unnecessary now that the bloom_filter_directory
Thanks for the suggestion!

I have tried the following.

1.
BloomFilterPB aggregated_filter;
aggregated_filter.CopyFrom(state->bloom_filter());
rpc_params.set_allocated_bloom_filter(&aggregated_filter);

This approach would result in the following error at runtime according to 
AddressSanitizer.

==26704==ERROR: AddressSanitizer: stack-use-after-scope on address 
0x7fde29b39530 at pc 0x0000023b5cef bp 0x7fde29b392f0 sp 0x7fde29b392e8

2.
rpc_params.set_allocated_bloom_filter(std::move(&(state->bloom_filter())));

3.
rpc_params.set_allocated_bloom_filter(&(state->bloom_filter()));

Both the second and the third approaches would result in the following error 
reported by AddressSanitizer.

==19218==ERROR: AddressSanitizer: attempting free on address which was not 
malloc()-ed: 0x6130001a0ca0 in thread T126

I think this is because the contents we put in 'rpc_params' have to live beyond
this function, i.e., Coordinator::UpdateFilter().

To address this issue, we may need to allocate memory on the heap and perform
the copy ourselves. But since 'data_stream_service.pb.cc' (generated by the
Protobuf compiler) already provides 'Swap()' for this purpose, I was wondering
whether it would be easier to just keep using it.
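
For reference, here is a minimal sketch of the ownership rule that I believe is
behind the AddressSanitizer reports above: in the generated C++ API,
set_allocated_*() hands ownership of the pointer to the message, which deletes
it later, whereas mutable_*() and Swap() only touch storage the message already
owns. 'FilterMsg', 'ParamsMsg', 'FillBySwap' and 'FillByHeapCopy' are
hypothetical stand-ins written with std::unique_ptr to mimic that behavior;
they are not the generated BloomFilterPB or the real RPC parameter class.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for a generated message such as BloomFilterPB.
struct FilterMsg {
  std::string payload;
  // Exchanges contents with 'other' without copying the payload.
  void Swap(FilterMsg* other) { std::swap(payload, other->payload); }
};

// Hypothetical stand-in for the RPC parameter message. It owns its
// sub-message and deletes it on destruction, which is the behavior assumed
// here for the generated class.
class ParamsMsg {
 public:
  // Takes ownership of 'f'. Passing the address of a stack object, or of a
  // member owned by another object, would end in an invalid or double free.
  void set_allocated_filter(FilterMsg* f) { filter_.reset(f); }

  // Returns the sub-message owned by this object, creating it if needed.
  FilterMsg* mutable_filter() {
    if (filter_ == nullptr) filter_.reset(new FilterMsg());
    return filter_.get();
  }

  bool has_filter() const { return filter_ != nullptr; }

 private:
  std::unique_ptr<FilterMsg> filter_;
};

// Option A: move the contents into storage that 'params' already owns.
void FillBySwap(FilterMsg* aggregated, ParamsMsg* params) {
  aggregated->Swap(params->mutable_filter());
}

// Option B: give 'params' its own heap-allocated copy.
void FillByHeapCopy(const FilterMsg& aggregated, ParamsMsg* params) {
  params->set_allocated_filter(new FilterMsg(aggregated));
}
```

Under these assumptions, option A avoids both the copy and the explicit 'new',
which is why keeping Swap() looks like the simpler choice to me.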


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1074
PS25, Line 1074: bloom_filter_directory
> Is there a reason to assign state->bloom_filter_directory() to this local v
Thanks for the suggestion.

According to the current implementation, the variable 'state' is defined 
according to the variable 'it', both of which are defined inside the critical 
section. To address your suggestion, we may need to move the following two 
statements out of the critical section.

1. auto it = filter_routing_table_->id_to_filter.find(params.filter_id()), and
2. FilterState* state = &it->second.

However, it seems this will also move the use of the variable 'it' away from 
its definition.

Please let me know how you would like to proceed. Thanks!
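
To make the constraint concrete, here is a rough sketch of the pattern as I
understand it: 'it' and 'state' are only safe to use while the lock is held, so
anything needed after the lock is released has to be swapped or copied into a
local declared before the critical section. All names below
('FilterStateSketch', 'RoutingTableSketch', 'TakeDirectory') are hypothetical
and simplified; this is not the actual Coordinator code.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical, simplified stand-in for the per-filter state.
struct FilterStateSketch {
  std::string directory;  // Raw bytes of the aggregated filter.
};

// Hypothetical, simplified stand-in for the routing table and its lock.
struct RoutingTableSketch {
  std::mutex lock;
  std::map<int, FilterStateSketch> id_to_filter;
};

// Moves the directory of 'filter_id' into a local that outlives the lock.
// Returns an empty string if the filter is unknown.
std::string TakeDirectory(RoutingTableSketch* table, int filter_id) {
  std::string directory;  // Declared outside: used after the lock is released.
  {
    std::lock_guard<std::mutex> l(table->lock);
    auto it = table->id_to_filter.find(filter_id);
    if (it == table->id_to_filter.end()) return directory;
    // Only safe to dereference while the lock is held.
    FilterStateSketch* state = &it->second;
    directory.swap(state->directory);
  }
  // From here on only the local is touched, never 'state' or 'it'.
  return directory;
}
```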


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1098
PS25, Line 1098:  reinterpret_cast<const char*>(&(bloom_filter_directory[0])
> state->bloom_filter_directory().data()
Since 'state' is defined inside of the critical section as mentioned above, we 
might not be able to replace this argument with 
'state->bloom_filter_directory().data()'.
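
One possible middle ground, sketched below, is to call .data() on the local
'bloom_filter_directory' instead of taking the address of element 0. This
assumes the local is a contiguous container of byte-sized elements (for
example std::string or std::vector<uint8_t>); 'BytesView' and 'SidecarBytes'
are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical view type standing in for the (pointer, length) pair that the
// sidecar API receives.
struct BytesView {
  const char* ptr;
  std::size_t len;
};

// Builds a view over any contiguous container of byte-sized elements.
// data() does not index into the container, so an empty one is fine too.
template <typename Container>
BytesView SidecarBytes(const Container& c) {
  static_assert(sizeof(typename Container::value_type) == 1,
      "byte-sized elements expected");
  return BytesView{reinterpret_cast<const char*>(c.data()), c.size()};
}
```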


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1153
PS25, Line 1153: bloom_filter_ = BloomFilterPB(params.bloom_filter());
> bloom_filter_ = params.bloom_filter();
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1155
PS25, Line 1155: std::string(
               :               reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size())
> Can keep the original suggestion of sidecar_slice.ToString(). C++ ROV shoul
Thanks for pointing this out. I have changed this accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@190
PS25, Line 190: std::
> nit: no need for std::
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@204
PS25, Line 204: std::
> nit: no need for std::
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@205
PS25, Line 205:       ++num_inflight_rpcs_;
> DCHECK_GE(num_inflight_rpcs_, 0); before increment.
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@323
PS25, Line 323: std::
> nit: no need for std::
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h
File be/src/runtime/timestamp-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@170
PS25, Line 170:   // Store the binary representation of this TimestampValue in 'tvalue'.
              :   void ToTColumnValue(TColumnValue* tvalue) const {
              :     const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
              :     tvalue->timestamp_val.assign(data, data + Size());
              :     tvalue->__isset.timestamp_val = true;
              :   }
> Is this not used anymore ?
Thanks for pointing this out! Indeed, it is not used anymore. I have removed it.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@183
PS25, Line 183:   // Returns a new TimestampValue created from the value in 'tvalue'.
              :   static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
              :     TimestampValue value;
              :     memcpy(&value, tvalue.timestamp_val.c_str(), Size());
              :     value.Validate();
              :     return value;
              :   }
> Is this not used anymore ?
Thanks! I have removed it.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@35
PS25, Line 35: #include "gen-cpp/ImpalaInternalService.h"
> This can be removed.
Thanks for this suggestion.

After trying to remove this header file, I got the following 2 error messages.

1.
In file included from be/src/rpc/thrift-server-test.cc:24:
be/src/rpc/thrift-client.h:134:18: error: allocation of incomplete type 'impala::ImpalaInternalServiceClient'
      iface_(new InterfaceType(protocol_)),
                 ^~~~~~~~~~~~~
be/src/rpc/thrift-server-test.cc:526:30: note: in instantiation of member function 'impala::ThriftClient<impala::ImpalaInternalServiceClient>::ThriftClient' requested here
        Client* client = new Client("127.0.0.1", port, "", nullptr, false);
                             ^
be/src/runtime/client-cache-types.h:32:7: note: forward declaration of 'impala::ImpalaInternalServiceClient'
class ImpalaInternalServiceClient;
      ^

2.
In file included from be/src/rpc/thrift-server-test.cc:24:
be/src/rpc/thrift-client.h:162:20: error: allocation of incomplete type 'impala::ImpalaInternalServiceClient'
  iface_.reset(new InterfaceType(protocol_));
                   ^~~~~~~~~~~~~
be/src/runtime/client-cache-types.h:32:7: note: forward declaration of 'impala::ImpalaInternalServiceClient'
class ImpalaInternalServiceClient;
      ^

According to the errors, 'thrift-server-test.cc' uses
'ImpalaInternalServiceClient' in the following statement.

using Client = ThriftClient<ImpalaInternalServiceClient>;
(https://github.com/apache/impala/blob/master/be/src/rpc/thrift-server-test.cc#L525)

Since 'ImpalaInternalServiceClient' is defined in 'ImpalaInternalService.h', I
guess that removing the include leaves only the forward declaration from
'client-cache-types.h' visible to the test, which is why we get these errors.

If this is the case, then I think we would need to remove
'thrift-server-test.cc' as well. My question is whether we should remove this
BE test in this patch or do it in a follow-up JIRA. Thanks!
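
As a small illustration of this class of error (not of the Impala sources
themselves): a forward declaration is enough to name a type as a template
argument, but 'new InterfaceType(...)' inside the template needs the complete
type wherever that member function is instantiated. 'ClientSketch',
'ServiceClientSketch' and 'MakeClient' are hypothetical names.

```cpp
#include <cassert>
#include <memory>
#include <string>

// A forward declaration, analogous to the one in client-cache-types.h.
class ServiceClientSketch;

// Hypothetical, simplified analogue of ThriftClient<InterfaceType>.
template <typename InterfaceType>
class ClientSketch {
 public:
  // 'new InterfaceType(...)' requires InterfaceType to be complete wherever
  // this constructor is instantiated.
  explicit ClientSketch(const std::string& host)
    : iface_(new InterfaceType(host)) {}

  InterfaceType* iface() { return iface_.get(); }

 private:
  std::unique_ptr<InterfaceType> iface_;
};

// Naming the specialization only needs the forward declaration.
using Client = ClientSketch<ServiceClientSketch>;

// The full definition, analogous to what the generated header provides.
// Without it, constructing a Client below fails to compile with
// "allocation of incomplete type".
class ServiceClientSketch {
 public:
  explicit ServiceClientSketch(const std::string& host) : host_(host) {}
  const std::string& host() const { return host_; }

 private:
  std::string host_;
};

// Instantiates the constructor, so the complete type must be visible here.
std::unique_ptr<Client> MakeClient(const std::string& host) {
  return std::unique_ptr<Client>(new Client(host));
}
```

So, if I read the errors correctly, any translation unit that actually
constructs a ThriftClient<ImpalaInternalServiceClient> needs the generated
header to be reachable through some include.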


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@70
PS25, Line 70: class TReportExecStatusArgs;
             : class TReportExecStatusResult;
> Not your change but these can be removed.
Thanks for pointing this out!

I also found that the following three classes are not used either. I guess we
could remove them as well, right?

1. class TCatalogUpdate;
2. class TPlanExecRequest;
3. class TPlanExecParams;


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc@75
PS25, Line 75: std::shared_ptr<RpcController
> Why shared_ptr ?
Thanks for pointing this out!

It does not have to be a shared_ptr. I have changed 'controller_x' and
'controller_y' to plain RpcController objects.

The types of 'controller_x2' and 'controller_y2' further down are changed
accordingly.
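
A minimal sketch of the resulting shape, assuming the test helpers only need a
non-owning pointer to the controller: the controller is declared as a local
object and its address is passed down, so no shared ownership is involved.
'ControllerSketch' and 'AttachDirectory' are hypothetical stand-ins, and the
sketch assumes the real controller is non-copyable; this is not
kudu::rpc::RpcController itself.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the RPC controller, assumed to be non-copyable.
class ControllerSketch {
 public:
  ControllerSketch() = default;
  ControllerSketch(const ControllerSketch&) = delete;
  ControllerSketch& operator=(const ControllerSketch&) = delete;

  // Records a sidecar payload and returns its index.
  int AddSidecar(const std::string& bytes) {
    sidecars_.push_back(bytes);
    return static_cast<int>(sidecars_.size()) - 1;
  }

  const std::string& sidecar(int idx) const { return sidecars_.at(idx); }

 private:
  std::vector<std::string> sidecars_;
};

// The helper takes a pointer; it neither owns nor shares the controller.
int AttachDirectory(const std::string& directory, ControllerSketch* controller) {
  return controller->AddSidecar(directory);
}
```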


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc@352
PS25, Line 352: std::shared_ptr
> Why shared_ptr ?
Thanks for pointing this out.

As above, I have changed 'controller' to a plain RpcController object.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@125
PS25, Line 125: DCHECK_EQ(protobuf->always_false(), false);
> DCHECK(!protobuf->always_false());
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@236
PS25, Line 236: &(directory_in[0])
> directory_in
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@237
PS25, Line 237: &(directory_out[0])
> directory_out
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@239
PS25, Line 239: &(directory_in[0])
> directory_in
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@241
PS25, Line 241: &(directory_in[0]
> directory_in
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@242
PS25, Line 242: &(directory_out[0])
> directory_out
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/common/protobuf/common.proto
File common/protobuf/common.proto:

http://gerrit.cloudera.org:8080/#/c/13882/25/common/protobuf/common.proto@43
PS25, Line 43: // This is a union over all possible return types.
> If we upgrade to proto3 above, we can use the oneof feature in protobuf3 if
Thanks very much for the suggestion. I have added a TODO here accordingly.



--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 25
Gerrit-Owner: Fang-Yu Rao <[email protected]>
Gerrit-Reviewer: Fang-Yu Rao <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Thomas Tauber-Marshall <[email protected]>
Gerrit-Comment-Date: Mon, 28 Oct 2019 15:39:30 +0000
Gerrit-HasComments: Yes
