Re: DataStream API min max aggregation on other fields
Yes, the unpredictable non-key, non-aggregated fields were what confused me. As Biao said, it is caused by the purged window state. So when I use max or min, I should rely only on the aggregated field; the other fields are not well defined, and I should take care not to use them. Thank you all for your replies!

From: Biao Liu
Sent: Thursday, December 19, 2019, 18:10
To: vino yang
Cc: Lu Weizheng; user@flink.apache.org
Subject: Re: DataStream API min max aggregation on other fields
Re: DataStream API min max aggregation on other fields
Hi Lu,

@vino yang I think what he means is that the "max" semantics between window and non-window are different: it changes the non-aggregated fields unpredictably.

That's really an interesting question. I took a look at the relevant implementation. From the code's perspective, "max" always keeps the non-aggregated fields of the first arrived record, which should be (0, 0, x) in this case. However, when the window is purged, the state (which holds the non-aggregated fields of the first arrived record and the current maximum) is cleared. That means the "first arrived record" is reset every time a window is purged. That's why the second column increases unpredictably. The semantics here are quite confusing to me too.

Thanks,
Biao /'bɪ.aʊ/

On Thu, 19 Dec 2019 at 17:50, vino yang wrote:
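Biao's explanation above can be modelled in plain Python. This is a sketch of the described behaviour, not Flink code: it assumes a single key, and it stands in for the 2-second time window with a simple count-based window of `window_size` records. The point it shows is that the accumulator (first record's non-aggregated fields plus the running max) is purged with each window, so the "first arrived record" is re-chosen per window and the second column jumps between windows.

```python
def windowed_max(records, key_pos, max_pos, window_size):
    """Simulate a keyed, windowed max(max_pos): per window, keep the
    non-aggregated fields of the window's first record and the running
    max of the aggregated field; purge the state when the window fires."""
    results = []
    state = {}  # key -> accumulator tuple for the current window
    for i, rec in enumerate(records):
        key = rec[key_pos]
        if key not in state:
            # first record of this window becomes the accumulator
            state[key] = rec
        else:
            acc = list(state[key])
            acc[max_pos] = max(acc[max_pos], rec[max_pos])
            state[key] = tuple(acc)
        if (i + 1) % window_size == 0:
            # window fires: emit the accumulator and purge the state,
            # so the next record starts a fresh "first arrived record"
            results.append(state.pop(key))
    return results

# one record per tick, 2-record windows, mimicking the IntTupleSource example
records = [(0, i, i) for i in range(10)]
print(windowed_max(records, key_pos=0, max_pos=2, window_size=2))
# [(0, 0, 1), (0, 2, 3), (0, 4, 5), (0, 6, 7), (0, 8, 9)]
```

The exact tuples differ from the output in the original mail (there, the first real-time window happened to catch only one record), but the shape of the surprise is the same: the second field tracks each window's first record rather than staying at 0.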
Re: DataStream API min max aggregation on other fields
Hi weizheng,

IMHO, I am not sure which part is unclear to you. Is the result incorrect? Can you share the correct result based on your understanding?

The "keyBy" specifies the grouping field, and min/max do the aggregation on the other field at the position you specified.

Best,
Vino

On Thu, Dec 19, 2019 at 5:00 PM, Lu Weizheng wrote:
DataStream API min max aggregation on other fields
Hi all,

On a KeyedStream, when I use maxBy or minBy, I get the max or min element, meaning the other fields are kept from that max or min element. This is quite clear. However, when I use max or min, what does Flink do with the other fields?

val tupleStream = senv.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
// (0,0,0)
// (0,0,1)
// (0,0,2)
// (1,0,6)
// (1,0,7)
// (1,0,8)
val maxByStream = tupleStream.keyBy(0).max(2).print()

In this case, the second field keeps the first element's 0.

class IntTupleSource extends RichSourceFunction[(Int, Int, Int)] {

  var isRunning: Boolean = true
  var i = 0

  override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
    while (isRunning) {
      // emit one record per second into the SourceContext
      srcCtx.collect((0, i, i))
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// (0,0,0)
// (0,1,2)
// (0,3,4)
// (0,5,6)
// (0,7,8)
// (0,9,10)

val maxWindowStream = senv.addSource(new IntTupleSource)
  .keyBy(0)
  .timeWindow(Time.milliseconds(2000))
  .max(2).print()

In this case, the result is not so clear...

So for max and min, can these two operators not guarantee the result of the other fields?

Thank you so much if anyone can reply.

Weizheng
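The non-window behaviour asked about above can be modelled in plain Python. This is a sketch of the observed semantics, not Flink code: per key, the non-aggregated fields are frozen at the first arrived record, and only the aggregated position is updated with a running max.

```python
def keyed_max(records, key_pos, max_pos):
    """Simulate DataStream keyBy(key_pos).max(max_pos): keep the first
    record per key as the accumulator, updating only the max position,
    and emit the accumulator after every incoming record."""
    state = {}  # key -> current accumulator tuple
    out = []
    for rec in records:
        key = rec[key_pos]
        if key not in state:
            state[key] = rec  # first record per key is kept as-is
        else:
            acc = list(state[key])
            acc[max_pos] = max(acc[max_pos], rec[max_pos])
            state[key] = tuple(acc)
        out.append(state[key])
    return out

records = [(0, 0, 0), (0, 1, 1), (0, 2, 2), (1, 0, 6), (1, 1, 7), (1, 2, 8)]
print(keyed_max(records, key_pos=0, max_pos=2))
# [(0, 0, 0), (0, 0, 1), (0, 0, 2), (1, 0, 6), (1, 0, 7), (1, 0, 8)]
```

This reproduces the commented output of the first snippet: the second field stays at the first record's value (0) for each key, while the third field carries the running max.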