Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 8:

(36 comments)

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@102
PS7, Line 102: the RPC state is
: /// saved into the receiver's 'deferred_r
> it's not really the batch added. and it's not just a single structure for t
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@104
PS7, Line 104: C is removed from the 'deferr
> how about: ... from a deferred RPC queue and the row batch is deserialized.
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@393
PS7, Line 393: EarlySendersList& operator=(EarlySendersList&& other) {
> quick comment for why we define a move constructor and move operator=, sinc
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@430
PS7, Line 430: n
> id
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@436
PS7, Line 436: nt instan
> identifies
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@61
PS7, Line 61:
> nit: blank space
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@217
PS7, Line 217:
> that shouldn't be possible in the DEFERRED_BATCHES case, right? so i'd prob
This is actually possible if the receiver is closed before the deserialization threads execute this task.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@236
PS7, Line 236: << " node_id=" << r
> why is this possible in the waiting_sender_ctxs case but not the closed_sen
The response for the closed_sender_ctxs case is the same regardless of whether the receiver is unregistered or not.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@247
PS7, Line 247: !already_unregistere
> why is that possible?
It's possible that the receiver is cancelled and closed before the deserialization thread gets around to processing this request. In that case, the deferred RPCs would all be replied to in KrpcDataStreamRecvr::SenderQueue::Cancel().

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@248
PS7, Line 248: AddEarlyClosedSender(finst_id, request,
> So I guess we no longer multithread within a single sender queue (and for n
PS8 will queue multiple deserialization requests to drain multiple deferred RPCs at the same time.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h@78
PS7, Line 78: The caller must call TransferAllResources()
: /// to acquire dat
> is that talking about calling TransferAllResources(), or can the caller do
Yes, it's TransferAllResources(). Comments clarified.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@72
PS7, Line 72: h
> typo
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@97
PS7, Line 97:
> the HasSpace name seems wrong for condition (1). From the name HasSpace, I
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@113
PS7, Line 113: B* r
> deserialized or serialized size?
Deserialized size.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@169
PS7, Line 169:
> these aren't really deferred batches though. They are deferred RPCs (which
Renamed to deferred_rpcs_.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@220
PS7, Line 220: recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
> rather than explaining what, we should explain why:
Simplified in the new patch.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@224
PS7, Line 224: batch_queue_.pop_front();
> why do we need to drop the lock?
Comments added. TriggerDeferredBatchesDrain() may block, so holding the lock here may lead to a deadlock.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@263
PS7, Line 263: KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(
> let's add a comment:
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@269
PS7, Line 269: e data sidecar");
> I guess this only happens in the non-merging case, right?
Yes.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@274
PS7, Line 274:
> I'm not sure what this dcheck is meant to prove.
This is to verify that Close() cannot proceed as num_pending_enqueue_ > 0. I guess line 284 serves a similar purpose, so it can be removed.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@322
PS7, Line 322:
> starvation of a sender in the non-merging case.
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@324
PS7, Line 324:
: unique_lock<SpinLock> l(lock_);
: // There should be one or more senders left when this function is called. The reason
: // is that EndDataStream RPC is not sent until all outstanding TransmitData() RPC has
: // been replied to. There is a
> that comment shoudl be inside HasSpace() now.
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@345
PS7, Line 345: deferred_rpcs_.push(move(payload));
> are we able to exercise the deferred batches path in functional testing? if
Yes, my local mini-cluster tests show that we do exercise this path with concurrent queries, but we should also include it in functional tests.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@531
PS7, Line 531: ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::
> is that accurate?
Yes. DeregisterRecvr() will call CancelStream().

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.h@136
PS7, Line 136: //
> nit: ///
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@182
PS7, Line 182:
> delete.
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@235
PS7, Line 235: fail
> nit: delete call too. 'C' of RPC is call.
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@257
PS7, Line 257: (e.g. the reactor thread was being shut down) or if the
> with thrift RPC, wouldn't we have retried making a connection and doing the
Yes. We will fail the query in KRPC if the connection failed. Filed IMPALA-6159. Please see the JIRA for details. Btw, I just double-checked: the abort here is actually due to the shutdown of the reactor thread, not the connection. Comments updated.

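Regarding the krpc-data-stream-recvr.cc@224 reply above (dropping the lock before a call that may block): the following is a minimal standalone sketch of that pattern, not the code in this patch. SenderQueueSketch, AddBatch(), GetBatch() and the 'drain' callback are hypothetical names chosen for illustration; std::mutex stands in for the SpinLock, and the callback stands in for a drain call such as TriggerDeferredBatchesDrain().

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a sender queue; none of these names are Impala's.
class SenderQueueSketch {
 public:
  // 'drain' stands in for a call that may block or re-enter this queue.
  // It is only ever invoked with lock_ released.
  explicit SenderQueueSketch(std::function<void()> drain)
    : drain_(std::move(drain)) {}

  // Appends a batch under the lock.
  void AddBatch(int batch) {
    std::lock_guard<std::mutex> l(lock_);
    batch_queue_.push_back(batch);
  }

  // Pops the front batch into 'out'. Returns false if the queue is empty.
  bool GetBatch(int* out) {
    std::unique_lock<std::mutex> l(lock_);
    if (batch_queue_.empty()) return false;
    *out = batch_queue_.front();
    batch_queue_.pop_front();
    // Release the lock before the potentially blocking call. If drain_()
    // needed lock_ (as AddBatch() does), calling it with the lock held
    // would deadlock.
    l.unlock();
    drain_();
    return true;
  }

 private:
  std::mutex lock_;
  std::deque<int> batch_queue_;
  std::function<void()> drain_;
};
```

In this sketch, a drain callback that itself calls AddBatch() completes normally because GetBatch() has already released lock_ by the time the callback runs.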
http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@273
PS7, Line 273: rpc_in_flight_
> rpc_in_flight_batch_
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@554
PS7, Line 554: while (rpc_in_flight_) rpc_done_cv_.wait(l);
: }
: batch_.reset();
: }
> one liner?
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@558
PS7, Line 558:
> that becomes vacuous given the while loop condition.
Removed.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.h@349
PS7, Line 349: output_batch
> outbound_batch (or rename the param)
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc@426
PS7, Line 426: (tuple_offsets.size() / sizeof(int32_t)) * sizeof(Tuple*);
> could you order this computation in the same order as thrift, to make it ea
Done

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc@432
PS7, Line 432:
> what is this trying to compute? the size of the tuple_ptrs_? if so, it does
Good catch. Fixed.

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc@433
PS7, Line 433: atch::GetSerializedSize(const OutboundRowBatch& batch
> what is this accounting for?
Nothing. Looks like thrift version has the same bug as this is not really used. I will refrain from updating the TRowBatch in this patch but updated the size calculation in GetDeserializedSize().

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/row-batch.cc@440
PS7, Line 440: tuple_ptrs_size_, src->tuple_ptrs_size_);
> what's that accounting?
Left over from previous PS. Removed now.

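Regarding the krpc-data-stream-sender.cc@554 and @558 comments above: the sketch below illustrates why a check on the flag right after the wait loop is vacuous. It is a standalone illustration under assumed names, not the sender code in this patch. ChannelSketch, StartRpc(), RpcDone() and WaitForRpc() are hypothetical; only rpc_in_flight_ and rpc_done_cv_ are borrowed from the quoted line, and std::mutex and std::condition_variable stand in for whatever primitives the real code uses.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a channel with at most one RPC in flight.
class ChannelSketch {
 public:
  // Marks an RPC as outstanding.
  void StartRpc() {
    std::lock_guard<std::mutex> l(lock_);
    rpc_in_flight_ = true;
  }

  // Stands in for the RPC completion callback: clears the flag under the
  // lock, then wakes any waiter.
  void RpcDone() {
    {
      std::lock_guard<std::mutex> l(lock_);
      rpc_in_flight_ = false;
    }
    rpc_done_cv_.notify_one();
  }

  // Blocks until no RPC is in flight and returns the flag's value on exit.
  bool WaitForRpc() {
    std::unique_lock<std::mutex> l(lock_);
    // The loop also covers spurious wakeups: wait() may return while the
    // flag is still set, in which case we simply wait again.
    while (rpc_in_flight_) rpc_done_cv_.wait(l);
    // The loop only exits with the lock held and the flag clear, so an
    // assertion on !rpc_in_flight_ at this point adds nothing.
    return rpc_in_flight_;
  }

 private:
  std::mutex lock_;
  std::condition_variable rpc_done_cv_;
  bool rpc_in_flight_ = false;
};
```

Because wait() reacquires the lock before returning and the loop re-tests the flag each time, WaitForRpc() in this sketch can only return false.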
--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 8
Gerrit-Owner: Michael Ho
Gerrit-Reviewer: Dan Hecht
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Mostafa Mokhtar
Gerrit-Reviewer: Sailesh Mukil
Gerrit-Comment-Date: Sun, 05 Nov 2017 20:23:47 +0000
Gerrit-HasComments: Yes
