[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition

2021-11-16 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1785#comment-1785
 ] 

Guowei Ma commented on FLINK-24767:
---

I think the following code could also reproduce this problem:
{code:java}
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Two keys (4 and 5), each with the values 0..9 in ascending order.
Collection<Tuple2<Integer, Integer>> s = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    s.add(Tuple2.of(4, i));
    s.add(Tuple2.of(5, i));
}

env.setParallelism(1);
env.fromCollection(s)
        .keyBy((KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0)
        .print();
env.execute("test");
{code}

> A keyBy following countWindow does not preserve order within the same partition
> ---
>
> Key: FLINK-24767
> URL: https://issues.apache.org/jira/browse/FLINK-24767
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.13.3
> Reporter: Lee Y S
> Priority: Major
>
> I wrote a simple test of the countWindow method (in Kotlin), shown below:
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
>
> object CountWindowTest {
>   @JvmStatic
>   fun main(args: Array<String>) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment()
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>     val rand = Random(0)
>     val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>     env.fromCollection(data)
>       .assignTimestampsAndWatermarks(
>         WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>           .withTimestampAssigner { e, _ -> e.second.toLong() })
>       .keyBy { it.first }
>       .countWindow(3L, 1)
>       .reduce { a, b -> b }
>       .keyBy { it.first }
>       .filter { it.first == 5 }
>       .print()
>     env.execute()
>   }
> }
> {code}
> The beginning of the output is as follows:
> 12> (5, 184)
> 12> (5, 18)
> 12> (5, 29)
> 12> (5, 37)
> 12> (5, 38)
> 12> (5, 112)
> 12> (5, 131)
> The first line, (5, 184), is out of order relative to the rest.
> The problem disappears if I remove the keyBy after the reduce or use streaming
> mode instead of batch mode.
>   
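The out-of-order record in the reported output can also be detected mechanically. The following is a minimal plain-Java sketch, not part of the original report: the class and method names are hypothetical, and it assumes that the second field is expected to increase within each key (it is the element index and event timestamp in the test above).

```java
import java.util.HashMap;
import java.util.Map;

public class PerKeyOrderCheck {

    /**
     * Returns the index of the first record whose value is smaller than the
     * previous value seen for the same key, or -1 if every key is in order.
     * Each record is a two-element array: {key, value}.
     */
    static int firstOutOfOrder(int[][] records) {
        Map<Integer, Integer> lastSeen = new HashMap<>();
        for (int i = 0; i < records.length; i++) {
            int key = records[i][0];
            int value = records[i][1];
            Integer previous = lastSeen.get(key);
            if (previous != null && value < previous) {
                return i;
            }
            lastSeen.put(key, value);
        }
        return -1;
    }

    public static void main(String[] args) {
        // The first seven output records quoted in the issue description.
        int[][] reported = {
            {5, 184}, {5, 18}, {5, 29}, {5, 37}, {5, 38}, {5, 112}, {5, 131}
        };
        System.out.println(firstOutOfOrder(reported)); // prints 1
    }
}
```

The check flags index 1 because 18 arrives after 184 for key 5, which matches the observation that (5, 184) was emitted too early.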



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition

2021-11-15 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444280#comment-17444280
 ] 

Guowei Ma commented on FLINK-24767:
---

My guess is that in batch execution mode Flink sorts the records by key, and
that this sort might not be stable.
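To illustrate what an unstable sort would do to records that share a key, here is a small plain-Java sketch. It is an assumption-laden illustration only: it does not use Flink's sorter, the class and method names are hypothetical, and the selection sort merely stands in for "some unstable algorithm".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class StableSortSketch {

    /** Stable: List.sort is documented to keep equal elements in their original order. */
    static List<int[]> stableSortByKey(List<int[]> records) {
        List<int[]> copy = new ArrayList<>(records);
        copy.sort(Comparator.comparingInt((int[] r) -> r[0]));
        return copy;
    }

    /** Unstable: a textbook selection sort, whose swaps can reorder records with equal keys. */
    static List<int[]> selectionSortByKey(List<int[]> records) {
        List<int[]> copy = new ArrayList<>(records);
        for (int i = 0; i < copy.size() - 1; i++) {
            int min = i;
            for (int j = i + 1; j < copy.size(); j++) {
                if (copy.get(j)[0] < copy.get(min)[0]) {
                    min = j;
                }
            }
            int[] tmp = copy.get(i);
            copy.set(i, copy.get(min));
            copy.set(min, tmp);
        }
        return copy;
    }

    /** Renders {key, value} records as "(key,value)(key,value)...". */
    static String render(List<int[]> records) {
        StringBuilder sb = new StringBuilder();
        for (int[] r : records) {
            sb.append('(').append(r[0]).append(',').append(r[1]).append(')');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Records are {key, arrival position}; key 5 arrives twice before key 4.
        List<int[]> input =
                Arrays.asList(new int[] {5, 0}, new int[] {5, 1}, new int[] {4, 2});
        System.out.println(render(stableSortByKey(input)));    // (4,2)(5,0)(5,1)
        System.out.println(render(selectionSortByKey(input))); // (4,2)(5,1)(5,0)
    }
}
```

Both results are correctly grouped by key, but only the stable sort keeps the two records for key 5 in arrival order.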



[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition

2021-11-04 Thread Lee Y S (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17439030#comment-17439030
 ] 

Lee Y S commented on FLINK-24767:
-

Hi [~sjwiesman],

Because I set the random seed (0), the result should be the same for every run. 
So the problem should be reproducible consistently.

Regards.
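As a side note on the seeding argument, the sketch below shows the same principle in plain Java. It is an illustration only: it uses java.util.Random rather than the kotlin.random.Random from the test, so the generated keys will not match the ones in the issue, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Random;

public class SeededDataSketch {

    /** Generates n pseudo-random keys in [0, 10) from a fixed seed. */
    static int[] keys(long seed, int n) {
        Random random = new Random(seed);
        int[] out = new int[n];
        for (int i = 0; i < n; i++) {
            out[i] = random.nextInt(10);
        }
        return out;
    }

    public static void main(String[] args) {
        // 1001 elements, matching the (0..1000) range in the test.
        int[] firstRun = keys(0L, 1001);
        int[] secondRun = keys(0L, 1001);
        System.out.println(Arrays.equals(firstRun, secondRun)); // prints true
    }
}
```

Identical input data on every run is what makes the reordering reproducible, provided the job itself processes that data deterministically.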




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition

2021-11-04 Thread Seth Wiesman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438953#comment-17438953
 ] 

Seth Wiesman commented on FLINK-24767:
--

When executing a `keyBy` under batch execution, the DataStream API groups 
elements by key using a sort [1]. I have not investigated, but my suspicion is 
that this sort is not stable and that you are seeing the result of that. 
[~dwysakowicz], I'm guessing this is expected? 

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/#state-backends--state
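For illustration, one generic way to group by key with a sort while still keeping arrival order within each key is to sort on the composite (key, arrival position). The plain-Java sketch below shows the idea; it is not Flink's implementation, the `Tagged` type and method names are hypothetical, and the input values are made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OrderPreservingGroupSketch {

    /** A record tagged with its arrival position (hypothetical helper type). */
    static final class Tagged {
        final int key;
        final int value;
        final long seq;

        Tagged(int key, int value, long seq) {
            this.key = key;
            this.value = value;
            this.seq = seq;
        }
    }

    /** Groups {key, value} records by key and returns the values in grouped order. */
    static List<Integer> valuesGroupedByKey(int[][] records) {
        List<Tagged> tagged = new ArrayList<>();
        for (int i = 0; i < records.length; i++) {
            tagged.add(new Tagged(records[i][0], records[i][1], i));
        }
        // Sorting on (key, seq) leaves no ties, so the outcome does not depend
        // on whether the underlying sort algorithm is stable.
        tagged.sort(Comparator.comparingInt((Tagged t) -> t.key)
                .thenComparingLong(t -> t.seq));
        List<Integer> values = new ArrayList<>();
        for (Tagged t : tagged) {
            values.add(t.value);
        }
        return values;
    }

    public static void main(String[] args) {
        int[][] input = {{5, 18}, {4, 7}, {5, 29}, {4, 9}, {5, 37}};
        System.out.println(valuesGroupedByKey(input)); // [7, 9, 18, 29, 37]
    }
}
```

The extra sequence field costs memory and comparison time, which is one reason a sorter might not carry it by default.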



[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition

2021-11-04 Thread Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438663#comment-17438663
 ] 

Liu commented on FLINK-24767:
-

It seems that this behavior only happens in batch mode.



[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition

2021-11-04 Thread Fabian Paul (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438650#comment-17438650
 ] 

Fabian Paul commented on FLINK-24767:
-

[~guoweima] can you take a look at this?
