Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

Patch Set 2:

(8 comments)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 126: 'cb'
> Can you please state the context in which 'cb' is called from (e.g. reactor
Done


PS2, Line 133: aattempted
> typo
Done


PS2, Line 230: //
> ///
Done


PS2, Line 319: Retries
> Is there any way to write a be-test to exercise the retry path ?
Yep - see rpc-mgr-test.cc, RetryAsyncTest. That injects ERROR_SERVER_TOO_BUSY into an RPC response which triggers the retry logic.


PS2, Line 322: auto cb_wrapper = [params = std::move(params), mgr, func, req, resp,
             :     cb = std::move(cb), controller_ptr = controller.release(), num_attempts]()
             :     mutable {
> An alternative to this lambda implementation would be to define a separate
It would, yeah. My preference is for using a lambda here partly because it's very clear about how the arguments are copied, and how ownership is managed (I find the copying behaviour of bind() a bit inscrutable).


PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
> Re-reading the comments above, status seems to indicate whether status was
Right - I was in two minds about using different statuses, or merging together the one from the RpcController and the one that we must provide somehow. I think this is the simplest way to pass both statuses, but let me know if you have an idea! I added a comment for now.


Line 337: kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms);
> long line
Done


http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS2, Line 203: auto cb = [self_ptr = weak_ptr<DataStreamSender::Channel>(self_),
             :     instance_id = fragment_instance_id_, proto_batch = batch]
             :     (const Status& status, TransmitDataRequestPb* request,
             :     TransmitDataResponsePb* response, RpcController* controller) {
             :
             :   // Ensure that request and response get deleted when this callback returns.
             :   auto request_container = unique_ptr<TransmitDataRequestPb>(request);
             :   auto response_container = unique_ptr<TransmitDataResponsePb>(response);
             :
             :   // Check if this channel still exists.
             :   auto channel = self_ptr.lock();
             :   if (!channel) return;
             :   {
             :     lock_guard<SpinLock> l(channel->lock_);
             :     Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status;
             :
             :     int32_t status_code = response->status().status_code();
             :     channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE;
             :
             :     if (!rpc_status.ok()) {
             :       channel->last_rpc_status_ = rpc_status;
             :     } else if (!channel->recvr_gone_) {
             :       if (status_code != TErrorCode::OK) {
             :         // Don't bubble up the 'receiver gone' status, because it's not an error.
             :         channel->last_rpc_status_ = Status(response->status());
             :       } else {
             :         int size = proto_batch->GetSize();
             :         channel->num_data_bytes_sent_.Add(size);
             :         VLOG_ROW << "incremented #data_bytes_sent="
             :                  << channel->num_data_bytes_sent_.Load();
             :       }
             :     }
             :     channel->rpc_in_flight_ = false;
             :   }
             :   channel->rpc_done_cv_.notify_one();
             : };
> I am no C++ expert so this question may be stupid: can we not write this as
We could write this as a method, and use bind(), or we could create a struct with one method (this one) that captures the context upon construction. The latter is what a lambda compiles down to, and I prefer the syntax sugar a lambda gives you. The former uses bind(), which I am not a great fan of.

In my experience, gdb handles this just fine, and the stack is IMHO cleaner than using bind() (I've broken in this method lots of times!).

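
As a rough illustration of the three options discussed above, here is a minimal sketch. It is not Impala code: `Context`, `DoneCallback`, `DoneFunctor`, `OnDone` and the `Make*` helpers are hypothetical names invented for this example, and a `shared_ptr` stands in for the captured state so that every variant fits in a `std::function`. It shows the same callback written as a lambda with an init-capture, as a hand-written struct with one call operator (roughly what the lambda compiles down to), and as a free function wrapped with `std::bind`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the state a completion callback needs.
struct Context {
  std::string name;
};

// Hypothetical callback signature: takes a status code, returns a description.
using DoneCallback = std::function<std::string(int)>;

// Option 1: lambda with an init-capture. The capture list spells out that
// 'ctx' is moved into the closure, so ownership is visible at the call site.
DoneCallback MakeLambda(std::shared_ptr<Context> ctx) {
  return [ctx = std::move(ctx)](int code) {
    assert(ctx != nullptr);
    return ctx->name + ":" + std::to_string(code);
  };
}

// Option 2: a struct that captures the context on construction and exposes a
// single call operator. This is approximately what option 1 expands to.
struct DoneFunctor {
  explicit DoneFunctor(std::shared_ptr<Context> ctx) : ctx_(std::move(ctx)) {}

  std::string operator()(int code) const {
    assert(ctx_ != nullptr);
    return ctx_->name + ":" + std::to_string(code);
  }

  std::shared_ptr<Context> ctx_;
};

DoneCallback MakeFunctor(std::shared_ptr<Context> ctx) {
  return DoneFunctor(std::move(ctx));
}

// Option 3: a free function plus std::bind. bind() stores a decayed copy of
// each bound argument, which is less obvious from reading the call site.
std::string OnDone(const std::shared_ptr<Context>& ctx, int code) {
  assert(ctx != nullptr);
  return ctx->name + ":" + std::to_string(code);
}

DoneCallback MakeBound(std::shared_ptr<Context> ctx) {
  return std::bind(&OnDone, std::move(ctx), std::placeholders::_1);
}
```

All three helpers return a callable with the same behaviour, so the choice between them is about readability and how explicit the ownership of the captured state is, not about what the callback does.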
--
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson
Gerrit-Reviewer: Henry Robinson
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Sailesh Mukil
Gerrit-HasComments: Yes
