Hi Ufuk,
Thank you for your detailed and clear explanation! I reviewed the code based on your information and now understand it.

For local transfer: when the producer emits results to its ResultPartition, it requests a Buffer from the pool. If no Buffer is available, it waits. The producer's ResultPartition is the consumer's InputGate. When the consumer reads a buffer from the InputGate and deserializes it, the Buffer is recycled, so the producer can request it again. If the consumer is slow, the Buffers in the producer's ResultPartition cannot be recycled quickly, and the producer runs out of available Buffers to emit data. While the producer waits for an available Buffer, it cannot process elements or read the next data from its own InputGate, so the producer slows down.

For remote transfer, the InputGate and ResultPartition of each task are separate:

    1. For the producer, a Buffer in the ResultPartition is recycled when the producer writes it to the channel.
    2. For the consumer, a Buffer is recycled when the consumer reads it from the InputGate and deserializes it. The consumer needs to request a Buffer when it reads data from the channel and puts it into the InputGate.
    3. When the consumer is slow and no Buffer is available, the consumer stops reading data from the channel. This blocks the producer from writing data to the channel. Eventually the producer has no available Buffer to emit results, and the producer slows down.
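The local case described above can be sketched as a toy Java model. This is not Flink code: the class and method names are hypothetical, an `ArrayBlockingQueue` of free buffers stands in for the result partition's buffer pool, and `Thread.sleep` simulates a slow consumer. The producer blocks in `pool.take()` until the consumer recycles a buffer, so it can never get more than `poolSize` buffers ahead.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a local exchange (hypothetical names, not Flink's classes). */
class LocalBackpressureSketch {

    /** Runs the exchange and returns {records received, max buffers in use at once}. */
    static int[] run(int poolSize, int records, long consumerDelayMillis) {
        // Free buffers: the producer must take one before it can serialize a record.
        BlockingQueue<ByteBuffer> pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(ByteBuffer.allocate(Integer.BYTES));
        }
        // Filled buffers handed to the local consumer.
        BlockingQueue<ByteBuffer> filled = new LinkedBlockingQueue<>();
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger maxInUse = new AtomicInteger();
        AtomicInteger received = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int r = 0; r < records; r++) {
                    ByteBuffer buf = pool.take();          // blocks while all buffers are in use
                    maxInUse.accumulateAndGet(inUse.incrementAndGet(), Math::max);
                    buf.clear();
                    buf.putInt(r).flip();                  // "serialize" one record
                    filled.put(buf);                       // dispatch to the consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int r = 0; r < records; r++) {
                    ByteBuffer buf = filled.take();
                    buf.getInt();                          // "deserialize" the record
                    Thread.sleep(consumerDelayMillis);     // simulate a slow operator
                    received.incrementAndGet();
                    inUse.decrementAndGet();
                    pool.put(buf);                         // recycle: the producer may continue
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new int[] {received.get(), maxInUse.get()};
    }

    public static void main(String[] args) {
        int[] result = run(4, 20, 5);
        System.out.println("received=" + result[0] + ", maxInUse=" + result[1]);
    }
}
```

Running `main` prints how many records arrived and the largest number of buffers in use at once. The second number never exceeds the pool size, however slow the consumer is.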
Is my understanding right? Looking forward to your docs!
Best wishes,
Zhijiang Wang
------------------------------------------------------------------
From: Ufuk Celebi <[email protected]>
Sent: Friday, August 7, 2015 21:07
To: user <[email protected]>, wangzhijiang999 <[email protected]>
Subject: Re: how to understand the flink flow control

Hey Zhijiang Wang,

I will update the docs next week with more information. The short version is that flow control happens via the buffer pools that Flink uses for produced and consumed intermediate results.

The slightly ;) longer version:

Each task has buffer pools. The size of these buffer pools depends on multiple things (per task manager):
- the configured number of network buffers [default 2048]
- the number of running tasks
- the number of consumed and produced outputs

Each consumed input (if you are looking into the code: each SingleInputGate) has a buffer pool associated with it, and so does each produced intermediate result (see IntermediateResultPartition in the code).

Each produced record is serialized into a buffer of the respective buffer pool of the produced result partition and dispatched to the consumer (either locally or remotely via the network). After the buffer has been consumed, it is recycled to the pool and can be used for the outstanding records. If the producer is faster than the consumer, these buffers take longer to become available again, and the producer slows down by waiting on a buffer.

For local exchange, the buffer is consumed as soon as the local consumer has deserialized the records. For remote exchange, it is consumed as soon as the network layer has dispatched the buffer.

For the input side, there is a similar mechanism. The network layer receives a buffer, copies it to the buffer pool of the respective input gate, and queues the filled buffer to the (remote) input channel. If there is no buffer available at the input gate, the TCP channel is not read until a buffer is available again. This backpressures the remote senders, because their output buffers are not dispatched and cannot be recycled.

I hope this helps. If you have further questions, just post them here.
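The remote chain, including the input side, can be modeled the same way. This is a hypothetical sketch and does not use Flink's actual classes. A bounded `ArrayBlockingQueue<Integer>` stands in for the TCP connection, and the receiver's network thread reads from it only after taking a free buffer from the input gate's pool. When the slow consumer holds every input buffer, the "socket" fills up. The sender's network thread then blocks before it can recycle its output buffer, and finally the producer blocks while waiting for an output buffer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a remote exchange (hypothetical names, not Flink's classes). */
class RemoteBackpressureSketch {

    interface Step { void run(int i) throws InterruptedException; }

    /** Starts a thread that calls step.run(0 .. n-1). */
    static Thread loop(int n, Step step) {
        Thread t = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) step.run(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    /** A pool of `size` one-slot "buffers". */
    static BlockingQueue<int[]> pool(int size) {
        BlockingQueue<int[]> q = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) q.add(new int[1]);
        return q;
    }

    /** Returns {records consumed, max records produced but not yet consumed}. */
    static int[] run(int outPoolSize, int channelCapacity, int inPoolSize,
                     int records, long consumerDelayMillis) {
        BlockingQueue<int[]> outPool = pool(outPoolSize);                        // sender's partition buffers
        BlockingQueue<int[]> outQueue = new LinkedBlockingQueue<>();             // filled, awaiting dispatch
        BlockingQueue<Integer> tcp = new ArrayBlockingQueue<>(channelCapacity);  // bounded "socket"
        BlockingQueue<int[]> inPool = pool(inPoolSize);                          // receiver's input-gate buffers
        BlockingQueue<int[]> inQueue = new LinkedBlockingQueue<>();              // queued to the input channel
        AtomicInteger produced = new AtomicInteger();
        AtomicInteger consumed = new AtomicInteger();
        AtomicInteger maxLag = new AtomicInteger();

        Thread[] threads = {
            // Producer task: blocks once every output buffer is in use.
            loop(records, i -> {
                int[] buf = outPool.take();
                buf[0] = i;
                maxLag.accumulateAndGet(produced.incrementAndGet() - consumed.get(), Math::max);
                outQueue.put(buf);
            }),
            // Sender network layer: recycles the output buffer only after writing it out.
            loop(records, i -> {
                int[] buf = outQueue.take();
                tcp.put(buf[0]);               // blocks while the receiver is not reading
                outPool.put(buf);
            }),
            // Receiver network layer: reads the socket only after getting an input buffer.
            loop(records, i -> {
                int[] buf = inPool.take();     // no free buffer -> the socket is not read
                buf[0] = tcp.take();
                inQueue.put(buf);
            }),
            // Slow consumer task: processes the record, then recycles the input buffer.
            loop(records, i -> {
                int[] buf = inQueue.take();
                Thread.sleep(consumerDelayMillis);
                consumed.incrementAndGet();
                inPool.put(buf);
            }),
        };
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new int[] {consumed.get(), maxLag.get()};
    }

    public static void main(String[] args) {
        int[] r = run(2, 2, 2, 30, 2);
        System.out.println("consumed=" + r[0] + ", maxLag=" + r[1] + " (bound 6)");
    }
}
```

Between production and consumption, every record holds an output buffer, a channel slot, or an input buffer. The lag that `run` reports is therefore bounded by `outPoolSize + channelCapacity + inPoolSize`, no matter how slow the consumer is.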
I will update the docs with some figures, so this will be easier to follow.

– Ufuk

On 07 Aug 2015, at 03:37, wangzhijiang999 <[email protected]> wrote:

> As stated on the Apache page, Flink's streaming runtime has natural flow control: slow downstream operators backpressure faster upstream operators.
> How should I understand Flink's natural flow control?
> As I know, Heron has a backpressure mechanism: if some tasks process slowly, it stops reading from the source and notifies other tasks to stop reading from the source.
> In Flink, if the producer task processes quickly, it emits its results to the consumer, so the buffers in the consumer's InputChannel fill up. If the consumer processes slowly, how is the upstream flow controlled?
>
> Thank you in advance for any suggestions!
>
> Best wishes,
> Zhijiang Wang
