[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition
[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1785#comment-1785 ]

Guowei Ma commented on FLINK-24767:
---

{code:java}
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
Collection<Tuple2<Integer, Integer>> s = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    s.add(Tuple2.of(4, i));
    s.add(Tuple2.of(5, i));
}
env.setParallelism(1);
env.fromCollection(s)
    .keyBy((KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0)
    .print();
env.execute("test");
{code}
I think this code could also reproduce the problem.

> A keyBy following countWindow does not preserve order within the same partition
> ---
>
>                 Key: FLINK-24767
>                 URL: https://issues.apache.org/jira/browse/FLINK-24767
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.13.3
>            Reporter: Lee Y S
>            Priority: Major
>
> I wrote a simple test of the countWindow method (in Kotlin) as below
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
>
> object CountWindowTest {
>     @JvmStatic
>     fun main(args: Array<String>) {
>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>         val rand = Random(0)
>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>         env.fromCollection(data).assignTimestampsAndWatermarks(
>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>             .keyBy { it.first }
>             .countWindow(3L, 1)
>             .reduce { a, b -> b }
>             .keyBy { it.first }
>             .filter { it.first == 5 }
>             .print()
>         env.execute()
>     }
> }
> {code}
> The beginning of the output is as below
> 12> (5, 184)
> 12> (5, 18)
> 12> (5, 29)
> 12> (5, 37)
> 12> (5, 38)
> 12> (5, 112)
> 12> (5, 131)
> The first line (5, 184) is out of order relative to the rest.
> The problem disappears if I remove the keyBy after the reduce or use streaming mode instead of batch mode.
>
> --
> This message was sent by Atlassian Jira
> (v8.20.1#820001)
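The ordering complaint in the report can be checked mechanically. The sketch below is a plain-Java helper, not part of Flink; the class and method names (`PerKeyOrderCheck`, `firstOutOfOrderIndex`) are hypothetical, and it assumes "in order" means the second field is non-decreasing among records that share a key. It needs Java 9 or later for `List.of`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not a Flink API): checks per-key ordering of {key, value} records. */
final class PerKeyOrderCheck {

    /**
     * Returns the index of the first record whose value is smaller than the previous
     * value seen for the same key, or -1 if every key's values are non-decreasing.
     * Each record is an int[2] holding {key, value}.
     */
    static int firstOutOfOrderIndex(List<int[]> records) {
        Map<Integer, Integer> lastValuePerKey = new HashMap<>();
        for (int i = 0; i < records.size(); i++) {
            int key = records.get(i)[0];
            int value = records.get(i)[1];
            // put() returns the value previously stored for this key, or null.
            Integer previous = lastValuePerKey.put(key, value);
            if (previous != null && value < previous) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The first seven output records quoted in the report.
        List<int[]> reported = List.of(
                new int[] {5, 184}, new int[] {5, 18}, new int[] {5, 29}, new int[] {5, 37},
                new int[] {5, 38}, new int[] {5, 112}, new int[] {5, 131});
        // Prints 1: record (5, 18) arrives after (5, 184).
        System.out.println(firstOutOfOrderIndex(reported));
    }
}
```

Run against collected job output, a helper like this would flag the same violation the reporter spotted by eye, and it returns -1 for output that keeps per-key order.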
[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition
[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444280#comment-17444280 ]

Guowei Ma commented on FLINK-24767:
---

I think it might be that, in batch execution mode, Flink "sorts" records by key, and this sort might be unstable.
[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition
[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17439030#comment-17439030 ]

Lee Y S commented on FLINK-24767:
---

Hi [~sjwiesman],

Because I set the random seed (0), the result should be the same for every run. So the problem should be reproducible consistently.

Regards.
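The point about the fixed seed can be illustrated with a small sketch. It uses `java.util.Random`, which is an assumption made for this illustration: the report uses `kotlin.random.Random`, a different generator, so the keys produced here will not match the report's. The class and method names are hypothetical. The sketch only shows that a fixed seed pins the input; whether the job's output order is also identical across runs depends on the runtime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical illustration: a fixed seed yields the same key sequence on every run. */
final class SeededKeys {

    /** Draws `count` keys in the range [0, 10) from a generator created with `seed`. */
    static List<Integer> keys(long seed, int count) {
        Random rand = new Random(seed);
        List<Integer> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(rand.nextInt(10));
        }
        return result;
    }

    public static void main(String[] args) {
        // 1001 elements, matching the size of the range 0..1000 in the report.
        List<Integer> firstRun = keys(0L, 1001);
        List<Integer> secondRun = keys(0L, 1001);
        // Prints true: both generators were seeded identically.
        System.out.println(firstRun.equals(secondRun));
    }
}
```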
[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition
[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438953#comment-17438953 ]

Seth Wiesman commented on FLINK-24767:
---

When executing a `keyBy` under batch execution, the DataStream API groups elements by key using a sort [1]. I have not investigated, but my suspicion is this is not a stable sort and you are seeing the result of that.

[~dwysakowicz] I'm guessing this is expected?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/#state-backends--state
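To make the unstable-sort suspicion concrete, here is a minimal plain-Java sketch of how sorting by key alone can reorder records that share a key. It does not use or reproduce Flink's sorter, and nothing in this thread confirms which algorithm Flink applies. `List.sort` is documented as stable; selection sort stands in as a simple, well-known unstable algorithm. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical demo (not Flink code): stable vs. unstable sorting of {key, value} records. */
final class SortStabilityDemo {

    /** Sorts by key with List.sort, which is guaranteed stable: equal keys keep input order. */
    static List<int[]> stableSortByKey(List<int[]> records) {
        List<int[]> copy = new ArrayList<>(records);
        copy.sort(Comparator.comparingInt((int[] r) -> r[0]));
        return copy;
    }

    /** Sorts by key with selection sort, whose long-distance swaps can reorder equal keys. */
    static List<int[]> selectionSortByKey(List<int[]> records) {
        List<int[]> copy = new ArrayList<>(records);
        for (int i = 0; i < copy.size() - 1; i++) {
            int min = i;
            for (int j = i + 1; j < copy.size(); j++) {
                if (copy.get(j)[0] < copy.get(min)[0]) {
                    min = j;
                }
            }
            int[] tmp = copy.get(i);
            copy.set(i, copy.get(min));
            copy.set(min, tmp);
        }
        return copy;
    }

    /** Renders records as "(key, value)" pairs separated by spaces. */
    static String format(List<int[]> records) {
        return records.stream()
                .map(r -> "(" + r[0] + ", " + r[1] + ")")
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        List<int[]> input = List.of(new int[] {5, 0}, new int[] {5, 1}, new int[] {4, 0});
        // Prints (4, 0) (5, 0) (5, 1): key 5 keeps its input order.
        System.out.println(format(stableSortByKey(input)));
        // Prints (4, 0) (5, 1) (5, 0): the swap that moves key 4 forward reverses key 5.
        System.out.println(format(selectionSortByKey(input)));
    }
}
```

Both results are correctly grouped by key, so a sort of either kind satisfies "group elements by key"; only the stable one also preserves arrival order within a key, which is the property the report expects.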
[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition
[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438663#comment-17438663 ]

Liu commented on FLINK-24767:
---

It seems that this behavior only happens in batch mode.
[jira] [Commented] (FLINK-24767) A keyBy following countWindow does not preserve order within the same partition
[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438650#comment-17438650 ]

Fabian Paul commented on FLINK-24767:
---

[~guoweima] can you take a look at this?