Hello Alexey Serbin, Balazs Hevele, Michael Smith, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/21579

to look at the new patch set (#15).

Change subject: POC: Add queuing to KrpcDataStreamSender
......................................................................

POC: Add queuing to KrpcDataStreamSender

Before this patch, KrpcDataStreamSender handled 2 batches
per channel:
1. the serialization batch (not yet ready to send)
2. the in-flight batch (with the currently active RPC)

If the in-flight batch had not finished by the time serialization
finished, the sender blocked. This meant that all RPCs were initiated
from the fragment instance thread (with the exception of RPCs repeated
after errors).

To provide a deeper queue, the main challenge is to send the next
batch in the queue immediately once the in-flight RPC finishes. This
can't be done from the fragment instance thread, which may be busy
elsewhere. The solution is to keep a queue of ready-to-send batches
(OutboundQueue) and dispatch the next batch in the callback of the
last successful TransmitData (TransmitDataCompleteCb(), similarly to
how repeated RPCs were handled). This runs on a KRPC reactor thread,
so it should not do significant work or block.
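A minimal sketch of the dispatch-from-callback idea, for illustration only. OutboundQueueSketch, Batch, Enqueue() and OnRpcComplete() are hypothetical names, not the patch's OutboundQueue API. An injected function stands in for the asynchronous TransmitData call. Error retries, capacity limits and EOS are left out. The producer only starts an RPC when none is in flight, and the completion callback does nothing but pop the finished batch and start the next one.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a serialized row batch.
struct Batch {
  std::string payload;
};

// Hypothetical outbound queue: the producer enqueues ready-to-send batches,
// and the completion callback of the previous RPC starts the next one.
class OutboundQueueSketch {
 public:
  // 'start_rpc' stands in for an async TransmitData call; whoever runs the
  // RPC must later call OnRpcComplete() (e.g. from a reactor thread).
  explicit OutboundQueueSketch(std::function<void(const Batch&)> start_rpc)
      : start_rpc_(std::move(start_rpc)) {}

  // Producer (fragment instance) thread: queue the batch and start an RPC
  // only if none is currently in flight.
  void Enqueue(Batch b) {
    bool start_now = false;
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_.push_back(std::move(b));
      if (!rpc_in_flight_) {
        rpc_in_flight_ = true;
        start_now = true;
      }
    }
    if (start_now) StartFront();
  }

  // Completion callback; precondition: an RPC is in flight. It does only
  // cheap, non-blocking work: drop the finished batch, start the next one.
  void OnRpcComplete() {
    bool more = false;
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_.pop_front();
      more = !queue_.empty();
      rpc_in_flight_ = more;
    }
    if (more) StartFront();
  }

  size_t size() {
    std::lock_guard<std::mutex> l(lock_);
    return queue_.size();
  }

 private:
  // The front batch is stable while an RPC is in flight, because only
  // OnRpcComplete() pops it.
  void StartFront() {
    Batch front;
    {
      std::lock_guard<std::mutex> l(lock_);
      front = queue_.front();
    }
    start_rpc_(front);
  }

  std::function<void(const Batch&)> start_rpc_;
  std::mutex lock_;
  std::deque<Batch> queue_;
  bool rpc_in_flight_ = false;
};
```

The producer never waits for an RPC to finish before queuing more work. Keeping the callback to a pop plus one RPC start is what makes it safe to run on a reactor thread.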

Most cases have 1 OutboundQueue per Channel. The exception is
broadcast, where a single OutboundQueue is shared by all channels.
This lets the channels share buffers while progressing through the
batches independently.
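One way to picture a shared broadcast queue is sketched below. It assumes each channel keeps its own cursor into a single list of batches, and that a batch is released only once every channel has sent it. SharedBroadcastQueue and its methods are hypothetical and are not taken from the patch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical: one queue of batches shared by N broadcast channels. Each
// channel has its own position, so a slow channel does not hold back the
// others, and a batch's buffer is released once all channels have sent it.
class SharedBroadcastQueue {
 public:
  explicit SharedBroadcastQueue(size_t num_channels)
      : next_index_(num_channels, 0) {}

  void Push(std::string batch) { batches_.push_back(std::move(batch)); }

  // Returns true and sets *out if channel 'ch' has a batch it hasn't sent.
  bool NextFor(size_t ch, std::string* out) const {
    size_t idx = next_index_[ch] - base_;
    if (idx >= batches_.size()) return false;
    *out = batches_[idx];
    return true;
  }

  // Precondition: NextFor(ch, ...) returned true. Advances channel 'ch' and
  // drops batches that every channel has passed; returns how many.
  size_t MarkSent(size_t ch) {
    ++next_index_[ch];
    size_t min_index =
        *std::min_element(next_index_.begin(), next_index_.end());
    size_t released = 0;
    while (base_ < min_index) {
      batches_.pop_front();
      ++base_;
      ++released;
    }
    return released;
  }

  size_t size() const { return batches_.size(); }

 private:
  std::deque<std::string> batches_;
  size_t base_ = 0;                 // absolute index of batches_.front()
  std::vector<size_t> next_index_;  // absolute index of each channel's next batch
};
```

Here channel 0 can move on to the second batch while channel 1 is still sending the first. The first batch is freed only when both channels are done with it.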

Besides sending the next TransmitData, TransmitDataCompleteCb also
sends EndDataStream in case of EOS. This notifies the receiver sooner,
without waiting for other receivers to consume their batches.
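The choice a completion callback faces can be reduced to the decision function below. This is a hypothetical sketch: NextAction and OnTransmitComplete() are illustrative names, and the three boolean inputs are an assumed simplification of the channel's state.

```cpp
#include <cassert>

// Hypothetical: what a TransmitData completion callback might do next.
enum class NextAction { kSendNextBatch, kSendEndOfStream, kIdle };

// queue_empty:   no ready-to-send batch remains after the finished RPC.
// eos_requested: the producer has already signalled end-of-stream.
// eos_sent:      EndDataStream was already issued for this channel.
NextAction OnTransmitComplete(bool queue_empty, bool eos_requested,
                              bool eos_sent) {
  // Queued data always goes out before end-of-stream.
  if (!queue_empty) return NextAction::kSendNextBatch;
  // The last batch just landed and EOS is pending: notify the receiver now,
  // rather than waiting for the producer thread to get to it.
  if (eos_requested && !eos_sent) return NextAction::kSendEndOfStream;
  return NextAction::kIdle;
}
```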

Queue sizes can be set with (hidden) flags:
data_stream_sender_broadcast_queue_depth (default: 2)
data_stream_sender_per_channel_queue_depth (default: 2)

The default queue depth of 2 is similar to the previous behavior and
should not cause a significant increase in memory usage. Increasing
the depth for broadcast may be beneficial in the future, as a
relatively small amount of memory can increase resistance to transient
slowness in the network or in receivers. For the partitioned case, a
depth of 2 per channel can already bring significant benefits. In the
evenly distributed case, 1 of the 2 batches is occupied collecting
rows for its partition, and the extra batch of channels without an
in-flight RPC can be redistributed. For example, if a single receiver
out of 100 is stalled, it can "grow" a queue of 50 batches before half
of the receivers start starving (starving receivers can't have an
in-flight RPC).
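The 100-receiver example can be checked with a back-of-the-envelope model. The model is an assumption, not the patch's exact accounting:

- the pool holds depth * channels batches;
- every channel pins 1 batch for collecting rows;
- each remaining "spare" batch lets one channel hold an in-flight RPC.

StarvingChannels() is a hypothetical helper.

```cpp
#include <cassert>
#include <cstddef>

// Simplified model (assumption): the stalled channel's queue holds
// 'stalled_queue' spare batches. The other channels share what is left, and
// each of them without a spare batch can't have an in-flight RPC (starving).
// Preconditions: channels >= 1, depth >= 1.
size_t StarvingChannels(size_t channels, size_t depth, size_t stalled_queue) {
  size_t spare = channels * (depth - 1);  // batches not pinned for collecting
  size_t left_for_others = stalled_queue >= spare ? 0 : spare - stalled_queue;
  size_t others = channels - 1;
  return left_for_others >= others ? 0 : others - left_for_others;
}
```

With 100 channels and depth 2 there are 100 spare batches. A stalled queue of 50 leaves 50 spare batches for the 99 other channels, so 49 of them starve. That is roughly the "half of the receivers" estimate above.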

Queue capacity is enforced by KrpcDataStreamSender::WaitForCapacity().
This blocks once 'batch_pool_max_size_' batches have been handed out,
until ReleaseBatch() is called from OutboundQueue. Buffers of released
batches are reused to avoid extra cross-thread allocations/deletes
(allocation on the fragment instance thread, free on the reactor
thread).
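The blocking-pool behavior described above can be sketched with a mutex and a condition variable. This is only loosely modeled on the description of WaitForCapacity()/ReleaseBatch(). BatchPoolSketch and PooledBatch are hypothetical, and the real patch may be structured differently. Released batches go onto a free list instead of being deleted, so their buffers are reused by the producer. They are only freed when the pool itself is destroyed.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical batch with a reusable serialization buffer.
struct PooledBatch {
  std::vector<char> buffer;
};

// Hypothetical bounded pool. Invariant: free_.size() + in_use_ == allocated_,
// so allocated_ never exceeds max_size_.
class BatchPoolSketch {
 public:
  explicit BatchPoolSketch(size_t max_size) : max_size_(max_size) {}

  // Producer thread: blocks while max_size_ batches are handed out.
  // Prefers a released batch (keeping its buffer) over a new allocation.
  std::unique_ptr<PooledBatch> WaitForCapacity() {
    std::unique_lock<std::mutex> l(lock_);
    cv_.wait(l, [this] { return in_use_ < max_size_; });
    ++in_use_;
    if (!free_.empty()) {
      std::unique_ptr<PooledBatch> b = std::move(free_.back());
      free_.pop_back();
      return b;
    }
    ++allocated_;
    return std::make_unique<PooledBatch>();
  }

  // Sender side (e.g. an RPC completion callback): hands the batch back
  // instead of freeing it on this thread, then wakes a waiting producer.
  void ReleaseBatch(std::unique_ptr<PooledBatch> b) {
    {
      std::lock_guard<std::mutex> l(lock_);
      free_.push_back(std::move(b));
      --in_use_;
    }
    cv_.notify_one();
  }

  size_t allocated() {
    std::lock_guard<std::mutex> l(lock_);
    return allocated_;
  }

 private:
  const size_t max_size_;
  std::mutex lock_;
  std::condition_variable cv_;
  std::vector<std::unique_ptr<PooledBatch>> free_;
  size_t in_use_ = 0;
  size_t allocated_ = 0;
};
```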

Skewed partitioned exchanges may waste memory by having large queues
for the few channels that are actually used. To protect against a
memory usage regression in this case, WaitForCapacity() only allocates
new batches if the queue for the channel is not at capacity (default
2). This means that channels that are never used don't count toward
the 2*channels capacity. E.g. if only a single channel has any rows,
it will have a limit of 2, as in the unpartitioned case.
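A hypothetical reading of this allocation rule is sketched below: reuse a released batch when one exists, otherwise allocate a new one only while the target channel's queue is below its depth and the global pool limit has not been reached. SkewAwareAllocator is an illustrative name. Whether the patch also gates reuse per channel is not stated here, so treat that detail as an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical allocation policy for skewed partitioned exchanges.
class SkewAwareAllocator {
 public:
  SkewAwareAllocator(size_t num_channels, size_t per_channel_depth,
                     size_t max_total)
      : queue_len_(num_channels, 0),
        depth_(per_channel_depth),
        max_total_(max_total) {}

  // Returns false when the caller would have to wait for a release.
  bool TryAcquire(size_t ch) {
    if (free_batches_ > 0) {
      --free_batches_;  // reuse a released buffer
    } else if (queue_len_[ch] < depth_ && allocated_ < max_total_) {
      ++allocated_;  // new allocation only for a channel below its depth
    } else {
      return false;
    }
    ++queue_len_[ch];
    return true;
  }

  // Called when a batch of channel 'ch' has been sent; keeps it for reuse.
  void Release(size_t ch) {
    --queue_len_[ch];
    ++free_batches_;
  }

  size_t allocated() const { return allocated_; }

 private:
  std::vector<size_t> queue_len_;
  size_t depth_;
  size_t max_total_;
  size_t free_batches_ = 0;
  size_t allocated_ = 0;
};
```

With 100 channels, depth 2 and a pool limit of 200, a producer that only ever writes to channel 0 allocates 2 batches and then waits, instead of growing toward 200.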

Benchmarks:
- 0.5-1% improvement on dev env TPCDS-20; I expect a larger improvement
  on bigger clusters with a real network

Partially generated-by: Claude Sonnet 4.6

Change-Id: Ibe3d01fada31f1b48de6c1403e7d2955114d8078
---
M be/src/runtime/krpc-data-stream-sender-ir.cc
M be/src/runtime/krpc-data-stream-sender.cc
M be/src/runtime/krpc-data-stream-sender.h
3 files changed, 684 insertions(+), 223 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/79/21579/15
--
To view, visit http://gerrit.cloudera.org:8080/21579
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ibe3d01fada31f1b48de6c1403e7d2955114d8078
Gerrit-Change-Number: 21579
Gerrit-PatchSet: 15
Gerrit-Owner: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Balazs Hevele <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Smith <[email protected]>
