Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC ...................................................................... Patch Set 4: (86 comments) Reply to some of the comments for now. Will look into removing ProtoRowBatch next. Will not rebase until next version of the patch is pushed. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc File be/src/common/status.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@246 PS3, Line 246: void Status::FromProto(const StatusPB& status) { > this is the same as FromThrift() effectively, right? Can we make the two lo Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@262 PS3, Line 262: void Status::FreeMessage() noexcept { > same comment. let's make this and ToThrift look the same so it's obvious th Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc File be/src/exec/exchange-node.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc@109 PS3, Line 109: RETURN_IF_CANCELLED(state); > why do we do this in some Open() but not all? Should we just do it in ExecN Actually, I noticed similar patterns in other exec nodes. Let me keep this line of change for now and do the refactoring in another change. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h File be/src/rpc/rpc-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h@154 PS3, Line 154: ~RpcMgr() { > nit: one-liner? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc File be/src/rpc/rpc-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc@77 PS3, Line 77: VLOG_QUERY << "Registered KRPC service: " << service_pool->service_name(); > Should we add a log message stating which services we registered with KRPC? 
Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@63 PS3, Line 63: tiple RPCs. The logical connection between a pair of client > I don't think that's accurate. see questions in krpc-data-stream-recvr.h ab Comment rephrased. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@93 PS3, Line 93: After the first batch has been received, a sender continues to send batches, one > XXX check whether these are really different Rephrased. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@94 PS3, Line 94: () RPC > what buffer? do you mean queue? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@108 PS3, Line 108: > what does that mean? Is it saying that during unordinary operation, a send It means the fragment instance completes without hitting any error. If a fragment instance ends early, it may end up not calling EOS() RPC. For instance, if there is any cancellation, the stream will just be torn down without sending EOS as it's expected that the receivers' fragments will be cancelled too. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@140 PS3, Line 140: RPCs, and may be cancell > what are "it" and "its" here? "the sender" and "the RPC's"? the result will be dropped silently by the RPC layer. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@141 PS3, Line 141: /// time. If an in-flight RPC is cancelled at the sender side, the reply from the receiver > is that still true now that we have cancellation of RPCs? Yup. If an RPC is cancelled before the result arrives, the KRPC code will just ignore the result. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@153 PS3, Line 153: > sending fragment? 
Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@164 PS3, Line 164: structure is const > is that because the recvr hasn't showed up yet, or because the stream's que Both. Comments updated. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@166 PS3, Line 166: > is that talking about the 'request' field below, or something different? Yes. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@175 PS3, Line 175: kudu::rpc::RpcContext* rpc_context; > what's the relationship between this and proto_batch? proto_batch is the inbound row_batch populated from information in 'request' and 'rpc_context'. I agree that it's not strictly necessary to keep it in TransmitDataCtx. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@178 PS3, Line 178: /// such as the destination finst ID, plan node ID and the row batch header. > who owns it? 'context'. Comments added. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@235 PS3, Line 235: the mai > dest_node_id? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@239 PS3, Line 239: Create a receiver for a specific fragment_instance_id/dest_node_id. : /// If is_m > that seems unnecessary but don't change it now. The problem is that Close() of a receiver is not synchronized with the service threads which add the row batches. So, it's possible that there are still outstanding references to the receiver after it has been closed by the owning exchange node. We should fix this by synchronizing the Close() of a receiver with all outstanding service threads which hold a reference to it. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@246 PS3, Line 246: t > 'proto_batch'? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@248 PS3, Line 248: o > 'request'.
Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@266 PS3, Line 266: The RPC may not be res > is this an RPC handler? I think we should just be explicit about which of t Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@267 PS3, Line 267: > what RPC is this talking about? If this is a handler, then it's clear. Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@274 PS3, Line 274: > Does it close or cancel? (or is there no difference?) Cancel is more accurate. Close() will free all the buffered row batches. Cancel() will just mark all sender queues as cancelled so no more row batches can be enqueued. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@284 PS3, Line 284: > To be consistent with terminology used in class comment, maybe say "deferre Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@314 PS3, Line 314: > susper-nit: Capital 'W' Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@340 PS3, Line 340: > what is that saying? is that a misplaced comma or am I reading this wrong? We sort all receiver IDs based on (finst_id, dest_node_id). To locate all receivers for a given fragment instance, we call std::set::lower_bound(finst_id, 0) to identify the first entry and then iterate until finst_id stops matching. I rephrased the comment to make it less confusing. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@341 PS3, Line 341: } else if (a.first. > I don't understand this. it kinda sounds like we're trying to be able to f Clarified in the new comment. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@349 PS3, Line 349: } > hmm, I guess we need this now that we can't block the RPC thread? Yes. In essence, anything which blocks needs to be either stashed somewhere or replied to with an error status. 
http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@358 PS3, Line 358: ntRe > Monotonic time Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@374 PS3, Line 374: > monotonic time Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@382 PS3, Line 382: /// or b) the Maintenance() thread detects that the longest-waiting sender has been > all this parallel startup stuff really needs to be revisited (but not for t Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@386 PS3, Line 386: > maybe call it DeserializeDeferred() or DeserializeWorker() to make it clear Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@404 PS3, Line 404: void EnqueueDeferredBatch(DeserializeWorkItem&& payload); > how about grouping this with Deferred function above since it's related. Al Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@413 PS3, Line 413: ndDat > what's that? Removed. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@414 PS3, Line 414: /// st > I think that status is not getting checked by the caller. I thought Tim mad Added check for return status. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@416 PS3, Line 416: void AddEarlyClosedSender(const TUniqueId& fragment_instance_id, > let's add a quick comment. Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@421 PS3, Line 421: n empty shared_ptr if > RespondToTimedOutSender() or RespondTimeOutToSender()? 
Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@112 PS3, Line 112: for (unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs) { : EnqueueDeferredBatch({recvr->fragment_instance_id(), move(ctx)}); : num_senders_waiting_->Increment(-1); : } : for (unique_ptr<EndDataStreamCtx>& ctx : early_senders.closed_sender_ctxs) { : recvr->RemoveSender(ctx->request->sender_id()); : Status::OK().ToProto(ctx->response->mutable_status()); : ctx->rpc_context->RespondSuccess(); : num_senders_waiting_->Increment(-1); : } > It's not possible for the same sender to be in waiting_senders_ctxs and clo Yes, it's impossible as the early senders shouldn't have sent EOS() without waiting for the reply for its previous TransmitData() RPC. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@140 PS3, Line 140: } : RecvrId recvr_id = make_pair(finst_id, dest_node_id); : if (closed_stream_cache_.find(recvr_id) != closed_stream_cache_.end()) { : *already_unregistered = true; : } : return shared_ptr<KrpcDataStreamRecvr>(); : } : > I'm thinking it makes sense to prioritize finding the receiver with the ass Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@151 PS3, Line 151: vrId recvr_id = make_pair(finst_i > We could merge the implementations of AddEarlySender() and AddEarlyClosedSe Probably not worth it. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@174 PS3, Line 174: // closed_stream_cache_), the sender is timed out by the maintenance thread. > Add comment "In the worst case, this RPC is so late that the receiver is al Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@242 PS3, Line 242: : { : // TODO: Move this to maintenance thread. 
: // Remove any closed streams that have been in the cache for more than : // STREAM_EXPIRATION_TIME_MS. : lock_guard<mutex> l(lock_); : ClosedStreamMap::iterator it = closed_stream_expirations_.begin(); : int64_t now = MonotonicMillis(); : int32_t before = closed_stream_cache_.size(); : while (it != closed_stream_expirations_.end() && it->first < now) { : closed_stream_cache_.erase(it->second); : closed_stream_expirations_.erase(it++); : } : DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size()); : int32_t after = closed_stream_cache_.size(); : if (before != after) { : VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after : << ", eviction took: " : > Historically, we never had a maintenance thread, which is why we did the st Not a faster response to EOS RPC because the RPC is responded to already at line 240 but I agree that we should avoid blocking the service threads for this chore. Added a TODO. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@289 PS3, Line 289: } : : const string msg = Substitute( : "Unknown row r > Should this be a DCHECK instead? This may still be useful if the receiver is closed twice due to error. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@303 PS3, Line 303: ile (iter != fragment_recvr_set_.end() && iter->first == finst_id) { > Just realized that we could do this more efficiently. Instead of doing an O Not sure I understand the proposed idea. As discussed offline, we shouldn't call FindRecvr() for more than the number of receivers in a fragment instance. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h File be/src/runtime/krpc-data-stream-recvr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@48 PS3, Line 48: Single receiver of an m:n data stream. > Either the "an" shouldn't be there or streams shouldn't be plural. 
But I'm Yes, it should be an m:n data stream without the 's'. So, the definition would be all (sender, recvr) pairs. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@116 PS3, Line 116: sponded t > that doesn't seem accurate Oops, fixed. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@117 PS3, Line 117: an't be > what's that? Fixed. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@126 PS3, Line 126: > mentioned above, we didn't clearly define what a "stream" actually is, so i See reply above. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@141 PS3, Line 141: > is this in bytes? Yes. Fixed. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@142 PS3, Line 142: _; > wouldn't it be more accurate (and consistent with mgr terminology) to say " Done http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@225 PS1, Line 225: ++num_pending_enqueue_; > But batch_queue_ will still be empty, so other callers of AddBatch() will s The lock at line 196 serializes all callers but yes, this is fixed in PS4. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.cc@192 PS3, Line 192: RpcContext* context) { This lock is pessimistic and prevents multiple threads from deserializing multiple row batches in parallel.
http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.cc@253 PS3, Line 253: lock_guard<SpinLock> l(lock_); : DCHECK_GT(num_remaining_senders_, 0); : num_remaining_senders_ = max(0, num_remaining_senders_ - 1); : VLOG_FILE << "decremented senders: fragment_instance > Is this that important that we have to do it while holding the lock? Useful for debugging missing EOS but can be done outside of the lock. It's not on for the default log level though. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h File be/src/runtime/krpc-data-stream-sender.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@53 PS3, Line 53: ender instance, and is unique within a fragment. > it's not clear what that means from just reading the header, though i know Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@59 PS3, Line 59: ONED (broadc > that's not documented. Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@97 PS3, Line 97: friend class DataStreamTest; > should that really be public? seems more like a worker function. Moved to private. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@101 PS3, Line 101: virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, > that seems weird. is it for testing? if so, can it be protected instead? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@109 PS3, Line 109: class Channel; > why is that protected? is this a testing thing? Overriding the Init() in DataSink which is protected. This is called from DataSink::Create() so it shouldn't be public. The derived classes of DataSink may still call DataSink::Init() so it's protected. 
http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@89 PS3, Line 89: // Note that due to KUDU-2011, timeout cannot be used with outbound sidecars. The client > I think we should add a reference to KUDU-2011 somewhere here like: Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@92 PS3, Line 92: r ref > query? does it mean fragment instance? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@124 PS3, Line 124: status if serialization > is that still accurate? Removed. Comments carried over from the thrift implementation. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@139 PS3, Line 139: // Flushes any buffered row batches and sends the EOS RPC to close the channel. > that could use a comment. Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@141 PS3, Line 141: : int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; } : > those could be commented together to say they identify the destination. it' The names are carried over from the existing data-stream-sender.cc. Comments may help. Please see my reply to the comments about "req_" and "response_" fields below. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@152 PS3, Line 152: int buffer_size_; > Add "Not used for unpartitioned and random partitioned types". Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@159 PS3, Line 159: ress address_; > caps since constant? I renamed this to NUM_OUTBOUND_BATCHES for now. Google C++ style guideline (http://google.github.io/styleguide/cppguide.html#Constant_Names) suggests otherwise but I think we don't follow the guideline strictly (e.g. 
the use of LLVM_CLASS_NAME in various places). Yes, there is implicit assumption about how the WaitForRpc() will wait for the in-flight RPC to complete so bumping this number will not provide any benefit. That said, it will help with readability to have a name for this value instead of using a magic constant. I plan to rewrite this part of the code later to use a queue which will allow more than 2 outbound row batches. I don't want to add extra complication to this already large patch. Will leave a TODO. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@175 PS3, Line 175: llptr > proxy_ Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@178 PS3, Line 178: // buffer for the serialized row batches. When one is used for the in-flight RPC, : // the execution thread can continue to run and serialize another row batch to the : // other entry. 'current_batch_idx_' is the index of the entry being used by the : // in-flight or last completed RPC. : // TODO: replace this with an actual queue. Schedule another RPC callback in the : // completion callback if the queue is not empty. : CachedProtoRowBatch cached_proto > Why do we need to store these in the class? Can't they be local in the func We need to access these fields from the callback (e.g. due to retry). The req_ fields may be generated on the fly but the response buffer definitely needs to live for the entire duration of RPC call. Will look into generating 'req_' on the fly as its lifetime need not be longer than the RPC invocation itself. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@197 PS3, Line 197: RpcController rpc_controller_; > why is that needed now? It saves some bandwidth if the receiver is already closed. It's also a stop-gap fix for IMPALA-3990. If the remote receiver is closed (e.g. 
hitting a limit), it will live in the closed_stream_cache_ until it gets evicted after STREAM_EXPIRATION_TIME_MS ms. After the receiver is evicted from this cache, all future calls to this receiver will fail with a timeout. If we have this flag here, we will save the sender from hitting the timeout issue in most cases. Granted, this is not a fool-proof solution for IMPALA-3990 (e.g. the sender may not send anything for STREAM_EXPIRATION_TIME_MS ms) but it will help in most cases. In the long run, we should have a better answer for IMPALA-3990. Please also see my previous reply to Tim's comments. Cancelling the fragment instance may need some thought. For the current partitioning strategies, closing one of the channels shouldn't prevent a sender from sending to other channels unless there is only one channel. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@200 PS3, Line 200: TransmitDataRequestPB req_; > I think it would help to associate this with rpc_in_flight_ - move them adj Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@218 PS3, Line 218: > if we have the constant, shouldn't that use it? This function is removed in the latest patch. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@229 PS3, Line 229: > nit: re-invokes or retries Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@242 PS3, Line 242: // the actual RPC when the RPC is rescheduled.
> Add a comment "Should only be called from the main fragment instance thread Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@290 PS3, Line 290: void EndDataStreamCompleteCb(); > It would be nice to separate out the responsibility of setting of certain s Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@334 PS3, Line 334: SCOPED_TIMER(parent_->state_->total_network_send_timer()); : : // Wait for in-flight RPCs to complete unless the parent sender is closed or cancelled. : while(rpc_in_flight_ && !ShouldTerminate()) { : > seems simpler to just write: Done. Will switch over to ConditionVariable as need arises in the future as the TimedWait() takes boost::unique_lock<boost::mutex>. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@397 PS3, Line 397: > CurrentProtoBatch(). Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@402 PS3, Line 402: remote_recvr_closed_ = true; > Would the case where the RPC got cancelled from L547 fall here? Or would it It can go to either line 406 or here, depending if the completion callback is invoked before the cancellation event is processed. Either way, the cancellation from L547 will not check for http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@528 PS3, Line 528: return Status::OK(); > if (UNLIKELY(remote_recvr_closed_)) return Status::OK(); ? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@644 PS3, Line 644: return Status::OK(); > The RowBatch is serialized once per channel which is very wasteful. Fixed. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h File be/src/runtime/row-batch.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@52 PS3, Line 52: struct ProtoRowBatch { > I think we should get rid of this structure all together. 
IMO, the indirect ProtoRowBatch is a conceptual representation of a serialized row batch in both the sender and receiver side. A more appropriate name could be "SerializedRowBatch". I find it easier to have everything encapsulated in ProtoRowBatch when passing it to RowBatch::Deserialize() and hide the details of how it's constructed inside the RPC handler. Will look into removing the extra layer of indirection. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@89 PS3, Line 89: CachedProtoRowBatch > what is "cached" about this? Had hard time coming up with a good name to indicate the re-use of the vector and string buffers below. May be "reusable" is closer to the actual meaning but it's also quite confusing. Will switch to OutboundRowBatch and look into merging this with ProtoRowBatch above. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc File be/src/service/data-stream-service.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc@49 PS3, Line 49: // CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here. > nit: "CloseSender() is guaranteed to eventually respond to this RPC, so we Done http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/data_stream_service.proto File common/protobuf/data_stream_service.proto: http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/data_stream_service.proto@29 PS3, Line 29: // Id of this fragment in its role as a sender. : optional int32 sender_id = 2; : > what are "IDs" in these cases? let's improve the documentation here. Especi There is no equivalent of typedef in protobuf as far as I can tell. Comments updated. http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto File common/protobuf/row_batch.proto: http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@30 PS3, Line 30: > in thrift we had TTupleId. 
Is there a reason we aren't defining those types As far as I know, there is no equivalent of typedef in protobuf. We can try defining a message with a single field but this seems unnecessarily cumbersome. http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@32 PS3, Line 32: = 2; > what's tuple_data? not a field in this structure... That's the tuple data sent as sidecar. Clarified in the new comments. http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@39 PS3, Line 39: The > size of what? Size of tuple_data. Comments fixed. http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@42 PS3, Line 42: ion is applied. > do we plan to fix that? Fixed in latest patch. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 4 Gerrit-Owner: Michael Ho <[email protected]> Gerrit-Reviewer: Dan Hecht <[email protected]> Gerrit-Reviewer: Michael Ho <[email protected]> Gerrit-Reviewer: Mostafa Mokhtar <[email protected]> Gerrit-Reviewer: Sailesh Mukil <[email protected]> Gerrit-Reviewer: Tim Armstrong <[email protected]> Gerrit-Comment-Date: Wed, 25 Oct 2017 22:54:52 +0000 Gerrit-HasComments: Yes
