Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 4:

(86 comments)

Replying to some of the comments for now. Will look into removing ProtoRowBatch
next. Will not rebase until the next version of the patch is pushed.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc
File be/src/common/status.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@246
PS3, Line 246: void Status::FromProto(const StatusPB& status) {
> this is the same as FromThrift() effectively, right? Can we make the two lo
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@262
PS3, Line 262: void Status::FreeMessage() noexcept {
> same comment. let's make this and ToThrift look the same so it's obvious th
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc
File be/src/exec/exchange-node.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc@109
PS3, Line 109:   RETURN_IF_CANCELLED(state);
> why do we do this in some Open() but not all? Should we just do it in ExecN
Actually, I noticed similar patterns in other exec nodes. Let me keep this change
for now and do the refactoring in a separate patch.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h
File be/src/rpc/rpc-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h@154
PS3, Line 154:   ~RpcMgr() {
> nit: one-liner?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc
File be/src/rpc/rpc-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc@77
PS3, Line 77:   VLOG_QUERY << "Registered KRPC service: " << 
service_pool->service_name();
> Should we add a log message stating which services we registered with KRPC?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@63
PS3, Line 63: tiple RPCs. The logical connection between a pair of client
> I don't think that's accurate. see questions in krpc-data-stream-recvr.h ab
Comment rephrased.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@93
PS3, Line 93: After the first batch has been received, a sender continues to 
send batches, one
> XXX check whether these are really different
Rephrased.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@94
PS3, Line 94: () RPC
> what buffer? do you mean queue?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@108
PS3, Line 108:
> what does that mean?  Is it saying that during unordinary operation, a send
It means the fragment instance completes without hitting any error. If a
fragment instance ends early, it may end up not calling the EOS() RPC. For
instance, if there is any cancellation, the stream will just be torn down
without sending EOS, as it's expected that the receivers' fragments will be
cancelled too.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@140
PS3, Line 140: RPCs, and may be cancell
> what are "it" and "its" here? "the sender" and "the RPC's"?
the result will be dropped silently by the RPC layer.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@141
PS3, Line 141: /// time. If an in-flight RPC is cancelled at the sender side, 
the reply from the receiver
> is that still true now that we have cancellation of RPCs?
Yup. If an RPC is cancelled before the result arrives, the KRPC code will just 
ignore the result.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@153
PS3, Line 153:
> sending fragment?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@164
PS3, Line 164:  structure is const
> is that because the recvr hasn't showed up yet, or because the stream's que
both. Comments updated.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@166
PS3, Line 166:
> is that talking about the 'request' field below, or something different?
Yes.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@175
PS3, Line 175:   kudu::rpc::RpcContext* rpc_context;
> what's the relationship between this and proto_batch?
proto_batch is the inbound row batch populated from information in 'request'
and 'rpc_context'. I agree that it's not strictly necessary to keep it in
TransmitDataCtx.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@178
PS3, Line 178:   /// such as the destination finst ID, plan node ID and the row 
batch header.
> who owns it?
'context'. Comments added.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@235
PS3, Line 235: the mai
> dest_node_id?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@239
PS3, Line 239: Create a receiver for a specific 
fragment_instance_id/dest_node_id.
             :   /// If is_m
> that seems unnecessary but don't change it now.
The problem is that Close() of a receiver is not synchronized with the service
threads which add the row batches. So, it's possible that there are still
outstanding references to the receiver after it has been closed by the owning
exchange node.

We should fix this by synchronizing Close() of a receiver with all
outstanding service threads which hold a reference to it.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@246
PS3, Line 246: t
> 'proto_batch'?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@248
PS3, Line 248: o
> 'request'.
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@266
PS3, Line 266: The RPC may not be res
> is this an RPC handler? I think we should just be explicit about which of t
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@267
PS3, Line 267:
> what RPC is this talking about? If this is a handler, then it's clear.
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@274
PS3, Line 274:
> Does it close or cancel? (or is there no difference?)
Cancel is more accurate. Close() will free all the buffered row batches. 
Cancel() will just mark all sender queues as cancelled so no more row batches 
can be enqueued.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@284
PS3, Line 284:
> To be consistent with terminology used in class comment, maybe say "deferre
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@314
PS3, Line 314:
> susper-nit: Capital 'W'
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@340
PS3, Line 340:
> what is that saying? is that a misplaced comma or am I reading this wrong?
We sort all receiver IDs by (finst_id, dest_node_id). To locate all
receivers for a given fragment instance, we call std::set::lower_bound()
with (finst_id, 0) to find the first entry and then iterate until finst_id
stops matching. I rephrased the comment to make it less confusing.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@341
PS3, Line 341: } else if (a.first.
> I don't understand this.  it kinda sounds like we're trying to be able to f
Clarified in the new comment.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@349
PS3, Line 349:     }
> hmm, I guess we need this now that we can't block the RPC thread?
Yes. In essence, anything which blocks needs to be either stashed somewhere or 
replied to with an error status.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@358
PS3, Line 358: ntRe
> Monotonic time
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@374
PS3, Line 374:
> monotonic time
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@382
PS3, Line 382:   /// or b) the Maintenance() thread detects that the 
longest-waiting sender has been
> all this parallel startup stuff really needs to be revisited (but not for t
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@386
PS3, Line 386:
> maybe call it DeserializeDeferred() or DeserializeWorker() to make it clear
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@404
PS3, Line 404:   void EnqueueDeferredBatch(DeserializeWorkItem&& payload);
> how about grouping this with Deferred function above since it's related. Al
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@413
PS3, Line 413: ndDat
> what's that?
Removed.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@414
PS3, Line 414: /// st
> I think that status is not getting checked by the caller. I thought Tim mad
Added check for return status.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@416
PS3, Line 416:   void AddEarlyClosedSender(const TUniqueId& 
fragment_instance_id,
> let's add a quick comment.
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@421
PS3, Line 421: n empty shared_ptr if
> RespondToTimedOutSender() or RespondTimeOutToSender()?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@112
PS3, Line 112:   for (unique_ptr<TransmitDataCtx>& ctx : 
early_senders.waiting_sender_ctxs) {
             :      EnqueueDeferredBatch({recvr->fragment_instance_id(), 
move(ctx)});
             :      num_senders_waiting_->Increment(-1);
             :   }
             :   for (unique_ptr<EndDataStreamCtx>& ctx : 
early_senders.closed_sender_ctxs) {
             :      recvr->RemoveSender(ctx->request->sender_id());
             :      Status::OK().ToProto(ctx->response->mutable_status());
             :      ctx->rpc_context->RespondSuccess();
             :      num_senders_waiting_->Increment(-1);
             :   }
> It's not possible for the same sender to be in waiting_senders_ctxs and clo
Yes, it's impossible, as an early sender shouldn't have sent the EOS() RPC
without waiting for the reply to its previous TransmitData() RPC.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@140
PS3, Line 140:   }
             :   RecvrId recvr_id = make_pair(finst_id, dest_node_id);
             :   if (closed_stream_cache_.find(recvr_id) != 
closed_stream_cache_.end()) {
             :     *already_unregistered = true;
             :   }
             :   return shared_ptr<KrpcDataStreamRecvr>();
             : }
             :
> I'm thinking it makes sense to prioritize finding the receiver with the ass
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@151
PS3, Line 151: vrId recvr_id = make_pair(finst_i
> We could merge the implementations of AddEarlySender() and AddEarlyClosedSe
Probably not worth it.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@174
PS3, Line 174:     // closed_stream_cache_), the sender is timed out by the 
maintenance thread.
> Add comment "In the worst case, this RPC is so late that the receiver is al
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@242
PS3, Line 242:
             :   {
             :     // TODO: Move this to maintenance thread.
             :     // Remove any closed streams that have been in the cache for 
more than
             :     // STREAM_EXPIRATION_TIME_MS.
             :     lock_guard<mutex> l(lock_);
             :     ClosedStreamMap::iterator it = 
closed_stream_expirations_.begin();
             :     int64_t now = MonotonicMillis();
             :     int32_t before = closed_stream_cache_.size();
             :     while (it != closed_stream_expirations_.end() && it->first < 
now) {
             :       closed_stream_cache_.erase(it->second);
             :       closed_stream_expirations_.erase(it++);
             :     }
             :     DCHECK_EQ(closed_stream_cache_.size(), 
closed_stream_expirations_.size());
             :     int32_t after = closed_stream_cache_.size();
             :     if (before != after) {
             :       VLOG_QUERY << "Reduced stream ID cache from " << before << 
" items, to " << after
             :                  << ", eviction took: "
             :
> Historically, we never had a maintenance thread, which is why we did the st
It doesn't give a faster response to the EOS RPC, because that RPC has already
been responded to at line 240, but I agree that we should avoid blocking the
service threads for this chore.

Added a TODO.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@289
PS3, Line 289: }
             :
             :   const string msg = Substitute(
             :       "Unknown row r
> Should this be a DCHECK instead?
This may still be useful if the receiver is closed twice due to error.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@303
PS3, Line 303: ile (iter != fragment_recvr_set_.end() && iter->first == 
finst_id) {
> Just realized that we could do this more efficiently. Instead of doing an O
Not sure I understand the proposed idea. As discussed offline, we shouldn't
call FindRecvr() more times than the number of receivers in a fragment instance.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@48
PS3, Line 48: Single receiver of an m:n data stream.
> Either the "an" shouldn't be there or streams shouldn't be plural. But I'm
Yes, it should be "m:n data stream" without the 's'. So the definition would be
all (sender, recvr) pairs.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@116
PS3, Line 116: sponded t
> that doesn't seem accurate
Oops, fixed.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@117
PS3, Line 117: an't be
> what's that?
Fixed.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@126
PS3, Line 126:
> mentioned above, we didn't clearly define what a "stream" actually is, so i
See reply above.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@141
PS3, Line 141:
> is this in bytes?
Yes. Fixed.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.h@142
PS3, Line 142: _;
> wouldn't it be more accurate (and consistent with mgr terminology) to say "
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@225
PS1, Line 225:     ++num_pending_enqueue_;
> But batch_queue_ will still be empty, so other callers of AddBatch() will s
The lock at line 196 serializes all callers, but yes, this is fixed in PS4.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.cc@192
PS3, Line 192:   RpcContext* context) {
This lock is pessimistic and prevents multiple threads from deserializing 
multiple row batches in parallel.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-recvr.cc@253
PS3, Line 253:   lock_guard<SpinLock> l(lock_);
             :   DCHECK_GT(num_remaining_senders_, 0);
             :   num_remaining_senders_ = max(0, num_remaining_senders_ - 1);
             :   VLOG_FILE << "decremented senders: fragment_instance
> Is this that important that we have to do it while holding the lock?
Useful for debugging missing EOS, but this can be done outside the lock. It's
not enabled at the default log level, though.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@53
PS3, Line 53: ender instance, and is unique within a fragment.
> it's not clear what that means from just reading the header, though i know
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@59
PS3, Line 59: ONED (broadc
> that's not documented.
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@97
PS3, Line 97:   friend class DataStreamTest;
> should that really be public? seems more like a worker function.
Moved to private.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@101
PS3, Line 101:   virtual Status Init(const std::vector<TExpr>& 
thrift_output_exprs,
> that seems weird. is it for testing? if so, can it be protected instead?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.h@109
PS3, Line 109:   class Channel;
> why is that protected? is this a testing thing?
It overrides Init() in DataSink, which is protected. This is called from
DataSink::Create(), so it shouldn't be public. The derived classes of DataSink
may still call DataSink::Init(), so it's protected.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@89
PS3, Line 89: // Note that due to KUDU-2011, timeout cannot be used with 
outbound sidecars. The client
> I think we should add a reference to KUDU-2011 somewhere here like:
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@92
PS3, Line 92: r ref
> query? does it mean fragment instance?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@124
PS3, Line 124: status if serialization
> is that still accurate?
Removed. Comments carried over from the thrift implementation.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@139
PS3, Line 139:   // Flushes any buffered row batches and sends the EOS RPC to 
close the channel.
> that could use a comment.
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@141
PS3, Line 141:
             :   int64_t num_data_bytes_sent() const { return 
num_data_bytes_sent_; }
             :
> those could be commented together to say they identify the destination. it'
The names are carried over from the existing data-stream-sender.cc. Adding
comments may help.

Please see my reply to the comments about the 'req_' and 'response_' fields below.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@152
PS3, Line 152:   int buffer_size_;
> Add "Not used for unpartitioned and random partitioned types".
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@159
PS3, Line 159: ress address_;
> caps since constant?
I renamed this to NUM_OUTBOUND_BATCHES for now. The Google C++ style guide
(http://google.github.io/styleguide/cppguide.html#Constant_Names) suggests
otherwise, but I think we don't follow the guideline strictly (e.g. the use of
LLVM_CLASS_NAME in various places).

Yes, there is an implicit assumption about how WaitForRpc() waits for the
in-flight RPC to complete, so bumping this number will not provide any benefit.
That said, it will help readability to have a name for this value instead
of using a magic constant.

I plan to rewrite this part of the code later to use a queue, which will allow
more than 2 outbound row batches. I don't want to add extra complexity to
this already large patch. Will leave a TODO.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@175
PS3, Line 175: llptr
> proxy_
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@178
PS3, Line 178:   // buffer for the serialized row batches. When one is used for 
the in-flight RPC,
             :   // the execution thread can continue to run and serialize 
another row batch to the
             :   // other entry. 'current_batch_idx_' is the index of the entry 
being used by the
             :   // in-flight or last completed RPC.
             :   // TODO: replace this with an actual queue. Schedule another 
RPC callback in the
             :   // completion callback if the queue is not empty.
             :   CachedProtoRowBatch cached_proto
> Why do we need to store these in the class? Can't they be local in the func
We need to access these fields from the callback (e.g. due to retry).

The 'req_' field may be generated on the fly, but the response buffer
definitely needs to live for the entire duration of the RPC call. Will look
into generating 'req_' on the fly, as its lifetime need not be longer than the
RPC invocation itself.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@197
PS3, Line 197:   RpcController rpc_controller_;
> why is that needed now?
It saves some bandwidth if the receiver is already closed. It's also a stop-gap
fix for IMPALA-3990.

If the remote receiver is closed (e.g. after hitting a limit), it will live in
the closed_stream_cache_ until it gets evicted after STREAM_EXPIRATION_TIME_MS
ms. After the receiver is evicted from this cache, all future calls to this
receiver will fail with a timeout. With this flag, we save the sender from
hitting the timeout in most cases. Granted, this is not a fool-proof solution
for IMPALA-3990 (e.g. the sender may not send anything for
STREAM_EXPIRATION_TIME_MS ms), but it will help in most cases. In the long run,
we should have a better answer for IMPALA-3990. Please also see my previous
reply to Tim's comments.

Cancelling the fragment instance may need some thought. For the current 
partitioning strategies, closing one of the channels shouldn't prevent a sender 
from sending to other channels unless there is only one channel.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@200
PS3, Line 200:   TransmitDataRequestPB req_;
> I think it would help to associate this with rpc_in_flight_ - move them adj
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@218
PS3, Line 218:
> if we have the constant, shouldn't that use it?
This function is removed in the latest patch.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@229
PS3, Line 229:
> nit: re-invokes or retries
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@242
PS3, Line 242:   // the actual RPC when the RPC is rescheduled.
> Add a comment "Should only be called from the main fragment instance thread
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@290
PS3, Line 290:   void EndDataStreamCompleteCb();
> It would be nice to separate out the responsibility of setting of certain s
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@334
PS3, Line 334:   SCOPED_TIMER(parent_->state_->total_network_send_timer());
             :
             :   // Wait for in-flight RPCs to complete unless the parent 
sender is closed or cancelled.
             :   while(rpc_in_flight_ && !ShouldTerminate()) {
             :
> seems simpler to just write:
Done. Will switch over to ConditionVariable if the need arises in the future,
as its TimedWait() takes a boost::unique_lock<boost::mutex>.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@397
PS3, Line 397:
> CurrentProtoBatch().
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@402
PS3, Line 402:       remote_recvr_closed_ = true;
> Would the case where the RPC got cancelled from L547 fall here? Or would it
It can go to either line 406 or here, depending on whether the completion
callback is invoked before the cancellation event is processed. Either way, the
cancellation from L547 will not check for

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@528
PS3, Line 528:   return Status::OK();
> if (UNLIKELY(remote_recvr_closed_)) return Status::OK(); ?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@644
PS3, Line 644:   return Status::OK();
> The RowBatch is serialized once per channel which is very wasteful.
Fixed.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@52
PS3, Line 52: struct ProtoRowBatch {
> I think we should get rid of this structure all together. IMO, the indirect
ProtoRowBatch is a conceptual representation of a serialized row batch on both
the sender and receiver side. A more appropriate name might be
"SerializedRowBatch". I find it easier to have everything encapsulated in
ProtoRowBatch when passing it to RowBatch::Deserialize(), and to hide the
details of how it's constructed inside the RPC handler.

Will look into removing the extra layer of indirection.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@89
PS3, Line 89: CachedProtoRowBatch
> what is "cached" about this?
Had a hard time coming up with a good name to indicate the reuse of the vector
and string buffers below. Maybe "reusable" is closer to the actual meaning, but
it's also quite confusing.

Will switch to OutboundRowBatch and look into merging this with ProtoRowBatch
above.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc@49
PS3, Line 49:   // CloseSender() is guaranteed to eventually respond to this 
RPC so we don't do it here.
> nit: "CloseSender() is guaranteed to eventually respond to this RPC, so we
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/data_stream_service.proto@29
PS3, Line 29:   // Id of this fragment in its role as a sender.
            :   optional int32 sender_id = 2;
            :
> what are "IDs" in these cases? let's improve the documentation here. Especi
There is no equivalent of typedef in protobuf as far as I can tell.
Comments updated.


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto
File common/protobuf/row_batch.proto:

http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@30
PS3, Line 30:
> in thrift we had TTupleId. Is there a reason we aren't defining those types
As far as I know, there is no equivalent of a typedef in protobuf. We could try
defining a message with a single field, but that seems unnecessarily cumbersome.
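
For illustration, the closest proto2 gets to a typedef is a single-field
wrapper message; the TupleIdPB/TupleDescPB names below are hypothetical, and
the extra nesting shows up in both the wire format and the generated code:

```protobuf
// Hypothetical wrapper "typedef"; adds a message level everywhere it's used.
message TupleIdPB {
  optional int32 id = 1;
}

message TupleDescPB {
  // With the wrapper: desc.tuple_id().id() in generated C++ instead of a
  // plain desc.tuple_id().
  optional TupleIdPB tuple_id = 1;
}
```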


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@32
PS3, Line 32: = 2;
> what's tuple_data? not a field in this structure...
That's the tuple data sent as sidecar. Clarified in the new comments.


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@39
PS3, Line 39: The
> size of what?
Size of tuple_data. Comments fixed.


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@42
PS3, Line 42: ion is applied.
> do we plan to fix that?
Fixed in latest patch.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 4
Gerrit-Owner: Michael Ho <[email protected]>
Gerrit-Reviewer: Dan Hecht <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Mostafa Mokhtar <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Wed, 25 Oct 2017 22:54:52 +0000
Gerrit-HasComments: Yes
