Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

Patch Set 2:

(11 comments)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 126: 'cb'
Can you please state the context from which 'cb' is called (e.g. a reactor thread), and the precautions the caller should take when implementing 'cb' (e.g. no blocking)?

PS2, Line 133: aattempted
typo

PS2, Line 230: //
/// Same below.

PS2, Line 319: Retries
Is there any way to write a be-test to exercise the retry path?

PS2, Line 322: auto cb_wrapper = [params = std::move(params), mgr, func, req, resp,
             : cb = std::move(cb), controller_ptr = controller.release(), num_attempts]()
             : mutable {
An alternative to this lambda implementation would be to define a separate function and use boost::bind() to stash the arguments, right?

PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
Why do we pass Status::OK() for a non-retryable error, or after exceeding the maximum number of retries?

Line 337: kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms);
long line


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS1, Line 265: 10
Mind adding a comment above explaining what 10 stands for?

PS1, Line 266: numeric_limits<int32_t>::max()
Why is this not FLAGS_datastream_sender_timeout_ms?


http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS2, Line 203: auto cb = [self_ptr = weak_ptr<DataStreamSender::Channel>(self_),
             : instance_id = fragment_instance_id_, proto_batch = batch]
             : (const Status& status, TransmitDataRequestPb* request,
             : TransmitDataResponsePb* response, RpcController* controller) {
             :
             : // Ensure that request and response get deleted when this callback returns.
             : auto request_container = unique_ptr<TransmitDataRequestPb>(request);
             : auto response_container = unique_ptr<TransmitDataResponsePb>(response);
             :
             : // Check if this channel still exists.
             : auto channel = self_ptr.lock();
             : if (!channel) return;
             : {
             : lock_guard<SpinLock> l(channel->lock_);
             : Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status;
             :
             : int32_t status_code = response->status().status_code();
             : channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE;
             :
             : if (!rpc_status.ok()) {
             : channel->last_rpc_status_ = rpc_status;
             : } else if (!channel->recvr_gone_) {
             : if (status_code != TErrorCode::OK) {
             : // Don't bubble up the 'receiver gone' status, because it's not an error.
             : channel->last_rpc_status_ = Status(response->status());
             : } else {
             : int size = proto_batch->GetSize();
             : channel->num_data_bytes_sent_.Add(size);
             : VLOG_ROW << "incremented #data_bytes_sent="
             : << channel->num_data_bytes_sent_.Load();
             : }
             : }
             : channel->rpc_in_flight_ = false;
             : }
             : channel->rpc_done_cv_.notify_one();
             : };
I am no C++ expert, so this question may be naive: could we write this as a regular named function instead of a lambda? I am not sure how well gdb handles lambda functions compiled with optimization, and this callback seems important enough that one may want to inspect its state in a core dump if necessary.


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

PS1, Line 63: DataStreamService
nit: Just wondering why we didn't put DataStreamService in data-stream-service.cc?


--
To view, visit http://gerrit.cloudera.org:8080/7103

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-HasComments: Yes
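For reference, the two non-lambda alternatives raised in the comments on rpc.h (PS2, Line 322) and data-stream-sender.cc (PS2, Line 203) could look roughly like the minimal sketch below. It assumes hypothetical, heavily simplified `Status` and `Channel` types (not Impala's real classes), uses `std::bind` in place of `boost::bind`, and omits the protobuf request/response ownership, the Kudu status conversion and the condition-variable notification from the original callback. It only shows the shape of each option; it does not demonstrate how gdb treats either form in an optimized build.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical, heavily simplified stand-ins for Impala's Status and
// DataStreamSender::Channel; they are not the real classes.
struct Status {
  bool ok_ = true;
  std::string msg_;
  bool ok() const { return ok_; }
};

struct Channel {
  std::mutex lock;
  bool rpc_in_flight = true;
  long num_data_bytes_sent = 0;
  Status last_rpc_status;
};

// Alternative 1: a named free function. The extra arguments are bound with
// std::bind at the call site (boost::bind would be used the same way).
void OnTransmitDone(std::weak_ptr<Channel> weak_channel, int batch_size,
                    const Status& status) {
  // The channel may have been destroyed while the RPC was in flight.
  std::shared_ptr<Channel> channel = weak_channel.lock();
  if (!channel) return;
  std::lock_guard<std::mutex> l(channel->lock);
  if (status.ok()) {
    channel->num_data_bytes_sent += batch_size;
  } else {
    channel->last_rpc_status = status;
  }
  channel->rpc_in_flight = false;
}

// Alternative 2: a named functor. Its state lives in ordinary, named data
// members instead of a compiler-generated closure type.
struct TransmitDoneCallback {
  std::weak_ptr<Channel> weak_channel;
  int batch_size;
  void operator()(const Status& status) const {
    OnTransmitDone(weak_channel, batch_size, status);
  }
};
```

Either form can be stored in a `std::function<void(const Status&)>`, e.g. `std::bind(OnTransmitDone, std::weak_ptr<Channel>(channel), size, std::placeholders::_1)` or `TransmitDoneCallback{channel, size}`. Both keep the weak_ptr check from the original lambda, so a callback that fires after the channel is gone is a no-op.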