[jira] [Commented] (SPARK-10636) RDD filter does not work after if..then..else RDD blocks

2015-09-16 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14790681#comment-14790681 ]

Glenn Strycker commented on SPARK-10636:
----------------------------------------

I didn't "forget", I believed that "RDD = if {} else {} . something" would 
automatically take care of the associative property, and that anything after 
the final else {} would apply to both blocks.  I didn't realize that braces 
behave similarly to parentheses and that I needed extras -- makes sense.  I 
have now added these to my code.

This wasn't a question for "user@ first", since I really did believe there was 
a bug.  Jira is the place for submitting bug reports, even when the resolution 
is user error.

> RDD filter does not work after if..then..else RDD blocks
> ---------------------------------------------------------
>
> Key: SPARK-10636
> URL: https://issues.apache.org/jira/browse/SPARK-10636
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Glenn Strycker
>
> I have an RDD declaration of the following form:
> {code}
> val myRDD = if (some condition) { tempRDD1.some operations } else { 
> tempRDD2.some operations }.filter(a => a._2._5 <= 50)
> {code}
> When I output the contents of myRDD, I found entries that clearly had a._2._5 > 50... the filter didn't work!
> If I move the filter inside of the if..then blocks, it suddenly does work:
> {code}
> val myRDD = if (some condition) { tempRDD1.some operations.filter(a => 
> a._2._5 <= 50) } else { tempRDD2.some operations.filter(a => a._2._5 <= 50) }
> {code}
> I ran toDebugString after both of these code examples, and "filter" does 
> appear in the DAG for the second example, but DOES NOT appear in the first 
> DAG.  Why not?
> Am I misusing the if..then..else syntax for Spark/Scala?
> Here is my actual code... ignore the crazy naming conventions and what it's 
> doing...
> {code}
> // this does NOT work
> val myRDD = if (tempRDD2.count() > 0) {
>tempRDD1.
>  map(a => (a._1._1, (a._1._2, a._2._1, a._2._2))).
>  leftOuterJoin(tempRDD2).
>  map(a => (a._2._1._1, (a._1, a._2._1._2, a._2._1._3, a._2._2.getOrElse(1L)))).
>  leftOuterJoin(tempRDD2).
>  map(a => ((a._2._1._1, a._1), (a._2._1._2, a._2._1._3, a._2._1._4, a._2._2.getOrElse(1L)))).
>  map(a => ((List(a._1._1, a._1._2).min, List(a._1._1, a._1._2).max), (a._2._1, a._2._2, if (a._1._1 <= a._1._2) { a._2._3 } else { a._2._4 }, if (a._1._1 <= a._1._2) { a._2._4 } else { a._2._3 }, a._2._3 + a._2._4)))
>} else {
>  tempRDD1.map(a => (a._1, (a._2._1, a._2._2, 1L, 1L, 2L)))
>}.
>filter(a => a._2._5 <= 50).
>partitionBy(partitioner).
>setName("myRDD").
>persist(StorageLevel.MEMORY_AND_DISK_SER)
> myRDD.checkpoint()
> println(myRDD.toDebugString)
> // (4) MapPartitionsRDD[58] at map at myProgram.scala:2120 []
> //  |  MapPartitionsRDD[57] at map at myProgram.scala:2119 []
> //  |  MapPartitionsRDD[56] at leftOuterJoin at myProgram.scala:2118 []
> //  |  MapPartitionsRDD[55] at leftOuterJoin at myProgram.scala:2118 []
> //  |  CoGroupedRDD[54] at leftOuterJoin at myProgram.scala:2118 []
> //  +-(4) MapPartitionsRDD[53] at map at myProgram.scala:2117 []
> //  |  |  MapPartitionsRDD[52] at leftOuterJoin at myProgram.scala:2116 []
> //  |  |  MapPartitionsRDD[51] at leftOuterJoin at myProgram.scala:2116 []
> //  |  |  CoGroupedRDD[50] at leftOuterJoin at myProgram.scala:2116 []
> //  |  +-(4) MapPartitionsRDD[49] at map at myProgram.scala:2115 []
> //  |  |  |  clusterGraphWithComponentsRDD MapPartitionsRDD[28] at reduceByKey at myProgram.scala:1689 []
> //  |  |  |  CachedPartitions: 4; MemorySize: 1176.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> //  |  |  |  CheckpointRDD[29] at count at myProgram.scala:1701 []
> //  |  +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
> //  | |  CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> //  | |  CheckpointRDD[17] at count at myProgram.scala:394 []
> //  +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
> // |  CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> // |  CheckpointRDD[17] at count at myProgram.scala:394 []
> // this DOES work!
> val myRDD = if (tempRDD2.count() > 0) {
>tempRDD1.
>  map(a => (a._1._1, (a._1._2, a._2._1, a._2._2))).
>  leftOuterJoin(tempRDD2).
>  map(a => (a._2._1._1, (a._1, a._2._1._2, a._2._1._3, a._2._2.getOrElse(1L)))).
>  leftOuterJoin(tempRDD2).
>  map(a => ((a._2._1._1, a._1), (a._2._1._2, a._2._1._3, a._2._1._4, a._2._2.getOrElse(1L)))).
>  map(a => ((List(a._1._1, a._1._2).min, List(a._1._1, a._1._2).max), (a._2._1, a._2._2, if (a._1._1 <= a._1._2) { a._2._3 } else { a._2._4 }, if (a._1._1 <= a._1._2) { a._2._4 } else { a._2._3 }, a._2._3 + a._2._4))).
>  filter(a => a._2._5 <= 50)
>} else {
>  tempRDD1.map(a => (a._1, (a._2._1, a._2._2, 1L, 1L, 2L))).
>  filter(a => a._2._5 <= 50)
>}.
>partitionBy(partitioner).
>setName("myRDD").
>persist(StorageLevel.MEMORY_AND_DISK_SER)
> {code}

[jira] [Commented] (SPARK-10636) RDD filter does not work after if..then..else RDD blocks

2015-09-16 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14790792#comment-14790792 ]

Sean Owen commented on SPARK-10636:
-----------------------------------

It's a Scala syntax issue, as you suspected when you wondered whether you were 
using if..else the way you thought. Stuff happens, but when that possibility 
crosses your mind, I'd float a user@ question first or try a simple test in 
plain Scala to narrow down the behavior in question -- something like the 
sketch below.
