Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-16 Thread Piotr Nowojski
Hi Tao,

Could you prepare a minimal example that reproduces this issue?
Also, which Flink version are you using?

Best,
Piotrek


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-16 Thread tao xiao
> Your upstream is not inflating the record size?
No, this is simply a dedup function.

-- 
Regards,
Tao


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-15 Thread Arvid Heise
Ah yes, I see it now as well. You are right: each record should be
replicated 9 times, with each copy sent to a single instance of one of the
9 downstream operators. Your upstream is not inflating the record size?
The number of records seems about right. @pnowojski FYI.
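One quick way to check the record-size question is to compare the average
size per record on the input side with the average on the output side,
using the records and bytes counters shown in the UI. The helper below is
hypothetical (not part of Flink), and the numbers in the usage line are made
up to illustrate the "9x records but 9 * 150x bytes" case from this thread.

```python
def size_inflation(bytes_in, records_in, bytes_out, records_out):
    """Ratio of average outgoing record size to average incoming record size.

    Roughly 1.0 means records leave the operator at about the size they
    arrived. A much larger value could hint that either the records or the
    byte accounting are inflated somewhere.
    """
    if min(bytes_in, records_in, records_out) <= 0:
        raise ValueError("bytes_in, records_in and records_out must be positive")
    avg_in = bytes_in / records_in
    avg_out = bytes_out / records_out
    return avg_out / avg_in


# Hypothetical numbers: 9x records out (as expected) but 1350x bytes out.
print(size_inflation(bytes_in=1_000, records_in=10,
                     bytes_out=1_350_000, records_out=90))  # 150.0
```

For a dedup function that forwards records unchanged, one would expect a
value close to 1.0.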


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-15 Thread tao xiao
Hi Arvid

The second picture shows the metrics of the upstream operator. The upstream
has a parallelism of 150, as you can see in the first picture. I expect the
bytes sent to be about 9 * bytes received, since 9 downstream operators are
connected to it.
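As a rough check of this expectation, a hypothetical helper like the one
below compares one upstream subtask's bytes-sent / bytes-received ratio with
two idealised models (rebalance: 9 copies per record; broadcast: 9 * 150
copies per record). It assumes records are forwarded unchanged; the example
numbers are made up, not read from the screenshots.

```python
import math


def closest_fan_out_model(bytes_in, bytes_out, num_downstream_ops, downstream_parallelism):
    """Return (observed ratio, closest model) for one upstream subtask.

    Models (assuming records are forwarded unchanged, no filtering):
      * rebalance: each record is sent once per downstream operator
      * broadcast: each record is sent to every instance of every downstream operator
    """
    if bytes_in <= 0 or bytes_out <= 0:
        raise ValueError("byte counts must be positive")
    ratio = bytes_out / bytes_in
    expected = {
        "rebalance": num_downstream_ops,
        "broadcast": num_downstream_ops * downstream_parallelism,
    }
    # Log distance, so a 9x and a 1350x expectation are compared multiplicatively.
    model = min(expected, key=lambda m: abs(math.log(ratio / expected[m])))
    return ratio, model


# Hypothetical per-subtask numbers.
print(closest_fan_out_model(1_000_000, 9_100_000, 9, 150))      # (9.1, 'rebalance')
print(closest_fan_out_model(1_000_000, 1_350_000_000, 9, 150))  # (1350.0, 'broadcast')
```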

Hi Caizhi,
I will create a minimal reproducible DAG and post an update here.

-- 
Regards,
Tao


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-15 Thread Arvid Heise
Hi,

Could you please clarify which operator we see in the second picture?

If you are showing the upstream operator, then it has a parallelism of only 1,
so there shouldn't be multiple subtasks.
If you are showing the downstream operator, then the metric would refer to
the HASH edge, not the REBALANCE one.


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-13 Thread Caizhi Weng
Hi!

This doesn't seem to be the expected behavior. A rebalance shuffle should
send each record to only one of the parallel instances, not to all of them.

If possible, could you please explain what your Flink job is doing and,
preferably, share your user code so that others can look into this case?
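To make the difference concrete, here is a small, self-contained Python
simulation (not Flink code) of the two strategies on a single edge with 150
receiving instances: round-robin rebalance sends each record to exactly one
channel, while broadcast sends it to all of them. The function names are
hypothetical, and the sketch starts the round-robin at channel 0 for
reproducibility.

```python
from collections import Counter


def rebalance_channels(num_records, num_channels, start=0):
    """Round-robin: record i is sent to exactly one channel."""
    return [[(start + i) % num_channels] for i in range(num_records)]


def broadcast_channels(num_records, num_channels):
    """Broadcast: every record is sent to every channel."""
    return [list(range(num_channels)) for _ in range(num_records)]


def summarize(assignment):
    """Return (total copies sent, Counter of copies per channel)."""
    per_channel = Counter(ch for targets in assignment for ch in targets)
    return sum(per_channel.values()), per_channel


total_rr, per_rr = summarize(rebalance_channels(1_500, 150))
total_bc, per_bc = summarize(broadcast_channels(1_500, 150))
print(total_rr, per_rr[0])  # 1500 10
print(total_bc, per_bc[0])  # 225000 1500
```

With rebalance, the total number of copies equals the number of records and
the load is spread evenly; with broadcast, it is multiplied by the number of
receiving instances.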


Confusion about rebalance bytes sent metric in Flink UI

2021-12-10 Thread tao xiao
Hi team,

I have one operator that is connected to 9 downstream operators using
rebalance. Each operator has a parallelism of 150 [1]. I assume each message
in the upstream operator is sent to one parallel instance of each of the 9
receiving operators, so the total bytes sent should be roughly 9 times the
bytes received in the upstream operator metric. However, the Flink UI shows
bytes sent much higher than 9 times: about 150 * 9 * bytes received [2].
This looks to me as if every message is duplicated to every parallel
instance of all receiving operators, the way broadcast does. Is this
correct?
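Writing the expectation out, with B_in the bytes received by the upstream
operator, k = 9 downstream operators and p = 150 instances per downstream
operator, and assuming records are forwarded unchanged:

```latex
\begin{aligned}
\text{bytes sent}_{\text{rebalance}} &\approx k \, B_{\text{in}} = 9 \, B_{\text{in}} \\
\text{bytes sent}_{\text{broadcast}} &\approx k \, p \, B_{\text{in}} = 9 \cdot 150 \, B_{\text{in}} = 1350 \, B_{\text{in}}
\end{aligned}
```

The observed figure of about 150 * 9 * bytes received matches the broadcast
line, not the rebalance line.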



[1] https://imgur.com/cGyb0QO
[2] https://imgur.com/SFqPiJA
-- 
Regards,
Tao