Hi weizheng,

I'm not sure which part is unclear to you. Is the result incorrect?
Could you share the result you expected, based on your understanding?

"keyBy" specifies the field to group by, and min/max aggregate the
other field at the position you specify.
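
To make the difference between max and maxBy concrete, here is a minimal
sketch in plain Scala (no Flink dependency) that models the rolling
aggregation per key. It assumes that max(2) keeps the first element seen for
a key and only overwrites the aggregated field, which is consistent with the
output you posted. rollingMax and rollingMaxBy are hypothetical helper names
for illustration, not Flink API.

```scala
import scala.collection.mutable

object MaxVsMaxBy {
  type T3 = (Int, Int, Int)

  // Shared driver: keeps one accumulator per key (field 0) and emits the
  // updated accumulator for every incoming element, like a rolling aggregate.
  private def rolling(in: Seq[T3])(reduce: (T3, T3) => T3): Seq[T3] = {
    val state = mutable.Map.empty[Int, T3]
    in.map { e =>
      val next = state.get(e._1).map(acc => reduce(acc, e)).getOrElse(e)
      state(e._1) = next
      next
    }
  }

  // Assumed model of keyBy(0).max(2): keep the first element of the key and
  // overwrite only field 2 with the running maximum.
  def rollingMax(in: Seq[T3]): Seq[T3] =
    rolling(in)((acc, e) => acc.copy(_3 = math.max(acc._3, e._3)))

  // Model of keyBy(0).maxBy(2): keep the whole element that holds the
  // maximum of field 2 (the earlier element wins on ties in this sketch).
  def rollingMaxBy(in: Seq[T3]): Seq[T3] =
    rolling(in)((acc, e) => if (e._3 > acc._3) e else acc)

  def main(args: Array[String]): Unit = {
    val input = Seq((0, 0, 0), (0, 1, 1), (0, 2, 2), (1, 0, 6), (1, 1, 7), (1, 2, 8))
    println("max(2):")
    rollingMax(input).foreach(println)
    println("maxBy(2):")
    rollingMaxBy(input).foreach(println)
  }
}
```

With your input, rollingMax yields (0,0,0), (0,0,1), (0,0,2), (1,0,6),
(1,0,7), (1,0,8), while rollingMaxBy returns each input element unchanged
because field 2 only increases within each key. If the same logic is applied
per window, the other fields would come from the first element of that
window, which would explain results such as (0,1,2).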

Best,
Vino

On Thu, Dec 19, 2019 at 5:00 PM, Lu Weizheng <luweizhen...@hotmail.com> wrote:

> Hi all,
>
> On a KeyedStream, when I use maxBy or minBy, I get the max or min
> element, so the other fields are taken from that element. This is quite
> clear. However, when I use max or min, how does Flink handle the other
> fields?
>
> val tupleStream = senv.fromElements(
>   (0, 0, 0), (0, 1, 1), (0, 2, 2),
>   (1, 0, 6), (1, 1, 7), (1, 2, 8)
> )
> // Output of the job below:
> //  (0,0,0)
> //  (0,0,1)
> //  (0,0,2)
> //  (1,0,6)
> //  (1,0,7)
> //  (1,0,8)
> val maxStream = tupleStream.keyBy(0).max(2).print()
>
> In this case, the second field keeps the first element's value, 0.
>
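> import org.apache.flink.streaming.api.functions.source.RichSourceFunction
> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> import scala.util.Random
>
> class IntTupleSource extends RichSourceFunction[(Int, Int, Int)]{
>
>   // Written by cancel() from another thread, so it is marked volatile
>   @volatile var isRunning: Boolean = true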
>   var i = 0
>
>   val rand = new Random()
>
>   override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
>
>     while (isRunning) {
>
>       // Emit the next element to the SourceContext
>       srcCtx.collect((0, i, i))
>       i += 1
>       Thread.sleep(1000)
>     }
>   }
>
>   override def cancel(): Unit = {
>     isRunning = false
>   }
> }
>
> // Output of the windowed job below:
> //(0,0,0)
> //(0,1,2)
> //(0,3,4)
> //(0,5,6)
> //(0,7,8)
> //(0,9,10)
>
> val maxWindowStream = senv.addSource(new IntTupleSource)
>   .keyBy(0)
>   .timeWindow(Time.milliseconds(2000))
>   .max(2).print()
>
>
>
> In this case, the result is not so clear...
>
> So, do the max and min operators make no guarantee about the values of
> the other fields?
>
> Thank you so much if anyone can reply.
>
> Weizheng
>
