Hi Yan,
On second thought, I think you are right: the downstream operator
should preserve the order of records with the same key that come from the
same upstream subtask. So feel free to open a JIRA issue.
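
One way to state the guarantee agreed on here: ordering is expected only among records that share both the key and the upstream subtask that sent them. Records for the same key arriving from different upstream subtasks may interleave. The plain-Kotlin sketch below models that check; `Rec`, its `upstream`/`key`/`seq` fields, and `keepsPerUpstreamKeyOrder` are hypothetical names, not Flink API.

```kotlin
// One record as seen by a downstream operator (hypothetical model, not a Flink type).
// seq = the order in which the upstream subtask sent this record.
data class Rec(val upstream: Int, val key: Int, val seq: Int)

// True if, for every (upstream subtask, key) pair, records arrive in send order.
// Interleaving between different upstream subtasks is allowed.
fun keepsPerUpstreamKeyOrder(received: List<Rec>): Boolean =
    received.groupBy { it.upstream to it.key }.values.all { group ->
        group.zipWithNext().all { (a, b) -> a.seq < b.seq }
    }

fun main() {
    // Key 5 from two upstream subtasks, interleaved: still satisfies the guarantee.
    val interleaved = listOf(Rec(0, 5, 1), Rec(1, 5, 1), Rec(0, 5, 2), Rec(1, 5, 2))
    println(keepsPerUpstreamKeyOrder(interleaved)) // true

    // Upstream 0's records for key 5 arrive swapped: violates the guarantee.
    val swapped = listOf(Rec(0, 5, 2), Rec(1, 5, 1), Rec(0, 5, 1))
    println(keepsPerUpstreamKeyOrder(swapped)) // false
}
```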
Best,
Guowei


On Wed, Nov 3, 2021 at 7:30 PM Yan Shen <leey...@gmail.com> wrote:

> Hi,
>
> It will complicate things a lot if we cannot assume the input order of any
> operator after a keyBy. So far I have only hit the problem with countWindow,
> which I seem to be able to avoid by writing my own stateful
> KeyedProcessFunction. Are there other operators that might cause the same
> problem?
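
For reference, a minimal plain-Kotlin model (no Flink dependency) of the per-key logic such a hand-written stateful function would need to mimic `countWindow(3L, 1).reduce { a, b -> b }`: keep at most the last 3 elements per key and emit a reduced result on every element. `SlidingCountReducer` is a hypothetical name; in an actual Flink job the map below would be replaced by keyed state (for example a `ListState`), and this sketch is not how Flink implements count windows internally.

```kotlin
// Plain-Kotlin model of countWindow(size, 1) followed by reduce:
// keep the last `size` elements per key, emit a result on every element.
// SlidingCountReducer is a hypothetical name, not a Flink class.
class SlidingCountReducer<K, V>(
    private val size: Int,
    private val combine: (V, V) -> V
) {
    // Per-key buffer of the most recent elements (a Flink job would use keyed state).
    private val buffers = mutableMapOf<K, ArrayDeque<V>>()

    // Adds one element for `key` and returns the reduced value of that key's window.
    fun onElement(key: K, value: V): V {
        val buf = buffers.getOrPut(key) { ArrayDeque() }
        buf.addLast(value)
        if (buf.size > size) buf.removeFirst() // evict the oldest element
        return buf.reduce(combine)
    }
}

fun main() {
    // Same shape as the job below: key = first field, reduce keeps the latest element.
    val op = SlidingCountReducer<Int, Pair<Int, Int>>(3) { _, b -> b }
    val input = listOf(5 to 18, 3 to 20, 5 to 29, 5 to 37, 3 to 41, 5 to 38)
    val output = input.map { op.onElement(it.first, it) }
    println(output.filter { it.first == 5 }) // [(5, 18), (5, 29), (5, 37), (5, 38)]
}
```

Because the state is updated strictly in arrival order per key, the output for each key follows the input order for that key.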
>
> The other alternative is not to use batch mode, but the problem is that I
> won't know when a batch job finishes if I don't run it in batch mode, since
> a streaming process never ends.
>
> Thanks.
>
> On Wed, Nov 3, 2021 at 4:38 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Yan
>> I do not think it is a bug. Maybe we simply cannot assume the input order
>> of an operator.
>> Best,
>> Guowei
>>
>>
>> On Wed, Nov 3, 2021 at 3:10 PM Yan Shen <leey...@gmail.com> wrote:
>>
>>> Yes, it does not happen in streaming mode. Is this considered a bug or
>>> is it by design?
>>>
>>> Thanks!
>>>
>>> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I did not run your program myself, but I see that you are using the
>>>> BATCH execution mode. I suspect the issue is related to this, because in
>>>> BATCH execution mode Flink "sorts" the records by key (and this might be
>>>> an unstable sort).
>>>> Could you try running the same program in STREAMING mode and see what
>>>> happens?
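
The effect described above can be illustrated without Flink: when records are grouped by sorting on the key, a stable sort keeps each key's records in arrival order, while an unstable sort gives no such guarantee. Kotlin's `sortedBy` is documented as stable; `isOrderedPerKey` is a hypothetical helper that checks the second field (the timestamp in the job below) increases within each key. This only models the idea and says nothing about which sort Flink's batch runtime actually uses.

```kotlin
// True if, within each key (first field), the second field is strictly increasing.
// groupBy keeps elements of each group in their original encounter order.
fun isOrderedPerKey(records: List<Pair<Int, Int>>): Boolean =
    records.groupBy { it.first }.values.all { group ->
        group.zipWithNext().all { (a, b) -> a.second < b.second }
    }

fun main() {
    // Arrival order; the second field plays the role of the timestamp.
    val arrivals = listOf(5 to 18, 3 to 20, 5 to 29, 3 to 25, 5 to 37, 1 to 40)

    // Stable sort by key: records with equal keys keep their relative arrival order.
    val stable = arrivals.sortedBy { it.first }
    println(stable) // [(1, 40), (3, 20), (3, 25), (5, 18), (5, 29), (5, 37)]
    println(isOrderedPerKey(stable)) // true

    // Also grouped by key, but with key 5 reordered, as an unstable sort is allowed to do.
    val unstable = listOf(1 to 40, 3 to 20, 3 to 25, 5 to 37, 5 to 18, 5 to 29)
    println(isOrderedPerKey(unstable)) // false
}
```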
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Can anyone advise on this?
>>>>>
>>>>> I wrote a simple test of the countWindow method (in Kotlin) as follows:
>>>>>
>>>>> package aero.airlab.flinkjobs.headingreminder
>>>>>
>>>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>>> import kotlin.random.Random
>>>>>
>>>>> object CountWindowTest {
>>>>>     @JvmStatic
>>>>>     fun main(args: Array<String>) {
>>>>>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>>>
>>>>>         val rand = Random(0)
>>>>>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>>>>         env.fromCollection(data).assignTimestampsAndWatermarks(
>>>>>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>>>>>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>>>>>             .keyBy { it.first }
>>>>>             .countWindow(3L, 1)
>>>>>             .reduce { a, b -> b }
>>>>>             .keyBy { it.first }
>>>>>             .filter { it.first == 5 }
>>>>>             .print()
>>>>>
>>>>>         env.execute()
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> The beginning of the output is as follows:
>>>>>
>>>>> 12> (5, 184)
>>>>> 12> (5, 18)
>>>>> 12> (5, 29)
>>>>> 12> (5, 37)
>>>>> 12> (5, 38)
>>>>> 12> (5, 112)
>>>>> 12> (5, 131)
>>>>>
>>>>> The first line, (5, 184), is out of order with respect to the rest.
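
A small plain-Kotlin check that locates the break in the output above. It assumes, as in the job, that the second field is the element's position in the source collection; if each key's order were preserved end to end, the key-5 lines would appear with strictly increasing second fields. `firstOutOfOrder` is a hypothetical helper.

```kotlin
// Returns the index of the first element smaller than its predecessor,
// or -1 if the list is strictly increasing... or non-decreasing throughout.
fun firstOutOfOrder(values: List<Int>): Int =
    values.zipWithNext()
        .indexOfFirst { (prev, next) -> next < prev }
        .let { if (it < 0) -1 else it + 1 } // pair i compares values[i] and values[i + 1]

fun main() {
    // Second fields of the key-5 lines printed above.
    val observed = listOf(184, 18, 29, 37, 38, 112, 131)
    val i = firstOutOfOrder(observed)
    println("out of order at index $i: ${observed[i - 1]} before ${observed[i]}")
    // prints "out of order at index 1: 184 before 18"
}
```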
>>>>>
>>>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>>>> reduce.
>>>>>
>>>>> Thanks.
>>>>>
>>>>
