[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-04-11 Thread Hudson (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234743#comment-15234743 ]

Hudson commented on MAHOUT-1810:


FAILURE: Integrated in Mahout-Quality #3324 (See 
[https://builds.apache.org/job/Mahout-Quality/3324/])
MAHOUT-1810: Failing test in flink-bindings: A + B Identically (apalumbo: rev f4f42ae4c4c7555659edcc43669fec82f9537219)
* flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
* flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
* flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
MAHOUT-1810: Use method taken from FlinkMLTools for CheckpointedFlinkDrm (apalumbo: rev 202b94f840286d4d0970f0427122697ba27fc1fb)
* flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
* flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
* flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala


> Failing test in flink-bindings: A + B Identically partitioned (mapBlock 
> Checkpointing issue)
> 
>
> Key: MAHOUT-1810
> URL: https://issues.apache.org/jira/browse/MAHOUT-1810
> Project: Mahout
>  Issue Type: Bug
>  Components: Flink
>Affects Versions: 0.11.2
>Reporter: Andrew Palumbo
>Assignee: Andrew Palumbo
>Priority: Blocker
> Fix For: 0.12.0
>
>
> The {{A + B, Identically Partitioned}} test in the Flink RLikeDrmOpsSuite 
> fails. This test failure likely indicates an issue with Flink's 
> Checkpointing or mapBlock operator:
> {code}
>   test("C = A + B, identically partitioned") {
>     val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
>     val A = drmParallelize(inCoreA, numPartitions = 2)
> 
>     // Create B which would be identically partitioned to A. mapBlock() by
>     // default will do the trick.
>     val B = A.mapBlock() {
>       case (keys, block) =>
>         val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble() }
>         keys -> bBlock
>     }
>       // Prevent repeated computation non-determinism
>       // flink problem is here... checkpoint is not doing what it should
>       .checkpoint()
> 
>     val inCoreB = B.collect
>     printf("A=\n%s\n", inCoreA)
>     printf("B=\n%s\n", inCoreB)
> 
>     val C = A + B
>     val inCoreC = C.collect
>     printf("C=\n%s\n", inCoreC)
> 
>     // Actual
>     val inCoreCControl = inCoreA + inCoreB
>     (inCoreC - inCoreCControl).norm should be < 1E-10
>   }
> {code}
> The output shows clearly that the line:
> {code}
> val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble() }
> {code}
> in the {{mapBlock}} closure is being calculated more than once.
> Output:
> {code}
> A=
> {
>  0 => {0:1.0,1:2.0,2:3.0}
>  1 => {0:3.0,1:4.0,2:5.0}
>  2 => {0:5.0,1:6.0,2:7.0}
> }
> B=
> {
>  0 => {0:0.26203398262809574,1:0.22561543461472167,2:0.23229669514522655}
>  1 => {0:0.1638068194515867,1:0.18751822418846575,2:0.20586366231381614}
>  2 => {0:0.9279465706239354,1:0.2963513448240057,2:0.8866928923235948}
> }
> C=
> {
>  0 => {0:1.7883652623225594,1:2.6401297718606216,2:3.0023341959374195}
>  1 => {0:3.641411452208408,1:4.941233165480053,2:5.381282338548803}
>  2 => {0:5.707434148862531,1:6.022780876943659,2:7.149772825494352}
> }
> {code}
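The symptom above (C differing from A + B element-wise) is the classic signature of a lazily re-evaluated random block: B is materialized once for {{collect}} and again inside {{A + B}}. The difference between recomputing and caching a non-deterministic block can be sketched as follows. This is an illustrative Python sketch only, not Mahout or Flink code; the class names are invented for the illustration.

```python
import random

class LazyBlock:
    """Recomputes its (random) contents on every evaluation --
    models a checkpoint that does not actually cache."""
    def __init__(self, n):
        self.n = n

    def collect(self):
        return [random.random() for _ in range(self.n)]

class CachedBlock(LazyBlock):
    """Materializes once and replays the same values -- what
    checkpoint() with a caching hint is expected to do."""
    def __init__(self, n):
        super().__init__(n)
        self._cache = None

    def collect(self):
        if self._cache is None:
            self._cache = super().collect()
        return self._cache

lazy = LazyBlock(3)
assert lazy.collect() != lazy.collect()      # two evaluations, two different B's

cached = CachedBlock(3)
assert cached.collect() == cached.collect()  # one materialization, stable B
```

With the lazy version, any operator that consumes B twice sees two different random matrices, which is exactly why the norm check in the test fails.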



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205682#comment-15205682 ]

ASF GitHub Bot commented on MAHOUT-1810:


Github user andrewpalumbo closed the pull request at:

https://github.com/apache/mahout/pull/198




[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205620#comment-15205620 ]

ASF GitHub Bot commented on MAHOUT-1810:


Github user smarthi commented on the pull request:

https://github.com/apache/mahout/pull/198#issuecomment-199583714
  
Yeah, please commit this and start another Jira. This is a big win.




[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205368#comment-15205368 ]

ASF GitHub Bot commented on MAHOUT-1810:


GitHub user andrewpalumbo opened a pull request:

https://github.com/apache/mahout/pull/198

MAHOUT-1810 persist data to file system and read back at each cache() call 
(FOR REVIEW)

We still need to investigate Flink's 
`ExecutionEnvironment.registerCachedFile` to really cache things for access 
at the back end. However, tests do pass with this change.
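The workaround in this PR, spilling to the file system on the first cache() and re-reading on later accesses, can be sketched as follows. This is an illustrative Python stand-in under assumed names (FileBackedDrm and its methods are invented), not the actual CheckpointedFlinkDrm code, which writes the Flink DataSet itself.

```python
import os
import pickle
import random
import tempfile

class FileBackedDrm:
    """Sketch of 'persist on first cache(), re-read afterwards'.
    A plain Python callable stands in for the lazy dataset."""
    def __init__(self, compute):
        self._compute = compute   # lazy, possibly non-deterministic
        self._path = None         # set once the data is persisted

    def cache(self):
        if self._path is None:    # first call: materialize and spill to disk
            fd, self._path = tempfile.mkstemp(suffix=".drm")
            with os.fdopen(fd, "wb") as f:
                pickle.dump(self._compute(), f)
        return self

    def collect(self):
        if self._path is None:    # not cached: recompute every time
            return self._compute()
        with open(self._path, "rb") as f:  # cached: replay persisted data
            return pickle.load(f)

drm = FileBackedDrm(lambda: [random.random() for _ in range(3)]).cache()
assert drm.collect() == drm.collect()  # identical across evaluations
```

Persisting to disk trades I/O cost for determinism: every downstream consumer of the cached dataset now reads the same materialized values instead of re-running the (random) computation.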

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andrewpalumbo/mahout MAHOUT-1810

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/mahout/pull/198.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #198


commit a9ca8d396eb3963a8e344da1e4e94024af4b437e
Author: Andrew Palumbo 
Date:   2016-03-21T23:15:32Z

Write data to file system and read back at each cache() call






[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-19 Thread Andrew Palumbo (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15202159#comment-15202159 ]

Andrew Palumbo commented on MAHOUT-1810:


As far as I know, there is currently no caching in the same sense as is done 
in Spark. I've been told this is to come in future releases.

I am just getting familiar with the Flink engine. I wonder if we can use the 
{{DistributedFileCache}}?

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/cache/DistributedCache.DistributedCacheEntry.html



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-19 Thread Dmitriy Lyubimov (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15202129#comment-15202129 ]

Dmitriy Lyubimov commented on MAHOUT-1810:
--

So if checkpoint doesn't cache (which is the intent here, to get rid of 
non-determinism in this test), that is a formal non-adherence to the contract 
of checkpoint and its caching capabilities (the CacheHint parameters).

So you are saying there's no way to cope with this?

I think, in the worst case, the solution should seek to dump the intermediate 
checkpoint (for cache hints other than None) to dfs or an in-memory file 
system. Various people are telling me that dfs can now have a pretty sizeable 
local cache configured too, so persistence is not so bad (but not as good as 
keeping object trees in the same JVM, of course).
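The contract described here, that checkpoint() must pin its result for any cache hint other than None, can be sketched like this. This is illustrative Python with an invented Drm class; the CacheHint values only loosely mirror Mahout's actual hints.

```python
import enum
import random

class CacheHint(enum.Enum):
    NONE = 0     # no materialization requested
    MEMORY = 1   # keep materialized (or spill to dfs in the worst case)

class Drm:
    """Toy distributed matrix with a checkpoint() honoring cache hints."""
    def __init__(self, compute):
        self._compute = compute
        self._cached = None

    def checkpoint(self, hint=CacheHint.MEMORY):
        # Per the contract: any hint other than NONE must pin the result
        # so that every later evaluation sees the same data.
        if hint is not CacheHint.NONE and self._cached is None:
            self._cached = self._compute()
        return self

    def collect(self):
        if self._cached is not None:
            return self._cached
        return self._compute()      # NONE: recompute on every evaluation

b = Drm(lambda: [random.random() for _ in range(3)]).checkpoint()
assert b.collect() == b.collect()   # hint != NONE: stable across evaluations
```

Under this contract, the failing test's B.checkpoint() would have to behave like the MEMORY branch above, so collect and A + B would see the same random matrix.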
