Hi Weizheng,

I am not sure which part is unclear to you. Is the result incorrect? Can you share the result you expected, based on your understanding?

The "keyBy" call specifies the grouping field, and min/max aggregate the field at the position you specify.

Best,
Vino

Lu Weizheng <luweizhen...@hotmail.com> wrote on Thu, Dec 19, 2019 at 5:00 PM:

> Hi all,
>
> On a KeyedStream, when I use maxBy or minBy, I get the element that holds
> the max or min value, so the other fields are taken from that same element.
> This is quite clear. However, when I use max or min, what does Flink do
> with the other fields?
>
> val tupleStream = senv.fromElements(
>   (0, 0, 0), (0, 1, 1), (0, 2, 2),
>   (1, 0, 6), (1, 1, 7), (1, 2, 8)
> )
> // (0,0,0)
> // (0,0,1)
> // (0,0,2)
> // (1,0,6)
> // (1,0,7)
> // (1,0,8)
> val maxByStream = tupleStream.keyBy(0).max(2).print()
>
> In this case, the second field uses the first element's 0.
>
> class IntTupleSource extends RichSourceFunction[(Int, Int, Int)] {
>
>   var isRunning: Boolean = true
>   var i = 0
>
>   val rand = new Random()
>
>   override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
>
>     while (isRunning) {
>
>       // Collect the data and write it to the SourceContext
>       srcCtx.collect((0, i, i))
>       i += 1
>       Thread.sleep(1000)
>     }
>   }
>
>   override def cancel(): Unit = {
>     isRunning = false
>   }
> }
>
> //(0,0,0)
> //(0,1,2)
> //(0,3,4)
> //(0,5,6)
> //(0,7,8)
> //(0,9,10)
>
> val maxWindowStream = senv.addSource(new IntTupleSource)
>   .keyBy(0)
>   .timeWindow(Time.milliseconds(2000))
>   .max(2).print()
>
> In this case, the result is not so clear...
>
> So, for max and min, is there no guarantee about the values of the
> other fields?
>
> Thank you so much if anyone can reply.
>
> Weizheng
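
For reference, the max versus maxBy difference asked about in the quoted message can be modelled in plain Scala, without Flink. This is only a sketch under an assumption taken from the output quoted above: max(2) keeps the first element seen for a key and overwrites only the aggregated field, while maxBy(2) returns the whole element that holds the maximum. The names MaxVsMaxBy, maxField2, maxByField2 and rolling are hypothetical helpers, not part of the Flink API.

```scala
// Plain-Scala model of the two rolling aggregations (no Flink dependency).
// Assumption: this mirrors the behaviour observed in the thread, not Flink's source.
object MaxVsMaxBy {
  type T3 = (Int, Int, Int)

  // Model of max(2): keep the accumulated element, overwrite only the third field.
  def maxField2(acc: T3, next: T3): T3 =
    if (next._3 > acc._3) acc.copy(_3 = next._3) else acc

  // Model of maxBy(2): return the whole element whose third field is larger.
  def maxByField2(acc: T3, next: T3): T3 =
    if (next._3 > acc._3) next else acc

  // Emit one result per input element, keeping separate state per key (field 0),
  // similar to a rolling reduce on a keyed stream.
  def rolling(input: Seq[T3], f: (T3, T3) => T3): Seq[T3] = {
    val state = scala.collection.mutable.Map.empty[Int, T3]
    input.map { e =>
      val updated = state.get(e._1).map(acc => f(acc, e)).getOrElse(e)
      state(e._1) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq((0, 0, 0), (0, 1, 1), (0, 2, 2), (1, 0, 6), (1, 1, 7), (1, 2, 8))
    println("max(2) model:")
    rolling(input, maxField2).foreach(println)
    println("maxBy(2) model:")
    rolling(input, maxByField2).foreach(println)
  }
}
```

Under this model, the first call prints the same six tuples as the keyBy(0).max(2) example in the quoted message, with the second field staying at 0. The second call returns each input element unchanged, because every new element in that input carries a larger third field than the previous one for its key.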