[jira] [Commented] (FLINK-7593) Generated plan does not create correct groups
[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530995#comment-16530995 ]

Fabian Hueske commented on FLINK-7593:
--------------------------------------

Thanks for the feedback and the effort to reproduce the problem, [~cshi]! I'm curious how the bug was fixed and would like to have a look before closing the issue.

Thanks, Fabian

> Generated plan does not create correct groups
> ---------------------------------------------
>
>                 Key: FLINK-7593
>                 URL: https://issues.apache.org/jira/browse/FLINK-7593
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 1.3.2
>         Environment: Windows 7, Ubuntu 16.04, Flink 1.3.2
>            Reporter: Steffen Dienst
>            Priority: Critical
>         Attachments: flink-good-plan.json
>
>
> Under specific circumstances Flink seems to generate an incorrect execution plan. I was using `groupBy(0).sum(1)`, but the resulting CSV files contained multiple entries per group; the grouping did not occur. After some work I managed to reduce the relevant part of our code to the minimal test case below. Be careful: all parts need to be present, even the irrelevant secondary output. If I remove anything else, Flink generates correct code (either by introducing a combiner node prior to the reducer or by using "Sum (combine)" on the edge before the reducer).
> {code:java}
> import java.util.ArrayList;
> import java.util.Collection;
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.types.LongValue;
> import org.apache.flink.util.LongValueSequenceIterator;
>
> public class FlinkOptimizerBug {
>   public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     DataSet<Tuple2<Long, Long>> x =
>         env.fromParallelCollection(new LongValueSequenceIterator(0, 1000), LongValue.class)
>            .map(l -> Tuple2.of(Math.round(Math.random() * 1000) % 4, 1L))
>            .join(env.fromParallelCollection(new LongValueSequenceIterator(0, 1000), LongValue.class)
>                     .map(l -> Tuple2.of(Math.round(Math.random() * 1000) % 4, 1L)))
>            .where(0).equalTo(0).with((t1, t2) -> t1)
>            .union(env.fromParallelCollection(new LongValueSequenceIterator(0, 1000), LongValue.class)
>                      .map(l -> Tuple2.of(Math.round(Math.random() * 1000) % 4, 1L)))
>            .map(l -> l)
>            .withForwardedFields("f0;f1");
>
>     Collection<Tuple2<Long, Long>> out = new ArrayList<>();
>     x.output(new LocalCollectionOutputFormat<>(out));
>
>     x.groupBy(0)
>      .sum(1) // BUG: this will not be grouped correctly, so there will be multiple outputs per group!
>      .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
>      .setParallelism(1);
>
>     env.setParallelism(4);
>
>     System.out.println(env.getExecutionPlan());
>     env.execute();
>   }
> }
> {code}
>
> Invalid execution plan generated:
>
> {code:javascript}
> {
>   "nodes": [
>     {
>       "id": 5,
>       "type": "source",
>       "pact": "Data Source",
>       "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
>       "parallelism": "4",
>       "global_properties": [
>         { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
>         { "name": "Partitioning Order", "value": "(none)" },
>         { "name": "Uniqueness", "value": "not unique" }
>       ],
>       "local_properties": [
>         { "name": "Order", "value": "(none)" },
>         { "name": "Grouping", "value": "not grouped" },
>         { "name": "Uniqueness", "value": "not unique" }
>       ],
>       "estimates": [
>         { "name": "Est. Output Size", "value": "(unknown)" },
>         { "name": "Est. Cardinality", "value": "(unknown)" }
>       ],
>       "costs": [
>         { "name": "Network", "value": "0.0" },
>         { "name": "Disk I/O", "value": "0.0" },
>         { "name": "CPU", "value": "0.0" },
>         { "name": "Cumulative Network", "value": "0.0" },
>         { "name": "Cumulative Disk I/O", "value": "0.0" },
>         { "name": "Cumulative CPU", "value": "0.0" }
>       ],
>       "compiler_hints": [
>         { "name": "Output Size (bytes)", "value": "(none)" },
>         { "name": "Outp
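For reference, the expected semantics of {{groupBy(0).sum(1)}}, exactly one output record per distinct key, can be sketched with plain Java collections. This is a semantic sketch only, not Flink code; the class name `GroupSumSketch` and the sample input are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupSumSketch {

    // What groupBy(0).sum(1) is expected to compute: one (key, sum)
    // pair per distinct key, never multiple entries for the same key.
    static Map<Long, Long> groupSum(long[][] tuples) {
        Map<Long, Long> sums = new LinkedHashMap<>();
        for (long[] t : tuples) {
            // merge() adds t[1] to the running sum for key t[0]
            sums.merge(t[0], t[1], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] input = { {0, 1}, {1, 1}, {0, 1}, {3, 1}, {1, 1} };
        Map<Long, Long> result = groupSum(input);
        System.out.println(result); // prints {0=2, 1=2, 3=1}
    }
}
```

In the buggy plan above, this per-key combination step is missing, which matches the reported symptom of multiple output rows per key.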
[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530665#comment-16530665 ]

Chunhui Shi commented on FLINK-7593:
------------------------------------

Yes, I built a small project around the code and reproduced the issue when the project was configured to use Flink 1.3.2 or 1.4.0. With 1.5.0 and 1.6.0, however, I got good plans and good results. [~fhueske], let me know if you can still reproduce this bug with 1.5.0 or 1.6.0. [^flink-good-plan.json]
[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529491#comment-16529491 ]

Fabian Hueske commented on FLINK-7593:
--------------------------------------

I'm not aware of any efforts to resolve this problem. [~cshi], why do you think the issue was fixed? Did you try to reproduce it in 1.5.0?

Thanks, Fabian
[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528700#comment-16528700 ]

Chunhui Shi commented on FLINK-7593:
------------------------------------

This seems to have been fixed in 1.5.
[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155275#comment-16155275 ]

Fabian Hueske commented on FLINK-7593:
--------------------------------------

Thanks for reporting the bug! I could reproduce the issue and noticed that the program is executed correctly when you remove the {{env.getExecutionPlan()}} call. That's definitely a problem. However, I don't have time to look into this right now.

Best, Fabian