Hi Yan,

After a second thought, I think you are right: the downstream operator should keep the order of records with the same key from the same upstream. So feel free to open a Jira.

Best,
Guowei

On Wed, Nov 3, 2021 at 7:30 PM Yan Shen <leey...@gmail.com> wrote:

> Hi,
>
> It will complicate things a lot if we cannot assume the input order of any
> operator after a keyBy. So far I only have the problem with countWindow,
> which I seem to be able to avoid by writing my own stateful KeyedProcess.
> Are there other operators which might cause the same problem?
>
> The other alternative is not to use batch mode, but the problem is that I
> won't know when a batch job finishes if I don't run it in batch mode, since a
> streaming process will never end.
>
> Thanks.
>
> On Wed, Nov 3, 2021 at 4:38 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Yan
>> I do not think it is a bug. Maybe we simply cannot assume the input order
>> of an operator.
>> Best,
>> Guowei
>>
>> On Wed, Nov 3, 2021 at 3:10 PM Yan Shen <leey...@gmail.com> wrote:
>>
>>> Yes, it does not happen in streaming mode. Is this considered a bug or
>>> is it by design?
>>>
>>> Thanks!
>>>
>>> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I did not run your program directly, but I see that you are now using
>>>> the Batch execution mode. I suspect it is related to this, because in the
>>>> Batch execution mode Flink will "sort" the key (this might be an unstable
>>>> sort).
>>>> So would you like to run it in Streaming mode and see what happens?
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Can anyone advise on this?
>>>>>
>>>>> I wrote a simple test of the countWindow method (in Kotlin), as below:
>>>>>
>>>>> package aero.airlab.flinkjobs.headingreminder
>>>>>
>>>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>>> import kotlin.random.Random
>>>>>
>>>>> object CountWindowTest {
>>>>>     @JvmStatic
>>>>>     fun main(args: Array<String>) {
>>>>>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>>>
>>>>>         val rand = Random(0)
>>>>>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>>>>         env.fromCollection(data).assignTimestampsAndWatermarks(
>>>>>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>>>>>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>>>>>             .keyBy { it.first }
>>>>>             .countWindow(3L, 1)
>>>>>             .reduce { a, b -> b }
>>>>>             .keyBy { it.first }
>>>>>             .filter { it.first == 5 }
>>>>>             .print()
>>>>>
>>>>>         env.execute()
>>>>>     }
>>>>> }
>>>>>
>>>>> The beginning of the output is as follows:
>>>>>
>>>>> 12> (5, 184)
>>>>> 12> (5, 18)
>>>>> 12> (5, 29)
>>>>> 12> (5, 37)
>>>>> 12> (5, 38)
>>>>> 12> (5, 112)
>>>>> 12> (5, 131)
>>>>>
>>>>> The first line, (5, 184), is out of order relative to the rest.
>>>>>
>>>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>>>> reduce.
>>>>>
>>>>> Thanks.