Re: Confusion about rebalance bytes sent metric in Flink UI
Hi Tao,

Could you prepare a minimal example that reproduces this issue? Also, which Flink version are you using?

Best,
Piotrek

On Thu, Dec 16, 2021 at 09:44, tao xiao wrote:
Re: Confusion about rebalance bytes sent metric in Flink UI
> Your upstream is not inflating the record size?

No, this is simply a dedup function.

--
Regards,
Tao

On Thu, Dec 16, 2021 at 2:49 PM Arvid Heise wrote:
Re: Confusion about rebalance bytes sent metric in Flink UI
Ah yes, I see it now as well. You are right: each record should be replicated 9 times, once per downstream operator, and sent to one of that operator's instances. Your upstream is not inflating the record size? The record counts seem about right. @pnowojski FYI.

On Thu, Dec 16, 2021 at 2:20 AM tao xiao wrote:
Re: Confusion about rebalance bytes sent metric in Flink UI
Hi Arvid,

The second picture shows the metrics of the upstream operator. The upstream has a parallelism of 150, as you can see in the first picture. I expect the bytes sent to be about 9 × the bytes received, as 9 downstream operators are connected.

Hi Caizhi,

Let me create a minimal reproducible DAG and update here.

--
Regards,
Tao

On Thu, Dec 16, 2021 at 4:03 AM Arvid Heise wrote:
Re: Confusion about rebalance bytes sent metric in Flink UI
Hi,

Could you please clarify which operator we see in the second picture?

If you are showing the upstream operator, then this has only parallelism 1, so there shouldn't be multiple subtasks.
If you are showing the downstream operator, then the metric would refer to the HASH exchange and not the REBALANCE.

On Tue, Dec 14, 2021 at 2:55 AM Caizhi Weng wrote:
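The HASH vs. REBALANCE distinction can be sketched with a small plain-Python simulation (this is not Flink code; the parallelism, keys, and record values are made-up illustrative values, and the routing logic only mirrors the two exchange types conceptually):

```python
# Conceptual sketch (plain Python, not Flink) of how a HASH and a
# REBALANCE exchange each pick exactly one downstream subtask per record.

def hash_partition(key, parallelism):
    # Keyed (HASH) exchange: the target subtask is derived from the key,
    # so records with the same key always land on the same subtask.
    return hash(key) % parallelism

def rebalance_targets(records, parallelism):
    # REBALANCE exchange: round-robin over subtasks, ignoring the record.
    return [i % parallelism for i in range(len(records))]

records = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
p = 3

hash_targets = [hash_partition(k, p) for k, _ in records]
rr_targets = rebalance_targets(records, p)

# Either way, each record goes to exactly ONE subtask, so per exchange
# the records (and bytes) out equal the records (and bytes) in.
assert len(hash_targets) == len(records)
assert len(rr_targets) == len(records)
# With HASH, equal keys share a target subtask:
assert hash_targets[0] == hash_targets[2]
print(rr_targets)  # round-robin: [0, 1, 2, 0]
```

Either exchange keeps the one-record-one-target invariant; the difference is only in how the single target is chosen.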
Re: Confusion about rebalance bytes sent metric in Flink UI
Hi!

This doesn't seem to be the expected behavior. A rebalance shuffle should send each record to one of the parallel instances, not all of them.

If possible, could you please explain what your Flink job is doing, and preferably share your user code so that others can look into this case?

On Sat, Dec 11, 2021 at 01:11, tao xiao wrote:
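The expected byte accounting for a rebalance exchange can be modeled in plain Python (illustrative only: the fanout and parallelism are the numbers from this thread, while the record count and sizes are made up, and this models the exchange conceptually rather than Flink's actual network stack):

```python
from itertools import cycle

# Illustrative model of a rebalance exchange: one upstream stage feeding
# 9 downstream operators, each with parallelism 150 (figures from the
# thread; record sizes below are arbitrary placeholders).
NUM_DOWNSTREAM_OPS = 9
PARALLELISM = 150

record_sizes = [100] * 1000          # 1000 records of 100 bytes each
bytes_received = sum(record_sizes)   # what the upstream operator reads

bytes_sent = 0
for _ in range(NUM_DOWNSTREAM_OPS):
    # Per downstream operator, rebalance picks ONE subtask per record
    # (round-robin), so each record is serialized once per edge ...
    targets = cycle(range(PARALLELISM))
    for size in record_sizes:
        next(targets)       # exactly one target subtask
        bytes_sent += size  # ... not once per subtask.

# Expected: fanout x input, NOT fanout x parallelism x input.
assert bytes_sent == NUM_DOWNSTREAM_OPS * bytes_received
assert bytes_sent != NUM_DOWNSTREAM_OPS * PARALLELISM * bytes_received
print(bytes_sent)  # 900000
```

Under this model, bytes sent scales with the number of downstream operators (9×) and is independent of their parallelism, which is exactly the expectation stated in the question.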
Confusion about rebalance bytes sent metric in Flink UI
Hi team,

I have one operator that is connected to 9 downstream operators using rebalance. Each operator has a parallelism of 150 [1]. I assume each message in the upstream operator is sent to one of the parallel instances of each of the 9 receiving operators, so the total bytes sent should be roughly 9 times the bytes received in the upstream operator metric. However, the Flink UI shows that the bytes sent is much higher than 9 times: it is about 150 * 9 * bytes received [2]. This looks to me as if every message is duplicated to every parallel instance of all receiving operators, like what broadcast does. Is this correct?

[1] https://imgur.com/cGyb0QO
[2] https://imgur.com/SFqPiJA

--
Regards,
Tao
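The two hypotheses in the question can be put as a quick back-of-the-envelope check (plain Python; the input byte figure is an arbitrary placeholder, only the ratios matter):

```python
# Back-of-the-envelope check of the two hypotheses in the question.
fanout = 9           # downstream operators connected via rebalance
parallelism = 150    # subtasks per downstream operator
bytes_received = 1_000_000  # placeholder input volume

# Rebalance: each record goes to ONE subtask of each downstream operator.
expected_rebalance = fanout * bytes_received                # 9x input

# Broadcast: each record goes to EVERY subtask of each downstream operator.
expected_broadcast = fanout * parallelism * bytes_received  # 1350x input

print(expected_rebalance)  # 9000000
print(expected_broadcast)  # 1350000000
```

The metric observed in the thread (~150 * 9 * bytes received) matches the broadcast figure rather than the rebalance one, which is what prompted the question.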