[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-10-14 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-4586:
--
Fix Version/s: 1.1.4, 1.2.0

> NumberSequenceIterator and Accumulator threading issue
> --
>
> Key: FLINK-4586
> URL: https://issues.apache.org/jira/browse/FLINK-4586
> Project: Flink
> Issue Type: Bug
> Components: DataSet API
> Affects Versions: 1.1.2
> Reporter: Johannes
> Assignee: Greg Hogan
> Priority: Minor
> Fix For: 1.2.0, 1.1.4
>
> Attachments: FLINK4586Test.scala
>
>
> There is a strange problem when using the NumberSequenceIterator in
> combination with an AverageAccumulator.
> It seems like the individual accumulators are reinitialized and overwrite
> parts of intermediate solutions.
> The following Scala snippet exemplifies the problem.
> The result should be the correct average, {{50.5}}, but is instead something
> completely different, like {{8.08}}, depending on the number of cores used.
> If the parallelism is set to {{1}} the result is correct, which indicates a
> likely threading problem.
> The problem occurs using both the Java and Scala APIs.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>     var a: AverageAccumulator = _
>     override def map(value: Long): Long = {
>       a.add(value)
>       value
>     }
>     override def open(parameters: Configuration): Unit = {
>       a = new AverageAccumulator
>       getRuntimeContext.addAccumulator("test", a)
>     }
>   })
>   .reduce((a, b) => a + b)
>   .print()
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}
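
For reference, the expected value in the report can be checked without Flink. The sketch below is plain Scala and is not Flink's AverageAccumulator; `Partial`, `AverageMergeSketch`, and the even split of 1..100 are hypothetical names and choices made for illustration only. It shows that partial results from parallel subtasks combine to {{50.5}} when each one carries its sum and count. It also shows one assumed kind of merge mistake (summing per-partition averages), which produces a value that varies with the number of partitions. This is an illustration of the symptom, not a diagnosis of this issue.

```scala
// Hypothetical stand-in for a per-subtask partial result (not a Flink class).
final case class Partial(sum: Double, count: Long) {
  def add(value: Long): Partial = Partial(sum + value, count + 1)
  // Correct merge: combine sums and counts, never averages.
  def merge(other: Partial): Partial = Partial(sum + other.sum, count + other.count)
  def average: Double = if (count == 0) 0.0 else sum / count
}

object AverageMergeSketch {
  // Split 1..100 into equally sized contiguous partitions (assumed even split).
  def partials(parallelism: Int): Seq[Partial] = {
    require(parallelism > 0 && 100 % parallelism == 0, "parallelism must divide 100")
    (1L to 100L).grouped(100 / parallelism).toSeq
      .map(_.foldLeft(Partial(0.0, 0L))(_ add _))
  }

  // Merge the partial results the correct way and take the overall average.
  def correctAverage(parallelism: Int): Double =
    partials(parallelism).reduce(_ merge _).average

  // Assumed mistake: add up per-partition averages, then divide by the total count.
  def summedAverages(parallelism: Int): Double = {
    val ps = partials(parallelism)
    ps.map(_.average).sum / ps.map(_.count).sum
  }

  def main(args: Array[String]): Unit = {
    println(correctAverage(1)) // 50.5
    println(correctAverage(4)) // 50.5
    println(summedAverages(4)) // 2.02
  }
}
```

With a correct merge the result is independent of the parallelism, which is the behaviour the reporter expects from the accumulator.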



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-09-07 Thread Johannes (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes updated FLINK-4586:

Attachment: FLINK4586Test.scala

Scala unit test






[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-09-07 Thread Johannes (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes updated FLINK-4586:

Attachment: (was: FLINK4586Test.scala)






[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-09-06 Thread Johannes (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes updated FLINK-4586:

Description: 
There is a strange problem when using the NumberSequenceIterator in combination
with an AverageAccumulator.

It seems like the individual accumulators are reinitialized and overwrite parts
of intermediate solutions.

The following Scala snippet exemplifies the problem.

The result should be the correct average, {{50.5}}, but is instead something
completely different, like {{8.08}}, depending on the number of cores used.

If the parallelism is set to {{1}} the result is correct, which indicates a
likely threading problem.

The problem occurs using both the Java and Scala APIs.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult

println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}

  was:
There is a strange problem when using the NumberSequenceIterator in combination
with an AverageAccumulator.

It seems like the individual accumulators are reinitialized and overwrite parts
of intermediate solution.

The following Scala snippet exemplifies the problem.

The result should be the correct average, {{50.5}}, but is instead something
completely different, like {{8.08}}, depending on the number of cores used.

If the parallelism is set to {{1}} the result is correct, which indicates a
likely threading problem.

The problem occurs using both the Java and Scala APIs.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult

println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}







[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-09-06 Thread Johannes (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes updated FLINK-4586:

Description: 
There is a strange problem when using the NumberSequenceIterator in combination
with an AverageAccumulator.

It seems like the individual accumulators are reinitialized and overwrite parts
of intermediate solution.

The following Scala snippet exemplifies the problem.

The result should be the correct average, {{50.5}}, but is instead something
completely different, like {{8.08}}, depending on the number of cores used.

If the parallelism is set to {{1}} the result is correct, which indicates a
likely threading problem.

The problem occurs using both the Java and Scala APIs.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult

println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}

  was:
There is a strange problem when using the NumberSequenceIterator in combination
with an AverageAccumulator.

It seems like the individual accumulators are reinitialized and overwrite parts
of intermediate solution.

The following Scala snippet exemplifies the problem.
The result should be the correct average, {{50.5}}, but is instead something
completely different, like {{8.08}}, depending on the number of cores used.
If the parallelism is set to {{1}} the result is correct, which seems like
there is a problem with threading. The problem occurs using both the Java and
Scala APIs.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult

println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}







[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-09-06 Thread Johannes (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes updated FLINK-4586:

Attachment: FLINK4586Test.scala

Complete Scala Testcase



