Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/21579 )

Change subject: POC: Add queuing to KrpcDataStreamSender
......................................................................


Patch Set 15:

(18 comments)

Thanks. I addressed some of the comments, but not all of them.
My next step is to run tests with increased queue depths to check whether the
queue implementation really works.

http://gerrit.cloudera.org:8080/#/c/21579/12//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/21579/12//COMMIT_MSG@12
PS12, Line 12: 2. in-fligh batch (with current active RPC)
> Spell check on the commit message would catch a few issues.
Done


http://gerrit.cloudera.org:8080/#/c/21579/14/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/21579/14/be/src/runtime/krpc-data-stream-sender.h@277
PS14, Line 277:     // Increments eos_completed_count_ and wakes WaitUntilEmpty() if all channels are
> line too long (91 > 90)
Done
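
For readers following the thread, the counter-plus-wakeup pattern that this comment describes can be sketched roughly as below. This is only an illustration under assumptions: EosTrackerSketch, MarkEosCompleted() and AllCompleted() are hypothetical names, and unlike the real WaitUntilEmpty(), which returns Status, the sketch returns void and ignores errors and cancellation.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the EOS bookkeeping in OutboundQueue.
class EosTrackerSketch {
 public:
  explicit EosTrackerSketch(int num_channels) : num_channels_(num_channels) {}

  // Called once per channel when its EOS RPC has completed. Wakes the waiter
  // when the last channel finishes.
  void MarkEosCompleted() {
    std::lock_guard<std::mutex> l(lock_);
    ++eos_completed_count_;
    if (eos_completed_count_ >= num_channels_) all_done_.notify_all();
  }

  // Blocks until every channel has completed its EOS. The predicate guards
  // against spurious wakeups and returns immediately if already done.
  void WaitUntilEmpty() {
    std::unique_lock<std::mutex> l(lock_);
    all_done_.wait(l, [this] { return eos_completed_count_ >= num_channels_; });
  }

  bool AllCompleted() {
    std::lock_guard<std::mutex> l(lock_);
    return eos_completed_count_ >= num_channels_;
  }

 private:
  std::mutex lock_;
  std::condition_variable all_done_;
  const int num_channels_;
  int eos_completed_count_ = 0;
};
```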


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@110
PS12, Line 110: with the exception of broadcast
> can you describe here how it's different with broadcast? (single OutboundQu
Done


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@113
PS12, Line 113: a row batch (
> nit: either "a row batch" or "row batches"?
Done


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@118
PS12, Line 118: /
> nit: missed a / here
Done


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@211
PS12, Line 211:     KrpcDataStreamSender* parent_ = nullptr;
> Instead of holding an extra pointer, could be a GetParent() method that cal
I haven't thought this through yet.


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@237
PS12, Line 237:   class OutboundQueue {
> nit: private class could be defined separately, like a separate header file
I would like to do this later, but I am not sure yet about how to structure 
this. In the long term I would like to split up krpc-data-stream-sender.cc.


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@263
PS12, Line 263:     Status WaitUntilEmpty();
> nit: Til is a shortening of Until, which only has 1 'l'. So I'd do WaitTilE
Done, I chose WaitUntilEmpty.


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@300
PS12, Line 300:     // All channels managed (but not owned) by this queue.
> Consider https://www.boost.org/doc/libs/1_74_0/doc/html/boost/container/sta
Done


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@317
PS12, Line 317:     // sending it (QueuedBatch::consumers_left reaches 0).
> If we're trying to minimize memory allocation/deallocation, this might not
I haven't yet thought through what the best solution is. Using an intrusive
list could also be an option.


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.h@507
PS12, Line 507:   // Pool of free OutboundRowBatch buffers available for serialization.
> Use deque (or queue) here too.
Same as for queue_


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@475
PS12, Line 475:   // Holds a non-OK status if starting the next RPC (TransmitData or EOS) failed.
> I think this section would be helpful to move to a new function with a desc
Agreed, but I haven't refactored it yet.


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@636
PS12, Line 636:
> nit: Since this doesn't directly send the current batch anymore, only appen
Done


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@918
PS12, Line 918:   compression_scratch_.reset(new TrackedString(*char_mem_tracker_allocator_));
> This and the other partition_type_ checks would make more sense as a switch
I agree that the current large number of ifs is a mess, but I am not sure about
the best way to structure this. I would probably move the handling of each
partition_type_ into its own init function.
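
A rough sketch of what the switch with one init function per partition type could look like. Everything here is hypothetical: PartitionType stands in for the Thrift-generated TPartitionType, and the Init*() bodies only record the assumed queue layout (one shared queue for broadcast, one queue per channel otherwise) instead of doing the real setup.

```cpp
#include <cassert>

// Hypothetical stand-in for the Thrift-generated TPartitionType.
enum class PartitionType { UNPARTITIONED, RANDOM, HASH_PARTITIONED, KUDU };

class SenderInitSketch {
 public:
  SenderInitSketch(PartitionType type, int num_channels)
    : partition_type_(type), num_channels_(num_channels) {
    assert(num_channels > 0);
  }

  // A single switch replaces the scattered partition_type_ checks.
  void Init() {
    switch (partition_type_) {
      case PartitionType::UNPARTITIONED: InitUnpartitioned(); break;
      case PartitionType::RANDOM: InitRandom(); break;
      case PartitionType::HASH_PARTITIONED: InitHashPartitioned(); break;
      case PartitionType::KUDU: InitKudu(); break;
    }
  }

  int num_queues() const { return num_queues_; }
  bool needs_partition_exprs() const { return needs_partition_exprs_; }

 private:
  // Assumption: broadcast shares one queue across all channels.
  void InitUnpartitioned() { num_queues_ = 1; }
  // Assumption: every other type uses one queue per channel.
  void InitRandom() { num_queues_ = num_channels_; }
  void InitHashPartitioned() {
    num_queues_ = num_channels_;
    needs_partition_exprs_ = true;
  }
  void InitKudu() {
    num_queues_ = num_channels_;
    needs_partition_exprs_ = true;
  }

  const PartitionType partition_type_;
  const int num_channels_;
  int num_queues_ = 0;
  bool needs_partition_exprs_ = false;
};
```

Leaving out a default case lets the compiler warn about an unhandled enum value when a new partition type is added.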


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@920
PS12, Line 920:   if (partition_type_ == TPartitionType::UNPARTITIONED) {
> queues_.reserve first, or use static_vector.
Done


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@1189
PS12, Line 1189:     RETURN_IF_ERROR(SerializeBatch(
> Should we DCHECK_EQ(queues_.size(), 1)?
Done


http://gerrit.cloudera.org:8080/#/c/21579/12/be/src/runtime/krpc-data-stream-sender.cc@1483
PS12, Line 1483:     // All channels are closed. Return the batch directly to the pool.
> QueuedBatch can be constructed directly in emplace_back like
Done
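
For reference, the in-place construction suggested here looks roughly like the sketch below. The struct definitions are simplified, hypothetical stand-ins (the real QueuedBatch and OutboundRowBatch have different members), and Enqueue() is an illustrative helper, not a function from the patch.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified stand-ins for the types in the patch.
struct OutboundRowBatch {
  std::string data;
};

struct QueuedBatch {
  QueuedBatch(std::unique_ptr<OutboundRowBatch> b, int consumers)
    : batch(std::move(b)), consumers_left(consumers) {}
  std::unique_ptr<OutboundRowBatch> batch;
  int consumers_left;  // number of channels that still have to send the batch
};

// Forwards the constructor arguments to emplace_back so that the QueuedBatch
// is built directly inside the deque, without a temporary and an extra move.
inline QueuedBatch& Enqueue(std::deque<QueuedBatch>& queue,
    std::unique_ptr<OutboundRowBatch> batch, int num_channels) {
  assert(num_channels > 0);
  queue.emplace_back(std::move(batch), num_channels);
  return queue.back();
}
```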


http://gerrit.cloudera.org:8080/#/c/21579/14/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/21579/14/be/src/runtime/krpc-data-stream-sender.cc@140
PS14, Line 140: // OutboundQueue::FlushFinal() and WaitUntilEmpty()) before closing the data stream
> line too long (91 > 90)
Done



--
To view, visit http://gerrit.cloudera.org:8080/21579
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibe3d01fada31f1b48de6c1403e7d2955114d8078
Gerrit-Change-Number: 21579
Gerrit-PatchSet: 15
Gerrit-Owner: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Balazs Hevele <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Smith <[email protected]>
Gerrit-Comment-Date: Tue, 19 May 2026 13:08:48 +0000
Gerrit-HasComments: Yes
