Thanks Timo for suggested solution. Will go with idea of artificial key for
our use case.

Gagan

On Mon, Jan 7, 2019 at 10:21 PM Timo Walther <twal...@apache.org> wrote:

> Hi Gagan,
>
> a typical solution to such a problem is to introduce an artifical key
> (enrichment id + some additional suffix), you can then keyBy on this
> artificial key and thus spread the workload more evenly. Of course you need
> to make sure that records of the second stream are duplicated to all
> operators with the same artificial key.
>
> Depending on the frequency of the second stream, it might also worth to
> use a broadcast join that distributes the second stream to all operators
> such that all operators can perform the enrichment step in a round robin
> fashion.
>
> Regards,
> Timo
>
> Am 07.01.19 um 14:45 schrieb Gagan Agrawal:
>
> Flink Version is 1.7.
> Thanks Zhijiang for your pointer. Initially I was checking only for few.
> However I just checked for all and found couple of them having queue length
> of 40+ which seems to be due to skewness in data. Is there any general
> guide lines on how to handle skewed data? In my case I am taking union and
> then keyBy (with custom stateful Process function) on enrichment id of 2
> streams (1 enrichment stream with low volume and another regular data
> stream with high volume). I see that 30% of my data stream records have
> same enrichment Id and hence go to same tasks which results in skewness.
> Any pointers on how to handle skewness while doing keyBy would be of great
> help.
>
> Gagan
>
> On Mon, Jan 7, 2019 at 3:25 PM zhijiang <wangzhijiang...@aliyun.com>
> wrote:
>
>> Hi Gagan,
>>
>> What flink version do you use? And have you checked the 
>> buffers.inputQueueLength
>> for all the related parallelism (connected with A) of B?  It may exist the
>> scenario that only one parallelim B is full of inqueue buffers which back
>> pressure A, and the input queue for other parallelism B is empty.
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> From:Gagan Agrawal <agrawalga...@gmail.com>
>> Send Time:2019年1月7日(星期一) 12:06
>> To:user <user@flink.apache.org>
>> Subject:Buffer stats when Back Pressure is high
>>
>> Hi,
>> I want to understand does any of buffer stats help in debugging /
>> validating that downstream operator is performing slow when Back Pressure
>> is high? Say I have A -> B operators and A shows High Back Pressure which
>> indicates something wrong or not performing well on B side which is slowing
>> down operator A. However when I look at buffers.inputQueueLength for
>> operator B, it's 0. My understanding is that when B is processing slow,
>> it's input buffer will be full of incoming messages which ultimately
>> blocks/slows down upstream operator A. However it doesn't seem to be
>> happening in my case. Can someone throw some light on how should different
>> stats around buffers (e.g buffers.inPoolUsage, buffers.inputQueueLength,
>> numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) look like when
>> downstream operator is performing slow?
>>
>> Gagan
>>
>>
>>
>

Reply via email to