[jira] [Commented] (FLINK-7593) Generated plan does not create correct groups

2018-07-03 Thread Fabian Hueske (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530995#comment-16530995 ]

Fabian Hueske commented on FLINK-7593:
--

Thanks for the feedback and the effort to reproduce the problem [~cshi]!
I'm curious how that bug was fixed and would like to have a look before closing 
the issue.

Thanks, Fabian

> Generated plan does not create correct groups
> ---------------------------------------------
>
> Key: FLINK-7593
> URL: https://issues.apache.org/jira/browse/FLINK-7593
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.3.2
> Environment: Windows 7, Ubuntu 16.04, Flink 1.3.2
>Reporter: Steffen Dienst
>Priority: Critical
> Attachments: flink-good-plan.json
>
>
> Under specific circumstances Flink seems to generate an incorrect execution
> plan. I was using {{groupBy(0).sum(1)}}, but the resulting CSV files contained
> multiple entries per group; the grouping did not occur. After some work I
> managed to reduce the relevant part of our code to the minimal test case
> below. Be careful: all parts need to be present, even the irrelevant
> secondary output. If I remove anything, Flink generates a correct plan
> (either by introducing a combiner node prior to the reducer or by using
> "Sum (combine)" on the edge before the reducer).
> {code:java}
> import java.util.ArrayList;
> import java.util.Collection;
> 
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.types.LongValue;
> import org.apache.flink.util.LongValueSequenceIterator;
> 
> public class FlinkOptimizerBug {
>   public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
>     DataSet<Tuple2<Long, Long>> x =
>         env.fromParallelCollection(new LongValueSequenceIterator(0, 1000), LongValue.class)
>             .map(l -> Tuple2.of(Math.round(Math.random() * 1000) % 4, 1L))
>             .join(env.fromParallelCollection(new LongValueSequenceIterator(0, 1000), LongValue.class)
>                 .map(l -> Tuple2.of(Math.round(Math.random() * 1000) % 4, 1L)))
>             .where(0).equalTo(0).with((t1, t2) -> t1)
>             .union(env.fromParallelCollection(new LongValueSequenceIterator(0, 1000), LongValue.class)
>                 .map(l -> Tuple2.of(Math.round(Math.random() * 1000) % 4, 1L)))
>             .map(l -> l)
>             .withForwardedFields("f0;f1");
> 
>     // Irrelevant secondary output; the bug only shows up when it is present.
>     Collection<Tuple2<Long, Long>> out = new ArrayList<>();
>     x.output(new LocalCollectionOutputFormat<>(out));
> 
>     x.groupBy(0)
>         .sum(1) // BUG: this will not be grouped correctly, so there will be multiple outputs per group!
>         .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
>         .setParallelism(1);
>     env.setParallelism(4);
> 
>     System.out.println(env.getExecutionPlan());
>     env.execute();
>   }
> }
> {code}
> Invalid execution plan generated:
> {code:javascript}
> {
>   "nodes": [
>     {
>       "id": 5,
>       "type": "source",
>       "pact": "Data Source",
>       "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
>       "parallelism": "4",
>       "global_properties": [
>         { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
>         { "name": "Partitioning Order", "value": "(none)" },
>         { "name": "Uniqueness", "value": "not unique" }
>       ],
>       "local_properties": [
>         { "name": "Order", "value": "(none)" },
>         { "name": "Grouping", "value": "not grouped" },
>         { "name": "Uniqueness", "value": "not unique" }
>       ],
>       "estimates": [
>         { "name": "Est. Output Size", "value": "(unknown)" },
>         { "name": "Est. Cardinality", "value": "(unknown)" }
>       ],
>       "costs": [
>         { "name": "Network", "value": "0.0" },
>         { "name": "Disk I/O", "value": "0.0" },
>         { "name": "CPU", "value": "0.0" },
>         { "name": "Cumulative Network", "value": "0.0" },
>         { "name": "Cumulative Disk I/O", "value": "0.0" },
>         { "name": "Cumulative CPU", "value": "0.0" }
>       ],
>       "compiler_hints": [
>         { "name": "Output Size (bytes)", "value": "(none)" },
>         { "name": "Output Cardinality", "value": "(none)" },
>         { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>         { "name": "Filter Factor", "value": "(none)" }
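
For anyone re-testing this: since the key is {{Math.round(Math.random()*1000) % 4}}, a correct plan emits at most four records (keys 0 to 3), one per group. A small sanity check against the {{x}} DataSet from the quoted program, sketched here for illustration (the check itself is not part of the original report):

{code:java}
// Hypothetical check, not from the original report: with correct grouping,
// each key appears at most once in the summed result, so a duplicate key is
// exactly the symptom described above.
// Requires imports: java.util.List, java.util.Set, java.util.HashSet.
List<Tuple2<Long, Long>> summed = x.groupBy(0).sum(1).collect();
Set<Long> seen = new HashSet<>();
for (Tuple2<Long, Long> t : summed) {
    if (!seen.add(t.f0)) {
        throw new IllegalStateException("Duplicate group key " + t.f0 + ": grouping was not applied");
    }
}
{code}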

[jira] [Commented] (FLINK-7593) Generated plan does not create correct groups

2018-07-02 Thread Chunhui Shi (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530665#comment-16530665 ]

Chunhui Shi commented on FLINK-7593:


Yes, I built a small project for the code and reproduced the issue when the 
project was configured to use Flink 1.3.2 or 1.4.0. With 1.5.0 and 1.6.0, 
however, I got good plans and good results. [~fhueske], let me know if you can 
still reproduce this bug with 1.5.0 or 1.6.0. [^flink-good-plan.json]
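
For anyone repeating that experiment: switching Flink versions in such a reproducer project is just a dependency bump. A minimal Maven sketch, assuming a plain flink-java/flink-clients setup (the version property and the Scala suffix are illustrative, not taken from the actual test project):

{code:xml}
<!-- Hypothetical build fragment: re-run the reproducer against different
     Flink releases by changing a single property. -->
<properties>
  <flink.version>1.3.2</flink.version> <!-- also tried: 1.4.0, 1.5.0, 1.6.0 -->
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
{code}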


[jira] [Commented] (FLINK-7593) Generated plan does not create correct groups

2018-07-02 Thread Fabian Hueske (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529491#comment-16529491 ]

Fabian Hueske commented on FLINK-7593:
--

I'm not aware of any efforts to resolve this problem.
[~cshi], why do you think the issue was fixed? Did you try to reproduce it in 
1.5.0?
Thanks, Fabian


[jira] [Commented] (FLINK-7593) Generated plan does not create correct groups

2018-06-30 Thread Chunhui Shi (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528700#comment-16528700 ]

Chunhui Shi commented on FLINK-7593:


This seems to have been fixed in 1.5.


[jira] [Commented] (FLINK-7593) Generated plan does not create correct groups

2017-09-06 Thread Fabian Hueske (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155275#comment-16155275 ]

Fabian Hueske commented on FLINK-7593:
--

Thanks for reporting the bug!
I could reproduce the issue and noticed that the program is correctly executed 
when you remove the {{env.getExecutionPlan()}} call. That's definitely a 
problem.

However, I don't have time to look into this right now.
Best, Fabian
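
A minimal sketch of that observation against the reproducer from the issue description (the guard flag is illustrative and not part of the original program; this is a workaround, not a fix):

{code:java}
// Workaround sketch based on the observation above: the plan is only
// miscompiled when getExecutionPlan() is called before execute(), so skip
// the plan dump unless it is explicitly requested.
boolean dumpPlan = false; // hypothetical flag, not in the original program
if (dumpPlan) {
    System.out.println(env.getExecutionPlan());
}
env.execute();
{code}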
