Yes, it does not happen in streaming mode. Is this considered a bug or is it by design?
Thanks! On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma <guowei....@gmail.com> wrote: > Hi > > I did not run your program directly, but I see that you are now using the > Batch execution mode. I suspect it is related to this, because in the Batch > execution mode FLINK will "sort" the Key (this might be an unstable sort). > So would you like to experiment with the results of running with Streaming > mode and to see what happens? > > Best, > Guowei > > > On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote: > >> Hi all, >> >> Can anyone advise on this? >> >> I wrote a simple test of the countWindow method (in Kotlin) as below >> >> package aero.airlab.flinkjobs.headingreminder >> >> import org.apache.flink.api.common.RuntimeExecutionMode >> import org.apache.flink.api.common.eventtime.WatermarkStrategy >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> import kotlin.random.Random >> >> object CountWindowTest { >> @JvmStatic >> fun main(args: Array<String>) { >> val env = StreamExecutionEnvironment.getExecutionEnvironment() >> env.setRuntimeMode(RuntimeExecutionMode.BATCH) >> >> val rand = Random(0) >> val data = (0..1000).map { Pair(rand.nextInt(10), it) } >> env.fromCollection(data).assignTimestampsAndWatermarks( >> WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>() >> .withTimestampAssigner { e, _ -> e.second.toLong() }) >> .keyBy { it.first } >> .countWindow(3L, 1) >> .reduce { a, b -> b } >> .keyBy { it.first } >> .filter { it.first == 5 } >> .print() >> >> env.execute() >> } >> } >> >> >> The beginning of the output is as such >> >> 12> (5, 184) >> 12> (5, 18) >> 12> (5, 29) >> 12> (5, 37) >> 12> (5, 38) >> 12> (5, 112) >> 12> (5, 131) >> >> The first line (5, 184) is not in order from the rest. >> >> Is this a bug? The problem disappears if I remove the keyBy after the >> reduce. >> >> Thanks. >> >