Hi Ufuk,

Thank you for your detailed and clear explanation! I reviewed the code based
on your info and now understand it clearly.

For local transfer: when the producer emits results to its ResultPartition, it
requests a Buffer from the pool. If no Buffer is available, it waits. The
producer's ResultPartition is effectively the consumer's InputGate: when the
consumer reads a Buffer from the InputGate and deserializes it, the Buffer is
recycled, so the producer can request a Buffer again. So if the consumer is
slow, the Buffers in the producer's ResultPartition cannot be recycled quickly,
and the producer has no available Buffer to emit data. While the producer
waits for an available Buffer, it cannot process elements or read the next data
from its own InputGate, so the producer slows down.

For remote transfer: the InputGate and ResultPartition of each task are
separate.
1. For the producer, a Buffer in the ResultPartition is recycled when the
producer writes it to the channel.
2. For the consumer, a Buffer is recycled when the consumer reads it from the
InputGate and deserializes it. The consumer needs to request a Buffer when it
reads data from the channel and puts it into the InputGate.
3. When the consumer is slow and there is no available Buffer, the consumer
stops reading data from the channel. This prevents the producer from writing
data to the channel, so eventually the producer has no available Buffer to
emit results, and the producer slows down.
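
The local-transfer reasoning above can be checked with a small model: a
minimal Python sketch, assuming one producer thread, one consumer thread and a
fixed pool of buffers. BufferPool, request_buffer, recycle and
run_local_exchange are hypothetical names that only loosely mirror the Java
classes discussed here; this is not Flink's API.

```python
import queue
import threading
import time
from typing import List, Optional, Tuple


class BufferPool:
    """Hypothetical fixed-size buffer pool (a model, not Flink's API)."""

    def __init__(self, num_buffers: int) -> None:
        self._free: "queue.Queue[bytearray]" = queue.Queue()
        for _ in range(num_buffers):
            self._free.put(bytearray())
        self._lock = threading.Lock()
        self.in_use = 0       # buffers handed out and not yet recycled
        self.max_in_use = 0   # high-water mark, bounded by num_buffers

    def request_buffer(self) -> bytearray:
        buf = self._free.get()  # blocks while every buffer is in use
        with self._lock:
            self.in_use += 1
            self.max_in_use = max(self.max_in_use, self.in_use)
        return buf

    def recycle(self, buf: bytearray) -> None:
        buf.clear()
        with self._lock:
            self.in_use -= 1
        self._free.put(buf)


def run_local_exchange(num_records: int, num_buffers: int,
                       consumer_delay_s: float) -> Tuple[List[int], int]:
    pool = BufferPool(num_buffers)
    # Filled buffers go from producer to consumer through this queue. It is
    # unbounded on purpose: only the buffer pool limits the producer.
    channel: "queue.Queue[Optional[bytearray]]" = queue.Queue()
    received: List[int] = []

    def producer() -> None:
        for i in range(num_records):
            buf = pool.request_buffer()        # waits here when the consumer lags
            buf.extend(i.to_bytes(4, "big"))   # "serialize" the record
            channel.put(buf)
        channel.put(None)                      # end-of-stream marker

    def consumer() -> None:
        while (buf := channel.get()) is not None:
            received.append(int.from_bytes(buf, "big"))  # "deserialize"
            time.sleep(consumer_delay_s)       # simulate a slow consumer
            pool.recycle(buf)                  # only now can the producer reuse it

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received, pool.max_in_use
```

For example, run_local_exchange(50, 4, 0.001) delivers all 50 records in
order, and the reported high-water mark never exceeds 4: the producer can only
run as far ahead of the consumer as the pool allows.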

Is my understanding right? Looking forward to your docs!

Best wishes,
Zhijiang Wang

------------------------------------------------------------------
From: Ufuk Celebi <[email protected]>
Sent: Friday, August 7, 2015 21:07
To: user <[email protected]>, wangzhijiang999 <[email protected]>
Subject: Re: how to understand the flink flow control

Hey Zhijiang Wang,

I will update the docs
next week with more information. The short version is that flow control
happens via the buffer pools that Flink uses for produced and consumed
intermediate results.

The slightly ;) longer version:

Each task has buffer pools. The size of these buffer pools depends on multiple
things (per task manager):
- the configured number of network buffers [default 2048]
- the number of tasks running
- the number of consumed and produced outputs

Each consumed input (if you are looking into the code: each SingleInputGate)
has a buffer pool associated with it, and so does each produced intermediate
result (see IntermediateResultPartition in the code).

Each produced record is serialized into a buffer of the
respective buffer pool of the produced result partition and dispatched to the
consumer (either locally or remotely via the network). After the buffer has
been consumed, it is recycled to the pool and can be used for outstanding
records. If the producer is faster than the consumer, these buffers take
longer to become available again, and the producer slows down by waiting for a
buffer.

For local exchange, the buffer is consumed as soon as the local consumer has
deserialized the records; for remote exchange, as soon as the network layer
has dispatched the buffer.

For the input side, there is a similar mechanism. The network layer receives a
buffer, copies it into a buffer from the pool of the respective input gate,
and queues the filled buffer to the (remote) input channel. If there is no
buffer available at the input gate, the TCP channel is not read until a buffer
is available again. This backpressures the remote senders, because their
output buffers are not dispatched and cannot be recycled.

I hope this helps. If you have further questions, just post them here.
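
The remote path described above can be modeled the same way, as a chain of
bounded resources. This is a minimal Python sketch under stated assumptions: a
bounded queue stands in for the TCP connection and its socket buffers, one
thread each plays the sender-side and receiver-side network layer, and
BufferPool, run_remote_exchange and all parameter names are hypothetical, not
Flink classes. When the consumer's input-gate pool is empty, the receiver
stops reading the "TCP" queue, the sender blocks, output buffers are not
recycled, and the producer stalls on its own pool.

```python
import queue
import threading
import time
from typing import List, Optional, Tuple


class BufferPool:
    """Hypothetical fixed-size buffer pool (a model, not Flink's API)."""

    def __init__(self, num_buffers: int) -> None:
        self._free: "queue.Queue[bytearray]" = queue.Queue()
        for _ in range(num_buffers):
            self._free.put(bytearray())

    def request_buffer(self) -> bytearray:
        return self._free.get()  # blocks while the pool is empty

    def recycle(self, buf: bytearray) -> None:
        buf.clear()
        self._free.put(buf)

    def is_exhausted(self) -> bool:
        return self._free.empty()


def run_remote_exchange(num_records: int, out_buffers: int, in_buffers: int,
                        tcp_capacity: int,
                        consumer_delay_s: float) -> Tuple[List[int], int]:
    out_pool = BufferPool(out_buffers)  # producer side (result partition)
    in_pool = BufferPool(in_buffers)    # consumer side (input gate)
    subpartition: "queue.Queue[Optional[bytearray]]" = queue.Queue()
    # Stand-in for the TCP connection: a bounded queue of bytes, so the
    # sender blocks once the "socket buffers" are full.
    tcp: "queue.Queue[Optional[bytes]]" = queue.Queue(maxsize=tcp_capacity)
    input_channel: "queue.Queue[Optional[bytearray]]" = queue.Queue()
    received: List[int] = []
    producer_stalls = 0

    def producer() -> None:
        nonlocal producer_stalls
        for i in range(num_records):
            if out_pool.is_exhausted():
                producer_stalls += 1            # pool empty: the request will most likely wait
            buf = out_pool.request_buffer()
            buf.extend(i.to_bytes(4, "big"))    # "serialize" the record
            subpartition.put(buf)
        subpartition.put(None)                  # end-of-stream marker

    def sender_network() -> None:
        while (buf := subpartition.get()) is not None:
            tcp.put(bytes(buf))                 # blocks while the receiver is not reading
            out_pool.recycle(buf)               # recycled once dispatched
        tcp.put(None)

    def receiver_network() -> None:
        while True:
            target = in_pool.request_buffer()   # no free buffer: "TCP" is not read
            data = tcp.get()
            if data is None:                    # end of stream
                in_pool.recycle(target)
                input_channel.put(None)
                return
            target.extend(data)                 # copy into an input-gate buffer
            input_channel.put(target)

    def consumer() -> None:
        while (buf := input_channel.get()) is not None:
            received.append(int.from_bytes(buf, "big"))  # "deserialize"
            time.sleep(consumer_delay_s)        # simulate a slow consumer
            in_pool.recycle(buf)

    threads = [threading.Thread(target=f)
               for f in (producer, sender_network, receiver_network, consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received, producer_stalls
```

With, say, 40 records, two buffers on each side, a "TCP" capacity of 2 and a
slow consumer, every record still arrives in order, while the producer
repeatedly finds its output pool empty: it is slowed down by the consumer even
though the two never exchange any explicit flow-control signal in this model.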
I will update the docs with some figures, so this will be easier to follow.

– Ufuk

On 07 Aug 2015, at 03:37, wangzhijiang999 <[email protected]> wrote:

> As stated on the Apache page, Flink's streaming runtime has natural flow
> control: slow downstream operators backpressure faster upstream operators.
> How should I understand Flink's natural flow control?
> As far as I know, Heron has a backpressure mechanism: if some tasks process
> slowly, it stops reading from the source and notifies other tasks to stop
> reading from the source.
> In Flink, if the producer task processes quickly, it emits its results to
> the consumer, so the buffers in the consumer's InputChannel will fill up. If
> the consumer processes slowly, how is the upstream flow controlled?
>
> Thank you in advance for any suggestions!
>
> Best wishes,
> Zhijiang Wang