Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 2:

(8 comments)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 126: 'cb'
> Can you please state the context in which 'cb' is called from (e.g. reactor
Done


PS2, Line 133: aattempted
> typo
Done


PS2, Line 230: //
> ///
Done


PS2, Line 319: Retries
> Is there any way to write a be-test to exercise the retry path ?
Yep - see rpc-mgr-test.cc, RetryAsyncTest. It injects ERROR_SERVER_TOO_BUSY
into an RPC response, which triggers the retry logic.
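
For readers without the patch to hand, here is a minimal sketch of the idea behind that kind of test. It is not the code in rpc-mgr-test.cc: RpcResult, RetryOutcome and CallWithRetry are hypothetical names, and the loop is synchronous, whereas the patch retries asynchronously after a retry interval.

```cpp
#include <cassert>
#include <functional>

// Hypothetical result codes; the real code inspects the KRPC response instead.
enum class RpcResult { kOk, kServerTooBusy, kOtherError };

// What the caller learns: the final result and how many attempts were made.
struct RetryOutcome {
  RpcResult result;
  int attempts;
};

// Calls 'rpc' until it returns something other than kServerTooBusy, or until
// 'max_attempts' attempts have been made. Only the "too busy" result is retried.
inline RetryOutcome CallWithRetry(const std::function<RpcResult()>& rpc,
                                  int max_attempts) {
  assert(max_attempts > 0);
  RpcResult result = RpcResult::kOtherError;
  int attempts = 0;
  while (attempts < max_attempts) {
    ++attempts;
    result = rpc();
    if (result != RpcResult::kServerTooBusy) break;
  }
  return {result, attempts};
}
```

A test can then pass in a fake 'rpc' that reports "too busy" a fixed number of times before succeeding, and check the attempt count.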


PS2, Line 322:     auto cb_wrapper = [params = std::move(params), mgr, func, req, resp,
             :         cb = std::move(cb), controller_ptr = controller.release(), num_attempts]()
             :         mutable {
> An alternative to this lambda implementation would be to define a separate 
It would, yeah. My preference is for using a lambda here partly because it's 
very clear about how the arguments are copied, and how ownership is managed (I 
find the copying behaviour of bind() a bit inscrutable).
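
To illustrate the point about capture lists, here is a small sketch of the two styles side by side. Params, Describe and the two Make* helpers are hypothetical stand-ins, not types or functions from the patch.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the RPC parameters; not the real Impala type.
struct Params {
  std::string name;
};

// Lambda version: the capture list states exactly what is moved into the
// closure ('params') and what is copied ('num_attempts').
inline std::function<std::string()> MakeLambdaCallback(
    std::shared_ptr<Params> params, int num_attempts) {
  assert(params != nullptr);
  return [params = std::move(params), num_attempts]() {
    return params->name + ":" + std::to_string(num_attempts);
  };
}

// Free function used as the bind() target below.
inline std::string Describe(const std::shared_ptr<Params>& params,
                            int num_attempts) {
  return params->name + ":" + std::to_string(num_attempts);
}

// bind() version: the bound arguments are stored by value inside the bind
// object and handed to Describe() as lvalues on every call. None of that is
// spelled out at the call site.
inline std::function<std::string()> MakeBindCallback(
    std::shared_ptr<Params> params, int num_attempts) {
  assert(params != nullptr);
  return std::bind(&Describe, std::move(params), num_attempts);
}
```

Both callbacks behave the same when invoked; the difference is only in how visible the ownership decisions are to someone reading the code.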


PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
> Re-reading the comments above, status seems to indicate whether status was 
Right - I was in two minds about using different statuses, or merging together 
the one from the RpcController and the one that we must provide somehow. I 
think this is the simplest way to pass both statuses, but let me know if you 
have an idea! I added a comment for now.
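
As a sketch of how a callback can fold the two statuses into one, following the pattern used in data-stream-sender.cc in this patch set: SimpleStatus and EffectiveStatus are hypothetical, and the real Status and kudu::Status types carry much more than a flag and a message.

```cpp
#include <cassert>
#include <string>

// Hypothetical minimal status type, for illustration only.
struct SimpleStatus {
  bool ok = true;
  std::string msg;
};

// Returns the status a callback should act on. A failure passed directly to
// the callback wins; otherwise the status carried by the controller is used.
inline SimpleStatus EffectiveStatus(const SimpleStatus& callback_status,
                                    const SimpleStatus& controller_status) {
  return callback_status.ok ? controller_status : callback_status;
}
```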


Line 337:       kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms);
> long line
Done


http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS2, Line 203:   auto cb = [self_ptr = weak_ptr<DataStreamSender::Channel>(self_),
             :       instance_id = fragment_instance_id_, proto_batch = batch]
             :       (const Status& status, TransmitDataRequestPb* request,
             :       TransmitDataResponsePb* response, RpcController* controller) {
             : 
             :     // Ensure that request and response get deleted when this callback returns.
             :     auto request_container = unique_ptr<TransmitDataRequestPb>(request);
             :     auto response_container = unique_ptr<TransmitDataResponsePb>(response);
             : 
             :     // Check if this channel still exists.
             :     auto channel = self_ptr.lock();
             :     if (!channel) return;
             :     {
             :       lock_guard<SpinLock> l(channel->lock_);
             :       Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status;
             : 
             :       int32_t status_code = response->status().status_code();
             :       channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE;
             : 
             :       if (!rpc_status.ok()) {
             :         channel->last_rpc_status_ = rpc_status;
             :       } else if (!channel->recvr_gone_) {
             :         if (status_code != TErrorCode::OK) {
             :           // Don't bubble up the 'receiver gone' status, because it's not an error.
             :           channel->last_rpc_status_ = Status(response->status());
             :         } else {
             :           int size = proto_batch->GetSize();
             :           channel->num_data_bytes_sent_.Add(size);
             :           VLOG_ROW << "incremented #data_bytes_sent="
             :                    << channel->num_data_bytes_sent_.Load();
             :         }
             :       }
             :       channel->rpc_in_flight_ = false;
             :     }
             :     channel->rpc_done_cv_.notify_one();
             :   };
> I am no C++ expert so this question may be stupid: can we not write this as
We could write this as a method, and use bind(), or we could create a struct 
with one method (this one) that captures the context upon construction. The 
latter is what a lambda compiles down to, and I prefer the syntax sugar a 
lambda gives you. The former uses bind(), which I am not a great fan of. 

In my experience, gdb handles this just fine, and the stack is IMHO cleaner 
than using bind() (I've set breakpoints in this method lots of times!).
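
To make the "struct with one method" equivalence concrete, here is a rough sketch of the two forms. Channel, OnRpcDone and MakeOnRpcDone are hypothetical, heavily simplified stand-ins, not the code in the patch; the only thing carried over is the weak_ptr check for a channel that has already gone away.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for DataStreamSender::Channel.
struct Channel {
  int bytes_sent = 0;
};

// Hand-written equivalent of the lambda below: each capture becomes a member
// and the lambda body becomes operator().
struct OnRpcDone {
  std::weak_ptr<Channel> self_ptr;
  int batch_size;

  // Returns false if the channel was destroyed before the callback ran.
  bool operator()() const {
    std::shared_ptr<Channel> channel = self_ptr.lock();
    if (!channel) return false;
    channel->bytes_sent += batch_size;
    return true;
  }
};

// The same callback written as a lambda with init-captures (C++14).
inline auto MakeOnRpcDone(std::weak_ptr<Channel> self_ptr, int batch_size) {
  return [self_ptr = std::move(self_ptr), batch_size]() {
    std::shared_ptr<Channel> channel = self_ptr.lock();
    if (!channel) return false;
    channel->bytes_sent += batch_size;
    return true;
  };
}
```

Either form can be handed to anything that accepts a callable; the lambda just keeps the captured state and the body in one place.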


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-HasComments: Yes
