Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

Patch Set 7:

(37 comments)

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@102
PS7, Line 102: the batch is added
             : /// to the receiver's 'deferred_batches_'
It's not really the batch that is added, and it's not just a single structure for the receiver (it may go into one of many queues for a merging exchange). So how about saying: ... the RPC state is saved into a deferred queue.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@104
PS7, Line 104: from the pending sender queue
How about: ... from a deferred RPC queue and the row batch is deserialized.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@393
PS7, Line 393:
Quick comment for why we define a move constructor and move operator=, since we don't typically want to define those.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@199
PS7, Line 199: DeserializeTask payload =
             : {DeserializeTaskType::EARLY_SENDERS, finst_id, dest_node_id, 0};
             : deserialize_pool_.Offer(move(payload));
Doesn't this mean we make early sender draining single-threaded? Should we instead use the sender_id in this case as well and offer work per sender? Or do we think this doesn't matter?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@217
PS7, Line 217: already_unregistered
That shouldn't be possible in the DEFERRED_BATCHES case, right? So I'd probably move this DCHECK into the cases below so you can tighten it up.
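To make the per-sender alternative raised at Line 199 concrete, here is a minimal C++ sketch. It is not Impala code: DeserializeTask is a hypothetical, simplified mirror of the payload quoted above, finst_id is an int64 stand-in for the real fragment instance id, and MakeEarlySenderTasks() is an invented helper. Instead of offering one EARLY_SENDERS task with sender_id 0, it builds one task per distinct sender, so a multi-threaded pool could drain different senders concurrently.

```cpp
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical, simplified mirror of the DeserializeTask payload quoted above.
enum class DeserializeTaskType { EARLY_SENDERS, DEFERRED_BATCHES };

struct DeserializeTask {
  DeserializeTaskType type;
  std::int64_t finst_id;  // Stand-in for the real fragment instance id.
  int dest_node_id;
  int sender_id;
};

// Hypothetical helper: one EARLY_SENDERS task per distinct sender (in ascending
// sender_id order) rather than a single task with sender_id 0, so a pool with
// several threads could drain different senders at the same time.
std::vector<DeserializeTask> MakeEarlySenderTasks(std::int64_t finst_id,
    int dest_node_id, const std::vector<int>& early_sender_ids) {
  const std::set<int> distinct(early_sender_ids.begin(), early_sender_ids.end());
  std::vector<DeserializeTask> tasks;
  tasks.reserve(distinct.size());
  for (int sender_id : distinct) {
    tasks.push_back(
        {DeserializeTaskType::EARLY_SENDERS, finst_id, dest_node_id, sender_id});
  }
  return tasks;
}
```

Each task's handler would then drain only the early RPCs carrying its sender_id, which keeps per-sender ordering intact while letting the pool spread senders across threads. The cost is one pool offer per sender instead of one per receiver.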
http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@235
PS7, Line 235: for (const unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs) {
Shouldn't we process waiting_sender_ctxs before closed_sender_ctxs? Otherwise, if the same sender is in both lists, we'll process its RPCs out of order. I guess that can't really happen given the current implementation (early RPCs aren't responded to, and each sender only lets one RPC be in flight), but it still seems to make more sense to do it the other way around.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@236
PS7, Line 236: already_unregistered
Why is this possible in the waiting_sender_ctxs case but not in the closed_sender_ctxs case?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@247
PS7, Line 247: already_unregistered
Why is that possible?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@248
PS7, Line 248: recvr->AddDeferredBatches(task.sender_id);
So I guess, doing it this way, we no longer multithread within a single sender queue (and, for non-merging, within a single receiver). I think that's okay, but was it intentional?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h@78
PS7, Line 78: The caller must acquire data from the
            : /// returned batch
Is that talking about calling TransferAllResources(), or can the caller do it directly?

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127
PS6, Line 127: // If true, the receiver fragment for this stream got cancelled.
> For the non-merging case, there is essentially only one queue.
As mentioned elsewhere, I'm not totally convinced yet that this is the right way to do it, but yes, we can think about it more and change it later if necessary.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@72
PS7, Line 72: s
typo

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@77
PS7, Line 77: Adds as many deferred batches as possible
Hmm, I'm still not convinced this is the right thing to do (in the merging case). It seems like the order in which deferred batches are drained across the sender queues is left up to chance. But we can think about this more and address it later.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@97
PS7, Line 97: (1) 'batch_queue' is empty and there is no pending insertion
The HasSpace name seems wrong for condition (1). From the name HasSpace, I was expecting it to check only condition (2), because that tells us whether there is still space. How about calling this CanEnqueue() or ShouldEnqueue()?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@113
PS7, Line 113: size
Deserialized or serialized size?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@169
PS7, Line 169: deferred batches
These aren't really deferred batches, though. They are deferred RPCs (which do contain batches). If you like the "deferred batches" terminology, I can live with it, but it seems somewhat misleading (though not worse than "blocked senders").

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@220
PS7, Line 220: // if we haven't notified them already.
Rather than explaining what, we should explain why: we've either removed a batch from the sender queue (so now there might be space to move in a deferred batch), or the sender queue is empty. In either case, try to process the deferred batches and move them to the sender queue. Though can the second case (the empty case) even happen? Shouldn't we have already started this process if the queue was empty? That is, why is this so complicated? Why can't it just be:
(1) while batch_queue_.empty(), wait for data arrival.
(2) Dequeue from the batch queue and trigger deferred batch draining.
i.e. that first while loop shouldn't need to care about the deferred_batches_ state. Whoever made batch_queue_ empty should be responsible for triggering the draining, so we can just wait for the arrival. And if there was nothing to drain, then the next batch to arrive would immediately go into batch_queue_ anyway. So it seems like this can all be simplified; am I missing something?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@223
PS7, Line 223: deferred_batches_.front()->request->sender_id();
That seems misleading in the !merging case. sender_id will be some random one, but we'll funnel back into 0 anyway. But I guess this sender queue doesn't know what its own sender id is, so okay.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@224
PS7, Line 224: l.unlock();
Why do we need to drop the lock?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@263
PS7, Line 263: COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
Let's add a comment: // Reserve queue space before dropping the lock.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@269
PS7, Line 269: deserialize multiple batches in parallel.
I guess this only happens in the non-merging case, right?
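The restructuring suggested for Line 220 can be sketched as follows. This is a hypothetical, single-file illustration, not the real KrpcDataStreamRecvr::SenderQueue: SimpleSenderQueue and its members are invented names, batches are plain strings, the admission rule is simplified, and deferred batches are drained inline under the lock rather than handed to a deserialization pool. What it shows is the invariant behind the suggestion: whoever empties batch_queue_ drains the deferred batches, so a non-empty deferred list implies a non-empty batch_queue_, and GetBatch() only ever has to wait for arrival. The wait uses the predicate overload of std::condition_variable::wait().

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical, simplified sender queue illustrating the proposed GetBatch()
// structure. Batches are plain strings and byte accounting uses size().
// Deferred batches are drained inline under the lock; the real receiver
// would hand this work to a deserialization thread pool instead.
class SimpleSenderQueue {
 public:
  explicit SimpleSenderQueue(std::size_t byte_limit) : byte_limit_(byte_limit) {}

  // Producer: enqueue if allowed and nothing is already deferred (to keep
  // arrival order); otherwise defer the batch.
  void AddBatch(std::string batch) {
    std::lock_guard<std::mutex> l(lock_);
    if (deferred_.empty() && CanEnqueue(batch.size())) {
      Enqueue(std::move(batch));
    } else {
      deferred_.push_back(std::move(batch));
    }
  }

  // Consumer: (1) wait until batch_queue_ is non-empty, (2) dequeue, then drain
  // deferred batches into the space this call just freed.
  std::string GetBatch() {
    std::unique_lock<std::mutex> l(lock_);
    data_arrival_cv_.wait(l, [this] { return !batch_queue_.empty(); });
    std::string batch = std::move(batch_queue_.front());
    batch_queue_.pop_front();
    queued_bytes_ -= batch.size();
    DrainDeferredLocked();
    return batch;
  }

  std::size_t num_deferred() {
    std::lock_guard<std::mutex> l(lock_);
    return deferred_.size();
  }

 private:
  // Simplified admission rule: an empty queue always accepts one batch.
  bool CanEnqueue(std::size_t size) const {
    return batch_queue_.empty() || queued_bytes_ + size <= byte_limit_;
  }

  void Enqueue(std::string batch) {
    queued_bytes_ += batch.size();
    batch_queue_.push_back(std::move(batch));
    data_arrival_cv_.notify_one();
  }

  // Moves deferred batches, oldest first, into batch_queue_ while they fit. If
  // batch_queue_ is empty, at least one batch always moves, so a non-empty
  // deferred_ implies a non-empty batch_queue_.
  void DrainDeferredLocked() {
    while (!deferred_.empty() && CanEnqueue(deferred_.front().size())) {
      Enqueue(std::move(deferred_.front()));
      deferred_.pop_front();
    }
  }

  const std::size_t byte_limit_;
  std::size_t queued_bytes_ = 0;
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<std::string> batch_queue_;
  std::deque<std::string> deferred_;
};
```

With this shape the first wait loop never inspects the deferred state, which is the simplification the comment asks about; the price is that draining runs on the consumer's thread in this sketch.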
http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@274
PS7, Line 274: DCHECK_GT(num_pending_enqueue_, 0);
I'm not sure what this DCHECK is meant to prove.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@322
PS7, Line 322: starvation
starvation of a sender in the non-merging case.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@324
PS7, Line 324: Note: It's important that we enqueue the new batch regardless of buffer limit if
             : // the queue is currently empty. In the case of a merging receiver, batches are
             : // received from a specific queue based on data order, and the pipeline will stall
             : // if the merger is waiting for data from an empty queue that cannot be filled because
             : // the limit has been reached.
That comment should be inside HasSpace() now.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@345
PS7, Line 345: void KrpcDataStreamRecvr::SenderQueue::AddDeferredBatches() {
Are we able to exercise the deferred batches path in functional testing? If not, I think we should figure out a way.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@531
PS7, Line 531: // All the sender queues will be cancelled after this call returns.
Is that accurate?

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@369
PS6, Line 369: // the need to manage yet another thread pool.
> Mostly to avoid the complexity of managing yet another thread pool.
Okay. I thought maybe it was to keep things cache-local.
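The reservation pattern behind the Line 263 and Line 274 comments, together with the Line 324 note moved into the admission check, might look like the following. It is a hypothetical sketch, not Impala's implementation: ReservingQueue, TryAddBatch(), and the pass-through Deserialize() are invented names, byte accounting uses the serialized size, and dequeuing (which would release the reserved bytes) is omitted. Space is reserved and num_pending_enqueue_ is bumped while holding the lock, so concurrent callers see the reservation while this thread deserializes unlocked; the assert documents one invariant that a DCHECK_GT(num_pending_enqueue_, 0) could be checking.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical sketch: reserve queue space under the lock, deserialize without
// it, then re-acquire the lock to enqueue. Dequeuing (which would release the
// reserved bytes) is omitted.
class ReservingQueue {
 public:
  explicit ReservingQueue(std::int64_t byte_limit) : byte_limit_(byte_limit) {}

  // Returns false if the caller should defer the batch instead.
  bool TryAddBatch(const std::string& serialized) {
    const std::int64_t batch_size = static_cast<std::int64_t>(serialized.size());
    std::unique_lock<std::mutex> l(lock_);
    if (!CanEnqueue(batch_size)) return false;
    // Reserve queue space before dropping the lock.
    ++num_pending_enqueue_;
    num_buffered_bytes_ += batch_size;
    l.unlock();

    // Runs without holding lock_, so other threads may deserialize concurrently.
    std::string batch = Deserialize(serialized);

    l.lock();
    // This thread's reservation must still be outstanding.
    assert(num_pending_enqueue_ > 0);
    --num_pending_enqueue_;
    batch_queue_.push_back(std::move(batch));
    return true;
  }

  std::size_t size() {
    std::lock_guard<std::mutex> l(lock_);
    return batch_queue_.size();
  }

 private:
  // Returns true if either:
  // (1) the queue is empty and no insertion is pending. A batch is accepted here
  //     regardless of the buffer limit: a merging receiver reads from a specific
  //     queue based on data order, and the pipeline would stall if the merger were
  //     waiting on an empty queue that could not be filled because the limit had
  //     been reached; or
  // (2) the batch fits within the remaining buffer space, counting reservations.
  bool CanEnqueue(std::int64_t batch_size) const {
    if (batch_queue_.empty() && num_pending_enqueue_ == 0) return true;
    return num_buffered_bytes_ + batch_size <= byte_limit_;
  }

  // Hypothetical placeholder for row batch deserialization.
  static std::string Deserialize(const std::string& serialized) { return serialized; }

  const std::int64_t byte_limit_;
  std::int64_t num_buffered_bytes_ = 0;
  int num_pending_enqueue_ = 0;
  std::mutex lock_;
  std::deque<std::string> batch_queue_;
};
```

Condition (1) also requires num_pending_enqueue_ == 0 because an in-flight insertion will soon make the queue non-empty, so the merger cannot stall on it and the byte limit can apply as usual. Under this naming, CanEnqueue() describes both conditions, while a HasSpace() name would only fit condition (2).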
http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@235
PS7, Line 235: call
nit: delete "call" too. The 'C' in RPC stands for call.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@257
PS7, Line 257: (e.g. Connection object was shutdown due to network errors)
With Thrift RPC, wouldn't we have retried making a connection and doing the RPC in this case? And now with KRPC we will fail the query? Is that true?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@273
PS7, Line 273: current_batch_
rpc_in_flight_batch_

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@554
PS7, Line 554: while (rpc_in_flight_) {
             : rpc_done_cv_.wait(l);
             : }
             : }
One-liner?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@558
PS7, Line 558: DCHECK(!rpc_in_flight_);
That becomes vacuous given the while loop condition.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.h@349
PS7, Line 349: output_batch
outbound_batch (or rename the param)

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc@426
PS7, Line 426: result += header.num_tuples_per_row() * sizeof(int32_t);
Could you order this computation the same way as the Thrift version, to make it easy to see they are doing the same thing?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc@432
PS7, Line 432: batch.tuple_offsets_.size();
What is this trying to compute? The size of the tuple_ptrs_? If so, it doesn't look right, since each offset will expand into a pointer, right? i.e. shouldn't it be tuple_offsets_.size() * sizeof(Tuple*)? And the Thrift version looks wrong too. And the Slice version is also different and wrong (since size() is different for Slice vs. vector).

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc@433
PS7, Line 433: batch.header_.num_tuples_per_row() * sizeof(int32_t);
What is this accounting for?

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc@440
PS7, Line 440: batch.header_.num_tuples_per_row() * sizeof(int32_t);
What's that accounting for?

--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 7
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Fri, 03 Nov 2017 23:07:09 +0000
Gerrit-HasComments: Yes