[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940657#comment-15940657 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3515

> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Affects Versions: 1.3.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in the new {{Tuple2}}, which can then be overwritten in {{ReduceDriver}}. We need to swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in {{ReduceDriver}} for the returned results).
> {code}
> @Override
> public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
>     K key = arg0.f0;
>     M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>     return new Tuple2<>(key, result);
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
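The ticket refers to the swap that ReduceDriver performs on returned objects: the driver must never read the next record into the object the user function just returned. As a minimal sketch of that rule, assuming a hypothetical MutableLong record, a single key group, and a sum that accumulates into and returns its right argument (sumIntoRight), the Java below models a driver that reuses only two objects. reduceGroup and its swapOnReturn flag are illustrative names, not Flink's API or ReduceDriver's actual implementation.

```java
import java.util.function.BinaryOperator;

// Simplified, hypothetical model of object reuse in a reduce driver (not Flink code).
class ReduceReuseDemo {
    // Hypothetical mutable record; the driver overwrites it in place for each input.
    static final class MutableLong {
        long value;
    }

    // Assumed sum: accumulates into, and returns, its right argument.
    static MutableLong sumIntoRight(MutableLong left, MutableLong right) {
        right.value += left.value;
        return right;
    }

    // Reduces one key group while reusing only two objects. With swapOnReturn,
    // the driver never reads the next record into the object the UDF returned.
    static long reduceGroup(long[] records, BinaryOperator<MutableLong> udf, boolean swapOnReturn) {
        MutableLong reuse1 = new MutableLong();
        MutableLong reuse2 = new MutableLong();
        reuse1.value = records[0];
        MutableLong value = reuse1;
        for (int i = 1; i < records.length; i++) {
            reuse2.value = records[i]; // "deserialize" the next record into reuse2
            value = udf.apply(value, reuse2);
            if (swapOnReturn && value == reuse2) {
                // Hand the returned object to reuse1 so reuse2 is free for the next read.
                MutableLong tmp = reuse1;
                reuse1 = reuse2;
                reuse2 = tmp;
            }
        }
        return value.value;
    }

    public static void main(String[] args) {
        long[] records = {1, 2, 4};
        System.out.println("without swap: " + reduceGroup(records, ReduceReuseDemo::sumIntoRight, false));
        System.out.println("with swap:    " + reduceGroup(records, ReduceReuseDemo::sumIntoRight, true));
    }
}
```

For the records 1, 2, 4 the loop without the swap returns 8 rather than 7, because the third record is read into the very object holding the partial sum; with the swap it returns 7.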
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938374#comment-15938374 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3515

It is a +1 :-)
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936909#comment-15936909 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3515

This is not an issue of spilling memory, but at least three elements are required to trigger two reduces, and the error condition depends on which value is returned.

@StephanEwen, is this a +1?
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906199#comment-15906199 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3515

@StephanEwen, deja vu: FLINK-2883 / FLINK-3340.

I'm also looking to run the FLINK-4949 IT tests with object reuse both enabled and disabled, which would have highlighted this issue, since the unit test input was too small.
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906198#comment-15906198 ]

ASF GitHub Bot commented on FLINK-5890:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3515

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

The initial fix for this ticket does not work on larger data sets.

Reduce supports returning the left input, the right input, a new object, or a locally reused object. The trouble with the initial fix was that the returned local object was reusing fields from the input tuples.

The problem lies in ReduceDriver#run managing two values (reuse1 and reuse2) while GatherSumApplyIteration.SumUdf returns a third, local value. After the first grouping, value.f1 == reuse1.f1. Subsequent UDF calls may swap value.f1 and reuse2.f1, which results in reuse1.f1 == reuse2.f1. After an odd number of swaps, the next grouping reduces with reuse1 and reuse2 sharing a field, and deserialization overwrites stored values.

The simple fix is to only use and return the provided inputs.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 5890b_gathersumapply_broken_when_object_reuse_enabled

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3515.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3515

commit c1bddcb7a59eff8ac1639deaa9fabac3073c6552
Author: Greg Hogan
Date: 2017-03-10T21:44:27Z

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled
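The aliasing described above can be reproduced outside Flink with a simplified model. This is a sketch under stated assumptions, not Flink's code: MutableLong, KeyedValue (a stand-in for Tuple2<K, M>), sum, reduceGroup, originalReduce and inputOnlyReduce are hypothetical names; the driver models a single key group only; deserialization is modeled as overwriting the existing field object in place; and sum is assumed to accumulate into and return its right argument. inputOnlyReduce is one possible reading of "only use and return the provided inputs" (it returns arg0 and swaps fields when the result is arg1's field); it is not necessarily the merged patch.

```java
import java.util.function.BinaryOperator;

// Simplified, hypothetical model of the SumUdf aliasing bug (not Flink code).
class SumUdfReuseDemo {
    // Hypothetical mutable field type; "deserialization" overwrites it in place.
    static final class MutableLong {
        long value;
        MutableLong(long value) { this.value = value; }
    }

    // Hypothetical stand-in for Tuple2<K, M>: f0 is the key, f1 the mutable value.
    static final class KeyedValue {
        final String f0;
        MutableLong f1;
        KeyedValue(String f0, MutableLong f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Assumed sum: accumulates into, and returns, its right argument.
    static MutableLong sum(MutableLong left, MutableLong right) {
        right.value += left.value;
        return right;
    }

    // Pattern of the original SumUdf.reduce: a new tuple that may hold arg1's field.
    static KeyedValue originalReduce(KeyedValue arg0, KeyedValue arg1) {
        MutableLong result = sum(arg0.f1, arg1.f1);
        return new KeyedValue(arg0.f0, result);
    }

    // arg0 takes the result; if the result is arg1's field, the fields are
    // swapped so the two inputs never share a field object.
    static KeyedValue inputOnlyReduce(KeyedValue arg0, KeyedValue arg1) {
        MutableLong result = sum(arg0.f1, arg1.f1);
        if (result == arg1.f1) {
            arg1.f1 = arg0.f1;
        }
        arg0.f1 = result;
        return arg0;
    }

    // Reduces one key group with two reused tuples, swapping them when the UDF
    // returns reuse2 so the next record is never read into the returned tuple.
    static long reduceGroup(long[] records, BinaryOperator<KeyedValue> udf) {
        KeyedValue reuse1 = new KeyedValue("k", new MutableLong(records[0]));
        KeyedValue reuse2 = new KeyedValue("k", new MutableLong(0));
        KeyedValue value = reuse1;
        for (int i = 1; i < records.length; i++) {
            reuse2.f1.value = records[i]; // "deserialize" into reuse2's existing field
            value = udf.apply(value, reuse2);
            if (value == reuse2) {
                KeyedValue tmp = reuse1;
                reuse1 = reuse2;
                reuse2 = tmp;
            }
        }
        return value.f1.value;
    }

    public static void main(String[] args) {
        long[] records = {1, 2, 4};
        System.out.println("original:   " + reduceGroup(records, SumUdfReuseDemo::originalReduce));
        System.out.println("input-only: " + reduceGroup(records, SumUdfReuseDemo::inputOnlyReduce));
    }
}
```

With the records 1, 2, 4, originalReduce yields 8 instead of 7: the new tuple still references reuse2's field, so reading the third record overwrites the partial sum. inputOnlyReduce yields 7. With only two records there is a single reduce call and both variants return 3, which matches the remark in the thread that at least three elements are needed to trigger two reduces.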
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905298#comment-15905298 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3402
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905072#comment-15905072 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3402

@StephanEwen, my last comment was ambiguous: I had originally modified a test, then in yesterday's commit reverted that change and added it as a new test. Will merge.
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904797#comment-15904797 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3402

@greghogan you are right, I overlooked the adjusted tests! This looks like a good fix, please merge!
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903529#comment-15903529 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3402

@StephanEwen, I updated the test to include the original test plus a new test with object reuse enabled.

@vasia, would you also be able to review this change?
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882883#comment-15882883 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3402

I'm not understanding the question. I modified an existing test to use mutable types, which previously caused it to fail.
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882661#comment-15882661 ]

ASF GitHub Bot commented on FLINK-5890:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3402

Fix looks good. Is this tested implicitly by some other test already?
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880533#comment-15880533 ]

ASF GitHub Bot commented on FLINK-5890:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3402

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

GatherSumApplyIteration uses reduce and join, for which extra care must be taken when object reuse is enabled. This adds a check on objects returned by the user to prevent system objects from being overwritten.
- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 5890_gathersumapply_broken_when_object_reuse_enabled

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3402.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3402

commit c4a7c5103679c70b471e1003c860325d2a919b03
Author: Greg Hogan
Date: 2017-02-23T13:47:48Z

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled