Csaba Ringhofer has uploaded a new patch set (#20). ( http://gerrit.cloudera.org:8080/21579 )
Change subject: IMPALA-15080: Add queuing to KrpcDataStreamSender
......................................................................

IMPALA-15080: Add queuing to KrpcDataStreamSender

Before this patch, KrpcDataStreamSender handled 2 batches per channel:
1. the serialization batch (not ready to send)
2. the in-flight batch (with the currently active RPC)
If the in-flight RPC had not finished by the time serialization finished,
the sender blocked. This meant that all RPCs were initiated from the
fragment instance thread (except for retries after errors).

To provide a deeper queue, the main challenge is to send the next batch
in the queue as soon as the in-flight RPC finishes. This can't be done
from the fragment instance thread, which may be busy elsewhere. The
solution is to keep a queue of ready-to-send batches (OutboundQueue) and
dispatch the next batch in the callback of the last successful
TransmitData (TransmitDataCompleteCb(), similarly to how retried RPCs
were handled). This callback runs on a KRPC reactor thread, so it must
not do significant work or block.

In most cases there is 1 OutboundQueue per Channel. The exception is
broadcast, where a single OutboundQueue is shared by all channels,
allowing them to share buffers while progressing through batches
independently.

Besides sending the next TransmitData, TransmitDataCompleteCb() also
sends EndDataStream in case of EOS, so the receiver is notified sooner,
without waiting for other receivers to consume their batches.

Queue sizes can be set with (hidden) flags:
data_stream_sender_broadcast_queue_depth (default: 2)
data_stream_sender_per_channel_queue_depth (default: 2)

The default queue depth of 2 is similar to the previous behavior and
should not significantly increase memory usage. Increasing the depth for
broadcast may be beneficial in the future, as a relatively small amount
of extra memory can increase resistance to transient slowness in the
network or in receivers.
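The dispatch-on-completion idea can be illustrated with a minimal single-channel sketch. `SketchOutboundQueue`, `Batch`, `OnTransmitComplete()` and the `send`/`end_stream` callbacks are hypothetical stand-ins, not Impala's `OutboundQueue` or the KRPC API. The RPC is assumed to be asynchronous, and the caller invokes `OnTransmitComplete()` the way a reactor thread would invoke `TransmitDataCompleteCb()`. Broadcast queue sharing and error retries are not modeled.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a serialized row batch.
struct Batch {
  int id;
};

// Hypothetical single-channel sketch of "dispatch the next batch from the RPC
// completion callback"; this is NOT Impala's OutboundQueue.
class SketchOutboundQueue {
 public:
  // 'send' starts an asynchronous TransmitData-like RPC and returns at once;
  // 'end_stream' starts an EndDataStream-like RPC.
  SketchOutboundQueue(std::function<void(const Batch&)> send,
                      std::function<void()> end_stream)
      : send_(std::move(send)), end_stream_(std::move(end_stream)) {}

  // Fragment instance thread: hand over a serialized batch. It is sent right
  // away if no RPC is in flight, otherwise it waits in the queue.
  void Enqueue(const Batch& b) {
    std::unique_lock<std::mutex> l(lock_);
    if (in_flight_) {
      queue_.push_back(b);
      return;
    }
    in_flight_ = true;
    l.unlock();
    send_(b);
  }

  // Fragment instance thread: no more batches will follow.
  void MarkEos() {
    std::unique_lock<std::mutex> l(lock_);
    assert(!eos_ && "MarkEos() called twice");
    eos_ = true;
    if (in_flight_) return;  // the completion callback will send EOS later
    eos_sent_ = true;
    l.unlock();
    end_stream_();
  }

  // RPC completion callback (a reactor thread in the real sender): only O(1)
  // bookkeeping under a short lock, never blocks, starts the next RPC at once.
  void OnTransmitComplete() {
    std::unique_lock<std::mutex> l(lock_);
    if (!queue_.empty()) {
      Batch next = queue_.front();
      queue_.pop_front();
      l.unlock();
      send_(next);  // in_flight_ stays true
      return;
    }
    in_flight_ = false;
    bool send_eos = eos_ && !eos_sent_;
    if (send_eos) eos_sent_ = true;
    l.unlock();
    // Notify this receiver as soon as its own queue drains.
    if (send_eos) end_stream_();
  }

 private:
  std::function<void(const Batch&)> send_;
  std::function<void()> end_stream_;
  std::mutex lock_;
  std::deque<Batch> queue_;
  bool in_flight_ = false;
  bool eos_ = false;
  bool eos_sent_ = false;
};

// Usage: a fake transport that only logs; the caller plays the reactor thread
// by calling OnTransmitComplete() once per finished RPC.
std::vector<std::string> DemoDispatchOrder() {
  std::vector<std::string> log;
  SketchOutboundQueue q(
      [&log](const Batch& b) { log.push_back("send " + std::to_string(b.id)); },
      [&log] { log.push_back("eos"); });
  q.Enqueue({1});          // nothing in flight: sent immediately
  q.Enqueue({2});          // queued behind batch 1
  q.Enqueue({3});          // queued behind batch 2
  q.MarkEos();             // deferred: batches are still pending
  q.OnTransmitComplete();  // batch 1 done -> callback sends batch 2
  q.OnTransmitComplete();  // batch 2 done -> callback sends batch 3
  q.OnTransmitComplete();  // batch 3 done -> queue empty, EOS sent
  return log;
}
```

In this sketch the completion path never waits on the fragment instance thread: it pops one entry and starts the next RPC, and EOS is deferred until the channel's own queue drains.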
In the partitioned case, a depth of 2 per channel can already bring
significant benefits: in the evenly distributed case, 1 of the 2 batches
is occupied collecting rows for its partition, and the spare batch of
any channel without an in-flight RPC can be redistributed. For example,
if 1 of 100 receivers stalls, its channel can "grow" a queue of 50
batches before half of the receivers start starving (starving receivers
can't have an in-flight RPC).

Queue capacity is enforced by KrpcDataStreamSender::WaitForCapacity().
It blocks once 'batch_pool_max_size_' batches have been handed out,
until ReleaseBatch() is called from OutboundQueue. Buffers of released
batches are reused to avoid extra cross-thread allocations/deletes
(allocation on the fragment instance thread, free on the reactor
thread).

Skewed partitioned exchanges may waste memory by building large queues
for the few channels that are actually used. To avoid a regression from
the increased memory need in this case, WaitForCapacity() only allocates
a new batch if the channel's queue is not at capacity (default 2), so
channels that are never used don't count toward the 2 * channels
capacity. E.g., if only a single channel has any rows, it has a limit
of 2, as in the unpartitioned case.
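The capacity rule can be sketched as follows, assuming one plausible reading of the description: a released batch may be reused by any channel, while a new batch is allocated only when the total is below `num_channels * per_channel_depth` (standing in for `batch_pool_max_size_`) and the requesting channel has fewer than `per_channel_depth` batches outstanding. `SketchBatchPool`, `CanProceed()`, `FillUntilBlocked()` and the two scenario functions are hypothetical; only the names `WaitForCapacity()` and `ReleaseBatch()` come from the text, and their signatures here are assumptions.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical batch: a reusable buffer plus the channel it was handed out for.
struct Batch {
  std::vector<char> buffer;
  int channel = -1;
};

// Hypothetical sketch of the capacity policy; not Impala's implementation.
class SketchBatchPool {
 public:
  SketchBatchPool(int num_channels, int per_channel_depth)
      : per_channel_depth_(per_channel_depth),
        max_size_(num_channels * per_channel_depth),  // ~batch_pool_max_size_
        queued_(num_channels, 0) {}

  // Fragment instance thread: blocks until a batch may be handed out for
  // 'channel'. A released batch is reused before a new one is allocated.
  std::unique_ptr<Batch> WaitForCapacity(int channel) {
    std::unique_lock<std::mutex> l(lock_);
    cv_.wait(l, [&] { return CanProceedLocked(channel); });
    std::unique_ptr<Batch> b;
    if (!free_.empty()) {
      b = std::move(free_.back());
      free_.pop_back();
      b->buffer.clear();  // drops old contents but keeps the capacity
    } else {
      b = std::make_unique<Batch>();
      ++allocated_;
    }
    b->channel = channel;
    ++queued_[channel];
    return b;
  }

  // Called once the batch was sent (a reactor thread in the real sender).
  // The buffer is parked for reuse instead of being freed on this thread.
  void ReleaseBatch(std::unique_ptr<Batch> b) {
    {
      std::lock_guard<std::mutex> l(lock_);
      assert(b && queued_[b->channel] > 0);
      --queued_[b->channel];
      free_.push_back(std::move(b));
    }
    cv_.notify_one();  // only the fragment instance thread waits
  }

  bool CanProceed(int channel) {
    std::lock_guard<std::mutex> l(lock_);
    return CanProceedLocked(channel);
  }

 private:
  bool CanProceedLocked(int channel) const {
    // Any channel may reuse a released batch, so a stalled channel can grow
    // past per_channel_depth_ using batches other channels gave back.
    if (!free_.empty()) return true;
    // New batches only while the channel is below its depth, so channels
    // that never get rows never count toward max_size_.
    return allocated_ < max_size_ && queued_[channel] < per_channel_depth_;
  }

  const int per_channel_depth_;
  const int max_size_;
  int allocated_ = 0;
  std::vector<int> queued_;  // handed out per channel, not yet released
  std::vector<std::unique_ptr<Batch>> free_;
  std::mutex lock_;
  std::condition_variable cv_;
};

// Takes batches for 'channel' until WaitForCapacity() would block.
int FillUntilBlocked(SketchBatchPool& pool, int channel,
                     std::vector<std::unique_ptr<Batch>>& held) {
  int n = 0;
  while (pool.CanProceed(channel)) {
    held.push_back(pool.WaitForCapacity(channel));
    ++n;
  }
  return n;
}

// Skew: only channel 0 of 100 ever gets rows, so it is capped at depth 2.
int SkewedChannelCapacity() {
  SketchBatchPool pool(100, 2);
  std::vector<std::unique_ptr<Batch>> held;
  return FillUntilBlocked(pool, 0, held);
}

// Stall: channel 0 never releases, channels 1 and 2 drain fully, so
// channel 0 grows into their released batches (3 * 2 = 6 in total).
int StalledChannelCapacity() {
  SketchBatchPool pool(3, 2);
  std::vector<std::unique_ptr<Batch>> stalled, others;
  int n = FillUntilBlocked(pool, 0, stalled);
  FillUntilBlocked(pool, 1, others);
  FillUntilBlocked(pool, 2, others);
  for (auto& b : others) pool.ReleaseBatch(std::move(b));
  return n + FillUntilBlocked(pool, 0, stalled);
}
```

Under these assumptions, `SkewedChannelCapacity()` returns 2 (a lone active channel behaves like the unpartitioned case), and `StalledChannelCapacity()` returns 6, since the stalled channel absorbs every batch the draining channels released while the total never exceeds the global limit.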
Benchmarks:
- 0.5-1% improvement on TPCDS-20 in a dev environment; I expect a larger
  improvement on bigger clusters with a real network

Partially generated-by: Claude Sonnet 4.6
Change-Id: Ibe3d01fada31f1b48de6c1403e7d2955114d8078
---
M be/src/runtime/krpc-data-stream-sender-ir.cc
M be/src/runtime/krpc-data-stream-sender.cc
M be/src/runtime/krpc-data-stream-sender.h
3 files changed, 659 insertions(+), 229 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/79/21579/20
--
To view, visit http://gerrit.cloudera.org:8080/21579

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ibe3d01fada31f1b48de6c1403e7d2955114d8078
Gerrit-Change-Number: 21579
Gerrit-PatchSet: 20
Gerrit-Owner: Csaba Ringhofer
Gerrit-Reviewer: Alexey Serbin
Gerrit-Reviewer: Balazs Hevele
Gerrit-Reviewer: Csaba Ringhofer
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Smith
