Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/21579 )
Change subject: POC: Add queuing to KrpcDataStreamSender
......................................................................


Patch Set 15:

(18 comments)

Thanks, I fixed some of the comments, but not all. My next step is to run tests with increased queue depths to check whether the queue implementation really works.

http://gerrit.cloudera.org:8080/#/c/21579/12//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/21579/12//COMMIT_MSG@12
PS12, Line 12: 2. in-fligh batch (with current active RPC)
> Spell check on the commit message would catch a few issues.
Done


http://gerrit.cloudera.org:8080/#/c/21579/14/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/21579/14/be/src/runtime/krpc-data-stream-sender.h@277
PS14, Line 277: // Increments eos_completed_count_ and wakes WaitUntilEmpty() if all channels are
> line too long (91 > 90)
Done


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@110
PS12, Line 110: with the exception of broadcast
> can you describe here how it's different with broadcast? (single OutboundQu
Done

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@113
PS12, Line 113: a row batch (
> nit: either "a row batch" or "row batches"?
Done

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@118
PS12, Line 118: /
> nit: missed a / here
Done

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@211
PS12, Line 211: KrpcDataStreamSender* parent_ = nullptr;
> Instead of holding an extra pointer, could be a GetParent() method that cal
I haven't thought this through yet.
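The comment quoted at PS14, Line 277 says that OutboundQueue increments eos_completed_count_ and wakes WaitUntilEmpty() once all channels are (presumably) done; the quote is truncated, so the exact condition is an assumption. A minimal sketch of that counter-plus-wakeup pattern, assuming a mutex and std::condition_variable (the patch's actual synchronization is not visible in this review). EosTracker, OnEosCompleted() and RunDemo() are hypothetical names, and Impala's Status return type is left out:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical, simplified model of the EOS bookkeeping in OutboundQueue.
// The real class returns Impala's Status; this sketch returns void.
class EosTracker {
 public:
  explicit EosTracker(std::size_t num_channels) : num_channels_(num_channels) {}

  // Called once per channel when its EOS RPC completes, typically from an RPC
  // callback thread. Wakes WaitUntilEmpty() after the last channel reports.
  void OnEosCompleted() {
    std::lock_guard<std::mutex> l(lock_);
    assert(eos_completed_count_ < num_channels_);  // each channel reports once
    ++eos_completed_count_;
    if (eos_completed_count_ == num_channels_) all_done_cv_.notify_all();
  }

  // Blocks the calling thread until every channel has completed EOS.
  // The predicate overload of wait() also handles spurious wakeups.
  void WaitUntilEmpty() {
    std::unique_lock<std::mutex> l(lock_);
    all_done_cv_.wait(l, [this] { return eos_completed_count_ == num_channels_; });
  }

  std::size_t eos_completed_count() const {
    std::lock_guard<std::mutex> l(lock_);
    return eos_completed_count_;
  }

 private:
  const std::size_t num_channels_;
  mutable std::mutex lock_;
  std::condition_variable all_done_cv_;
  std::size_t eos_completed_count_ = 0;
};

// Usage: simulate num_channels EOS callbacks completing on other threads.
std::size_t RunDemo(std::size_t num_channels) {
  EosTracker tracker(num_channels);
  std::vector<std::thread> callbacks;
  for (std::size_t i = 0; i < num_channels; ++i) {
    callbacks.emplace_back([&tracker] { tracker.OnEosCompleted(); });
  }
  tracker.WaitUntilEmpty();  // returns only after all callbacks have run
  for (std::thread& t : callbacks) t.join();
  return tracker.eos_completed_count();
}
```

Checking the count against the channel count under the lock, rather than using a bare atomic, keeps the wakeup from being lost if the last callback finishes between the waiter's check and its call to wait().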

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@237
PS12, Line 237: class OutboundQueue {
> nit: private class could be defined separately, like a separate header file
I would like to do this later, but I am not sure yet how to structure it. In the long term I would like to split up krpc-data-stream-sender.cc.

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@263
PS12, Line 263: Status WaitUntilEmpty();
> nit: Til is a shortening of Until, which only has 1 'l'. So I'd do WaitTilE
Done, chose WaitUntilEmpty.

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@300
PS12, Line 300: // All channels managed (but not owned) by this queue.
> Consider https://www.boost.org/doc/libs/1_74_0/doc/html/boost/container/sta
Done

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@317
PS12, Line 317: // sending it (QueuedBatch::consumers_left reaches 0).
> If we're trying to minimize memory allocation/deallocation, this might not
I haven't thought through yet what the best solution is. Using an intrusive list could also be an option.

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@507
PS12, Line 507: // Pool of free OutboundRowBatch buffers available for serialization.
> Use deque (or queue) here too.
Same as for queue_.


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@475
PS12, Line 475: // Holds a non-OK status if starting the next RPC (TransmitData or EOS) failed.
> I think this section would be helpful to move to a new function with a desc
Agreed, but I haven't refactored it yet.

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@636
PS12, Line 636: 
> nit: Since this doesn't directly send the current batch anymore, only appen
Done

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@918
PS12, Line 918: compression_scratch_.reset(new TrackedString(*char_mem_tracker_allocator_));
> This and the other partition_type_ checks would make more sense as a switch
I agree that the current large number of ifs is a mess, but I am not sure about the best way to structure this. I would probably move the handling of each partition_type_ to its own init function.

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@920
PS12, Line 920: if (partition_type_ == TPartitionType::UNPARTITIONED) {
> queues_.reserve first, or use static_vector.
Done

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@1189
PS12, Line 1189: RETURN_IF_ERROR(SerializeBatch(
> Should we DCHECK_EQ(queues_.size(), 1)?
Done

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@1483
PS12, Line 1483: // All channels are closed. Return the batch directly to the pool.
> QueuedBatch can be constructed directly in emplace_back like
Done


http://gerrit.cloudera.org:8080/#/c/21579/14/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/21579/14/be/src/runtime/krpc-data-stream-sender.cc@140
PS14, Line 140: // OutboundQueue::FlushFinal() and WaitUntilEmpty()) before closing the data stream
> line too long (91 > 90)
Done



--
To view, visit http://gerrit.cloudera.org:8080/21579
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibe3d01fada31f1b48de6c1403e7d2955114d8078
Gerrit-Change-Number: 21579
Gerrit-PatchSet: 15
Gerrit-Owner: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Balazs Hevele <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Smith <[email protected]>
Gerrit-Comment-Date: Tue, 19 May 2026 13:08:48 +0000
Gerrit-HasComments: Yes
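The threads at lines 317, 507 and 1483 of PS12 discuss three related points: a queued batch is shared by all channels and is released once QueuedBatch::consumers_left reaches 0, the pool of free buffers could be a deque, and QueuedBatch can be constructed in place with emplace_back. A minimal single-threaded sketch combining the three, assuming each queued batch is addressed by a sequence number so that channels can progress at different speeds. Buffer, BatchQueue, Enqueue(), OnChannelSent() and the sequence numbering are hypothetical, not the patch's API:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for a serialized OutboundRowBatch buffer.
struct Buffer {
  std::string data;
};

// One serialized batch in the queue. Each channel that still has to send it
// counts as a consumer; the buffer can be reused once consumers_left is 0.
struct QueuedBatch {
  QueuedBatch(std::unique_ptr<Buffer> b, int consumers)
    : buffer(std::move(b)), consumers_left(consumers) {}
  std::unique_ptr<Buffer> buffer;
  int consumers_left;
};

// Hypothetical single-threaded model; batches are addressed by sequence number
// because different channels may be sending different batches at once.
class BatchQueue {
 public:
  BatchQueue(int num_buffers, int num_channels) : num_channels_(num_channels) {
    for (int i = 0; i < num_buffers; ++i) {
      free_pool_.push_back(std::make_unique<Buffer>());
    }
  }

  // Copies payload into a pooled buffer and queues it for all channels.
  // Returns false if the pool is empty (a real sender would have to wait).
  bool Enqueue(const std::string& payload, std::size_t* seq) {
    if (free_pool_.empty()) return false;
    std::unique_ptr<Buffer> buf = std::move(free_pool_.front());
    free_pool_.pop_front();
    buf->data = payload;  // reuses the string's existing capacity when possible
    *seq = first_seq_ + queue_.size();
    // QueuedBatch is constructed in place, without a temporary.
    queue_.emplace_back(std::move(buf), num_channels_);
    return true;
  }

  // Called when one channel has finished sending batch `seq`.
  void OnChannelSent(std::size_t seq) {
    assert(seq >= first_seq_ && seq - first_seq_ < queue_.size());
    QueuedBatch& batch = queue_[seq - first_seq_];
    assert(batch.consumers_left > 0);
    --batch.consumers_left;
    // Recycle fully consumed batches from the head of the queue only, so the
    // sequence numbers of the remaining batches stay valid.
    while (!queue_.empty() && queue_.front().consumers_left == 0) {
      free_pool_.push_back(std::move(queue_.front().buffer));
      queue_.pop_front();
      ++first_seq_;
    }
  }

  std::size_t queued() const { return queue_.size(); }
  std::size_t free_buffers() const { return free_pool_.size(); }

 private:
  const int num_channels_;
  std::size_t first_seq_ = 0;  // sequence number of queue_.front()
  std::deque<QueuedBatch> queue_;
  std::deque<std::unique_ptr<Buffer>> free_pool_;
};
```

Reusing Buffer objects avoids reallocating serialization buffers per batch, but std::deque can still allocate and free its internal blocks as elements flow through it, so this does not remove allocation entirely; the intrusive list mentioned in the line 317 reply is one way to go further.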
