[jira] [Commented] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
    [ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15588930#comment-15588930 ]

ASF GitHub Bot commented on FLINK-4586:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2639

> NumberSequenceIterator and Accumulator threading issue
> --
>
>                 Key: FLINK-4586
>                 URL: https://issues.apache.org/jira/browse/FLINK-4586
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.1.2
>            Reporter: Johannes
>            Assignee: Greg Hogan
>            Priority: Minor
>             Fix For: 1.2.0, 1.1.4
>
>         Attachments: FLINK4586Test.scala
>
>
> There is a strange problem when using the NumberSequenceIterator in
> combination with an AverageAccumulator.
> It seems like the individual accumulators are reinitialized and overwrite
> parts of intermediate solutions.
> The following Scala snippet exemplifies the problem.
> The correct average is {{50.5}}, but the printed result is something
> completely different, like {{8.08}}, depending on the number of cores used.
> If the parallelism is set to {{1}} the result is correct, which indicates a
> likely threading problem.
> The problem occurs with both the Java and Scala APIs.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>     var a : AverageAccumulator = _
>     override def map(value: Long): Long = {
>       a.add(value)
>       value
>     }
>     override def open(parameters: Configuration): Unit = {
>       a = new AverageAccumulator
>       getRuntimeContext.addAccumulator("test", a)
>     }
>   })
>   .reduce((a, b) => a + b)
>   .print()
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
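The dependence on parallelism described in the issue can be reproduced without any threads. The following is a minimal sketch, not Flink code: `ParallelismDemo` and `BuggyAverage` are hypothetical names, the merge logic mirrors the pre-fix lines quoted in the pull request discussion for this issue, and it is an assumption that the first partition's accumulator serves as the merge target while the others are merged into it.

```java
// Hypothetical stand-alone simulation; none of these classes are part of Flink.
class ParallelismDemo {

    /** Stand-in for the pre-fix accumulator: a running sum plus a count. */
    static final class BuggyAverage {
        private long count;
        private double localValue; // running sum of all added values

        void add(long value) {
            count++;
            localValue += value;
        }

        double getLocalValue() {
            return count == 0 ? 0.0 : localValue / count;
        }

        // Mirrors the pre-fix merge: adds the other side's AVERAGE to the running sum.
        void merge(BuggyAverage other) {
            count += other.count;
            localValue += other.getLocalValue();
        }
    }

    /** Averages 1..100 after splitting the range into equal contiguous partitions. */
    static double averageWithPartitions(int partitions) {
        if (partitions < 1 || 100 % partitions != 0) {
            throw new IllegalArgumentException("partitions must divide 100 evenly");
        }
        int perPartition = 100 / partitions;
        BuggyAverage result = null;
        for (int p = 0; p < partitions; p++) {
            BuggyAverage local = new BuggyAverage();
            long start = (long) p * perPartition + 1;
            for (long v = start; v < start + perPartition; v++) {
                local.add(v);
            }
            // Assumption: the first partial result becomes the merge target.
            if (result == null) {
                result = local;
            } else {
                result.merge(local);
            }
        }
        return result.getLocalValue();
    }

    public static void main(String[] args) {
        for (int partitions : new int[] {1, 2, 4}) {
            System.out.println(partitions + " partition(s): " + averageWithPartitions(partitions));
        }
    }
}
```

Under these assumptions a single partition yields 50.5, while two and four partitions work out to 13.505 and 5.14. The exact wrong value in a real job depends on how Flink splits the input, so the figures are illustrative only.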
[jira] [Commented] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
    [ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15588571#comment-15588571 ]

ASF GitHub Bot commented on FLINK-4586:
---

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2639#discussion_r84054277

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java ---
    @@ -28,51 +28,52 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
     
         private static final long serialVersionUID = 3672555084179165255L;
    -
    -    private double localValue;
    +
         private long count;
     
    +    private double sum;
    +
         @Override
         public void add(Double value) {
             this.count++;
    -        this.localValue += value;
    +        this.sum += value;
         }
     
         public void add(double value) {
             this.count++;
    -        this.localValue += value;
    +        this.sum += value;
         }
     
         public void add(long value) {
             this.count++;
    -        this.localValue += value;
    +        this.sum += value;
         }
     
         public void add(int value) {
             this.count++;
    -        this.localValue += value;
    +        this.sum += value;
         }
     
         @Override
         public Double getLocalValue() {
             if (this.count == 0) {
                 return 0.0;
             }
    -        return this.localValue / (double)this.count;
    +        return this.sum / this.count;
         }
     
         @Override
         public void resetLocal() {
             this.count = 0;
    -        this.localValue = 0;
    +        this.sum = 0;
         }
     
         @Override
         public void merge(Accumulator<Double, Double> other) {
             if (other instanceof AverageAccumulator) {
    -            AverageAccumulator temp = (AverageAccumulator)other;
    -            this.count += temp.count;
    -            this.localValue += other.getLocalValue();
    --- End diff --

    Yes, and the test did not catch the bug because it only merged two accumulators that each held a single value, in which case the sum equals the average.
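The point about the test can be illustrated with a small sketch. The class below is a hypothetical stand-in that copies the pre-fix `add`, `getLocalValue`, and `merge` logic from the diff above; `MergeTestGapDemo`, `of`, and `mergedAverage` are names invented for this example and are not part of Flink or its test suite.

```java
// Hypothetical stand-in for the pre-fix AverageAccumulator; not Flink's class.
class MergeTestGapDemo {

    static final class BuggyAverage {
        private long count;
        private double localValue; // running sum, despite the name

        void add(double value) {
            count++;
            localValue += value;
        }

        double getLocalValue() {
            return count == 0 ? 0.0 : localValue / count;
        }

        // Pre-fix merge: adds the other accumulator's average instead of its sum.
        void merge(BuggyAverage other) {
            count += other.count;
            localValue += other.getLocalValue();
        }
    }

    /** Builds an accumulator holding the given values. */
    static BuggyAverage of(double... values) {
        BuggyAverage acc = new BuggyAverage();
        for (double v : values) {
            acc.add(v);
        }
        return acc;
    }

    /** Merges two accumulators the pre-fix way and returns the reported average. */
    static double mergedAverage(double[] left, double[] right) {
        BuggyAverage target = of(left);
        target.merge(of(right));
        return target.getLocalValue();
    }

    public static void main(String[] args) {
        // One value per side: the average of {3} equals its sum, so the bug is invisible.
        System.out.println(mergedAverage(new double[] {1}, new double[] {3}));
        // Two values per side: the average of {3, 4} is 3.5 but its sum is 7, so the result is wrong.
        System.out.println(mergedAverage(new double[] {1, 2}, new double[] {3, 4}));
    }
}
```

With one value per side the merged result is 2.0, which is correct. With two values per side it is 1.625 instead of 2.5, so a test needs at least one multi-value accumulator on the merged side to expose the defect.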
[jira] [Commented] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
    [ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582283#comment-15582283 ]

ASF GitHub Bot commented on FLINK-4586:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2639#discussion_r83643344

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java ---
    @@ -28,51 +28,52 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
     
         private static final long serialVersionUID = 3672555084179165255L;
    -
    -    private double localValue;
    +
         private long count;
     
    +    private double sum;
    +
         @Override
         public void add(Double value) {
             this.count++;
    -        this.localValue += value;
    +        this.sum += value;
         }
     
         public void add(double value) {
             this.count++;
    -        this.localValue += value;
    +        this.sum += value;
         }
     
         public void add(long value) {
             this.count++;
    -        this.localValue += value;
    +        this.sum += value;
         }
     
         public void add(int value) {
             this.count++;
    -        this.localValue += value;
    +        this.sum += value;
         }
     
         @Override
         public Double getLocalValue() {
             if (this.count == 0) {
                 return 0.0;
             }
    -        return this.localValue / (double)this.count;
    +        return this.sum / this.count;
         }
     
         @Override
         public void resetLocal() {
             this.count = 0;
    -        this.localValue = 0;
    +        this.sum = 0;
         }
     
         @Override
         public void merge(Accumulator<Double, Double> other) {
             if (other instanceof AverageAccumulator) {
    -            AverageAccumulator temp = (AverageAccumulator)other;
    -            this.count += temp.count;
    -            this.localValue += other.getLocalValue();
    --- End diff --

    I guess this was the buggy line?
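The quoted diff ends at the removed lines, so the replacement is not visible here. A plausible correction, given the new `sum` field, is to add the other accumulator's sum rather than its average. The sketch below assumes exactly that; `FixedMergeDemo` and `Average` are hypothetical names, and the code is not copied from the pull request.

```java
// Hypothetical sketch of a sum-based merge; an assumption, not the committed Flink code.
class FixedMergeDemo {

    static final class Average {
        private long count;
        private double sum;

        void add(long value) {
            count++;
            sum += value;
        }

        double getLocalValue() {
            return count == 0 ? 0.0 : sum / count;
        }

        // Combine raw sums and counts so the result does not depend on the partitioning.
        void merge(Average other) {
            count += other.count;
            sum += other.sum;
        }
    }

    /** Averages 1..100 after splitting the range into equal contiguous partitions. */
    static double averageWithPartitions(int partitions) {
        if (partitions < 1 || 100 % partitions != 0) {
            throw new IllegalArgumentException("partitions must divide 100 evenly");
        }
        int perPartition = 100 / partitions;
        Average result = new Average();
        for (int p = 0; p < partitions; p++) {
            Average local = new Average();
            long start = (long) p * perPartition + 1;
            for (long v = start; v < start + perPartition; v++) {
                local.add(v);
            }
            result.merge(local);
        }
        return result.getLocalValue();
    }

    public static void main(String[] args) {
        for (int partitions : new int[] {1, 2, 4, 10}) {
            System.out.println(partitions + " partition(s): " + averageWithPartitions(partitions));
        }
    }
}
```

Because sums and counts are both additive, merging into an empty accumulator or into the first partial result gives the same answer, 50.5, for every partition count.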
[jira] [Commented] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
    [ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15576379#comment-15576379 ]

ASF GitHub Bot commented on FLINK-4586:
---

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/2639

    [FLINK-4586] [core] Broken AverageAccumulator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 4586_broken_averageaccumulator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2639

commit 58d79b40ed837542e727c5f4f3a410ccb5f9ff24
Author: Greg Hogan
Date:   2016-10-14T20:18:52Z

    [FLINK-4586] [core] Broken AverageAccumulator
[jira] [Commented] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
    [ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468729#comment-15468729 ]

Johannes commented on FLINK-4586:
-

The test case also fails when using a plain collection with a rebalance to create a form of parallel collection:

{code}
fromCollection(1 to 100).rebalance()
{code}

So the problem does not seem to be specific to the NumberSequenceIterator.
Either the initialization of the accumulator is wrong in the sample code, or there is a deeper issue.