[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940657#comment-15940657
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3515


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938374#comment-15938374
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3515
  
It is a +1 :-)


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936909#comment-15936909
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3515
  
Not an issue of spilling memory but at least three elements are required to 
trigger two reduces and the error condition depends on which value is returned.

@StephanEwen is this a +1?


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906199#comment-15906199
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3515
  
@StephanEwen deja vu FLINK-2883 / FLINK-3340.

I'm also looking to run the FLINK-4949 IT tests with object reuse both 
enabled and disabled which would have highlighted this issue as the unit test 
input was too small.


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906198#comment-15906198
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3515

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

The initial fix for this ticket is not working on larger data sets.

Reduce supports returning the left input, right input, a new object, or a 
locally reused object. The trouble with the initial fix was that the returned 
local object was reusing fields from the input tuples.

The problem is with ReduceDriver#run managing two values (reuse1 and 
reuse2) and with a third, local value returned by 
GatherSumApplyIteration.SumUDF. After the first grouping value.f1 == reuse1.f1. 
Following UDF calls may swap value.f1 and reuse2.f1, which causes reuse1.f1 == 
reuse2.f1. With an odd number of swaps the next grouping will reduce with 
reuse1 and reuse2 sharing a field and deserialization will overwrite stored 
values.

The simple fix is to only use and return the provided inputs.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
5890b_gathersumapply_broken_when_object_reuse_enabled

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3515.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3515


commit c1bddcb7a59eff8ac1639deaa9fabac3073c6552
Author: Greg Hogan 
Date:   2017-03-10T21:44:27Z

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

The initial fix for this ticket is not working on larger data sets.

Reduce supports returning the left input, right input, a new object, or
a locally reused object. The trouble with the initial fix was that the
returned local object was reusing fields from the input tuples.

The problem is with ReduceDriver#run managing two values (reuse1 and
reuse2) and with a third, local value returned by
GatherSumApplyIteration.SumUDF. After the first grouping value.f1 ==
reuse1.f1. Following UDF calls may swap value.f1 and reuse2.f1, which
causes reuse1.f1 == reuse2.f1. With an odd number of swaps the next
grouping will reduce with reuse1 and reuse2 sharing a field and
deserialization will overwrite stored values.

The simple fix is to only use and return the provided inputs.




> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905298#comment-15905298
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3402


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905072#comment-15905072
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3402
  
@StephanEwen my last comment was ambiguous, I had originally modified a 
test and then with yesterday's commit reverted that change and added as a new 
test.

Will merge.


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904797#comment-15904797
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3402
  
@greghogan you are right, I overlooked the adjusted tests!

This looks like a good fix, please merge!


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903529#comment-15903529
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3402
  
@StephanEwen, I updated the test to include the original test plus a new 
test with object reuse enabled.

@vasia, would you be also be able to review this change?


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882883#comment-15882883
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3402
  
I'm not understanding the question. I modified an existing test to use 
mutable types which were previously failing.


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882661#comment-15882661
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3402
  
Fix looks good.
Is this tested implicitly by some other test already?


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880533#comment-15880533
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3402

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

GatherSumApplyIteration uses reduce and join for which extra care must be 
taken when object reuse is enabled. Adds a check for objects returned by the 
user to prevent system objects from being overwritten.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
5890_gathersumapply_broken_when_object_reuse_enabled

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3402.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3402


commit c4a7c5103679c70b471e1003c860325d2a919b03
Author: Greg Hogan 
Date:   2017-02-23T13:47:48Z

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

GatherSumApplyIteration uses reduce and join for which extra care must
be taken when object reuse is enabled. Adds a check for objects returned
by the user to prevent system objects from being overwritten.




> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)