Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 2:

(11 comments)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 126: 'cb'
Can you please state the context in which 'cb' is called (e.g. on a reactor 
thread) and also note the precautions callers should take when implementing 
'cb' (e.g. no blocking)?
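
To illustrate the kind of precaution meant here, below is a minimal sketch of a callback that only hands work off and returns, leaving the expensive part to another thread. It assumes 'cb' does run on a reactor thread, which is exactly what the comment asks the author to confirm. DeferredWork and its methods are hypothetical names for illustration and are not part of the patch or of KRPC.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical helper: collects tasks from callbacks so that the callback
// itself never does slow or blocking work.
class DeferredWork {
 public:
  // Cheap: holds the lock only long enough to append one task. Intended to be
  // the only thing a completion callback does before returning.
  void Enqueue(std::function<void()> task) {
    std::lock_guard<std::mutex> l(lock_);
    tasks_.push_back(std::move(task));
  }

  // Runs all pending tasks on the calling (worker) thread, outside the lock.
  // Returns the number of tasks that were run.
  int Drain() {
    std::vector<std::function<void()>> tasks;
    {
      std::lock_guard<std::mutex> l(lock_);
      tasks.swap(tasks_);
    }
    for (auto& task : tasks) task();
    return static_cast<int>(tasks.size());
  }

 private:
  std::mutex lock_;
  std::vector<std::function<void()>> tasks_;
};
```

With this shape the callback body reduces to a single Enqueue() call, and a worker thread calls Drain() to do the real processing.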


PS2, Line 133: aattempted
typo


PS2, Line 230: //
///

Same below.


PS2, Line 319: Retries
Is there any way to write a be-test to exercise the retry path ?
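
One possible shape for such a test, sketched under assumptions: if the retry loop can take the "attempt" as an injectable function, a fake that fails a fixed number of times exercises both the success-after-retry and retries-exhausted paths. RpcResult, CallWithRetries and FailNTimes are hypothetical names; the real code in rpc.h is asynchronous and schedules retries after a delay, which this synchronous sketch does not model.

```cpp
#include <cassert>
#include <functional>

// Hypothetical outcome of a single RPC attempt.
enum class RpcResult { kOk, kRetryable, kFatal };

struct RetryOutcome {
  RpcResult result;  // Result of the last attempt made.
  int attempts;      // Number of attempts actually made.
};

// Calls 'attempt' until it returns something other than kRetryable or until
// 'max_attempts' attempts have been made.
RetryOutcome CallWithRetries(const std::function<RpcResult()>& attempt,
                             int max_attempts) {
  RpcResult last = RpcResult::kFatal;
  int attempts = 0;
  while (attempts < max_attempts) {
    ++attempts;
    last = attempt();
    if (last != RpcResult::kRetryable) break;
  }
  return {last, attempts};
}

// Test fake: reports a retryable failure 'n' times, then succeeds.
std::function<RpcResult()> FailNTimes(int n) {
  return [remaining = n]() mutable {
    if (remaining > 0) {
      --remaining;
      return RpcResult::kRetryable;
    }
    return RpcResult::kOk;
  };
}
```

A be-test could then assert on both the final result and the attempt count, for example that two injected failures lead to success on the third attempt.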


PS2, Line 322:     auto cb_wrapper = [params = std::move(params), mgr, func, req, resp,
             :         cb = std::move(cb), controller_ptr = controller.release(), num_attempts]()
             :         mutable {
An alternative to this lambda implementation would be to define a separate 
function and use boost::bind() to stash the arguments, right?
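
For comparison, here is a minimal sketch of the two styles side by side. It uses std::bind as a stand-in for boost::bind so that the example is self-contained, and Params and OnRpcDone are hypothetical simplifications, not the types in the patch. One assumption to note: the patch moves 'params' into the closure, whereas this sketch uses a shared_ptr because std::function requires a copyable callable.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the RPC parameters stashed by the callback.
struct Params {
  std::string name;
  int max_attempts;
};

// Named function holding the logic that would otherwise be the lambda body.
// Returns the number of attempts remaining.
int OnRpcDone(const std::shared_ptr<Params>& params, int num_attempts) {
  return params->max_attempts - num_attempts;
}

// Style 1: C++14 lambda with init-captures, similar in shape to the patch.
std::function<int()> MakeLambdaCallback(std::shared_ptr<Params> params,
                                        int num_attempts) {
  return [params = std::move(params), num_attempts]() {
    return OnRpcDone(params, num_attempts);
  };
}

// Style 2: separate function plus bind; the arguments are stored in the
// bound object and passed to OnRpcDone when it is invoked.
std::function<int()> MakeBoundCallback(std::shared_ptr<Params> params,
                                       int num_attempts) {
  return std::bind(&OnRpcDone, std::move(params), num_attempts);
}
```

Both factories produce callables with the same behavior; the difference is mainly where the logic lives and how it shows up in symbols and stack traces.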


PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
Why do we pass Status::OK() for a non-retryable error or after exceeding the 
maximum number of retries?


Line 337:       kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms);
long line


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS1, Line 265: 10
Mind commenting above what 10 stands for ?


PS1, Line 266: numeric_limits<int32_t>::max()
Why is this not FLAGS_datastream_sender_timeout_ms ?


http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS2, Line 203:   auto cb = [self_ptr = weak_ptr<DataStreamSender::Channel>(self_),
             :       instance_id = fragment_instance_id_, proto_batch = batch]
             :       (const Status& status, TransmitDataRequestPb* request,
             :       TransmitDataResponsePb* response, RpcController* controller) {
             : 
             :     // Ensure that request and response get deleted when this callback returns.
             :     auto request_container = unique_ptr<TransmitDataRequestPb>(request);
             :     auto response_container = unique_ptr<TransmitDataResponsePb>(response);
             : 
             :     // Check if this channel still exists.
             :     auto channel = self_ptr.lock();
             :     if (!channel) return;
             :     {
             :       lock_guard<SpinLock> l(channel->lock_);
             :       Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status;
             : 
             :       int32_t status_code = response->status().status_code();
             :       channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE;
             : 
             :       if (!rpc_status.ok()) {
             :         channel->last_rpc_status_ = rpc_status;
             :       } else if (!channel->recvr_gone_) {
             :         if (status_code != TErrorCode::OK) {
             :           // Don't bubble up the 'receiver gone' status, because it's not an error.
             :           channel->last_rpc_status_ = Status(response->status());
             :         } else {
             :           int size = proto_batch->GetSize();
             :           channel->num_data_bytes_sent_.Add(size);
             :           VLOG_ROW << "incremented #data_bytes_sent="
             :                    << channel->num_data_bytes_sent_.Load();
             :         }
             :       }
             :       channel->rpc_in_flight_ = false;
             :     }
             :     channel->rpc_done_cv_.notify_one();
             :   };
I am no C++ expert so this question may be stupid: could we write this as a 
named function instead of a lambda? I am not sure how well gdb handles lambdas 
in optimized builds, and this callback seems important enough that one may 
want to inspect its state in a core dump if necessary.
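
As a rough sketch of what a non-lambda version might look like: a named functor keeps the captured state in named members, so the symbol in a stack trace is the class's operator() rather than a compiler-generated closure name. Channel and TransmitDataDoneCb below are hypothetical, heavily simplified stand-ins (std::mutex instead of SpinLock, a bool instead of Status) and are not the code in the patch.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical, simplified channel state.
struct Channel {
  std::mutex lock;
  bool rpc_in_flight = true;
  long num_data_bytes_sent = 0;
};

// Named functor replacing the lambda. Its members are visible by name in a
// debugger, unlike the anonymous fields of a closure type.
class TransmitDataDoneCb {
 public:
  TransmitDataDoneCb(std::weak_ptr<Channel> channel, int batch_size)
    : channel_(std::move(channel)), batch_size_(batch_size) {}

  // Returns false if the channel no longer exists, true otherwise.
  bool operator()(bool rpc_ok) const {
    std::shared_ptr<Channel> channel = channel_.lock();
    if (!channel) return false;
    std::lock_guard<std::mutex> l(channel->lock);
    if (rpc_ok) channel->num_data_bytes_sent += batch_size_;
    channel->rpc_in_flight = false;
    return true;
  }

 private:
  std::weak_ptr<Channel> channel_;  // Does not keep the channel alive.
  int batch_size_;
};
```

The weak_ptr check mirrors the "channel still exists" guard in the quoted lambda; everything else is reduced to the minimum needed to show the structure.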


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

PS1, Line 63: DataStreamService
nit: Just wondering why we didn't put DataStreamService in 
data-stream-service.cc ?


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-HasComments: Yes
