[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-11-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17936


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-19 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117432268
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
--- End diff --

Ok, I will change it too.


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117429928
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
--- End diff --

Btw, can we move those functions out of `compute`? There are too many nested 
functions here, and they make `compute` too big.
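
For illustration only, a minimal sketch of the shape being asked for, using hypothetical toy names rather than the PR's code: the helper sits next to `compute` as a private method instead of being nested inside it, so `compute` stays short.

    // Toy stand-in for the RDDs; only the structure matters here.
    class ToyCartesian[T, U](left: Seq[T], right: Seq[U]) {

      // Hoisted out of compute(): formerly a nested def, now a small private helper.
      private def rightIterator(): Iterator[U] = right.iterator

      // compute() only wires the pieces together and stays short.
      def compute(): Iterator[(T, U)] =
        for (x <- left.iterator; y <- rightIterator()) yield (x, y)
    }

    // Usage: new ToyCartesian(Seq(1, 2), Seq("a", "b")).compute().toList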


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-19 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117427240
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
+        rdd: RDD[U],
+        partition: Partition,
+        context: TaskContext,
+        level: StorageLevel): Iterator[U] = {
+      getLocalValues() match {
+        case Some(result) =>
+          return result
+        case None => if (holdReadLock) {
+          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
+        }
+      }
+
+      val iterator = rdd.iterator(partition, context)
+      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
+        // If the block is cached in local, wo shouldn't cache it again.
--- End diff --

Ok, I'll change it, thanks very much.


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117426327
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
+        rdd: RDD[U],
+        partition: Partition,
+        context: TaskContext,
+        level: StorageLevel): Iterator[U] = {
+      getLocalValues() match {
+        case Some(result) =>
+          return result
+        case None => if (holdReadLock) {
+          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
+        }
+      }
+
+      val iterator = rdd.iterator(partition, context)
+      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
+        // If the block is cached in local, wo shouldn't cache it again.
--- End diff --

No. I mean that here, if `getStorageLevel != StorageLevel.NONE`, you assume the 
block is cached and return the iterator. However, the caching can fail, in which 
case you just return the computed iterator.
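
A hedged sketch of the fallback being described, not the PR's code: it assumes the stock `BlockManager.putIterator` (Boolean result) and `BlockManager.get` (`Option[BlockResult]`) signatures, uses a hypothetical helper name, and relies on Spark-internal classes, so it would only compile inside Spark itself.

    import scala.reflect.ClassTag
    import org.apache.spark.{Partition, SparkException, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}

    // Only trust the cache when the put reports success; otherwise fall back to recomputing.
    def cacheThenRead[U: ClassTag](
        blockManager: BlockManager,
        blockId: BlockId,
        rdd: RDD[U],
        partition: Partition,
        context: TaskContext,
        level: StorageLevel): Iterator[U] = {
      val computed = rdd.iterator(partition, context)
      if (blockManager.putIterator[U](blockId, computed, level)) {
        // The block really is in the local store now; read the cached copy back.
        blockManager.get[U](blockId)
          .map(_.data.asInstanceOf[Iterator[U]])
          .getOrElse(throw new SparkException(s"Block $blockId vanished after a successful put"))
      } else {
        // Caching failed (for example, not enough memory); recompute instead of assuming it is cached.
        rdd.iterator(partition, context)
      }
    }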


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117424810
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
+        rdd: RDD[U],
+        partition: Partition,
+        context: TaskContext,
+        level: StorageLevel): Iterator[U] = {
+      getLocalValues() match {
+        case Some(result) =>
+          return result
+        case None => if (holdReadLock) {
+          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
+        }
+      }
+
+      val iterator = rdd.iterator(partition, context)
+      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
+        // If the block is cached in local, wo shouldn't cache it again.
--- End diff --

`getOrElseUpdate` doesn't guarantee the block can be successfully cached; 
caching can fail. In that case, it simply returns the computed iterator 
without caching anything.
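
For reference, a hedged sketch of inspecting that result, assuming the Spark 2.x shape of `BlockManager.getOrElseUpdate` (an `Either[BlockResult, Iterator[T]]`, where the `Right` side means the values could not be stored). The wrapper name is hypothetical and, since `BlockManager` is Spark-internal, this only compiles inside Spark.

    import scala.reflect.ClassTag
    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.{BlockId, StorageLevel}

    // Read-through cache: the Right branch is the "caching failed" path described above.
    def readThroughCache[T: ClassTag](blockId: BlockId, level: StorageLevel)
                                     (makeIterator: () => Iterator[T]): Iterator[T] = {
      SparkEnv.get.blockManager.getOrElseUpdate(blockId, level, implicitly[ClassTag[T]], makeIterator) match {
        case Left(blockResult) =>
          // The block is (or already was) stored locally; hand back the cached values.
          blockResult.data.asInstanceOf[Iterator[T]]
        case Right(iter) =>
          // Storing failed; we only get the computed values back and nothing is cached.
          iter
      }
    }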


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-19 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117425810
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
+        rdd: RDD[U],
+        partition: Partition,
+        context: TaskContext,
+        level: StorageLevel): Iterator[U] = {
+      getLocalValues() match {
+        case Some(result) =>
+          return result
+        case None => if (holdReadLock) {
+          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
+        }
+      }
+
+      val iterator = rdd.iterator(partition, context)
+      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
+        // If the block is cached in local, wo shouldn't cache it again.
--- End diff --

Yeah, but if it isn't cached locally, the next loop will try to call the 
iterator again, and then we will call `getOrElse` again. Do you mean we should 
check whether it is cached and, if not, try to cache it again?


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-19 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117424223
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
+        rdd: RDD[U],
+        partition: Partition,
+        context: TaskContext,
+        level: StorageLevel): Iterator[U] = {
+      getLocalValues() match {
+        case Some(result) =>
+          return result
+        case None => if (holdReadLock) {
+          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
+        }
+      }
+
+      val iterator = rdd.iterator(partition, context)
+      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
+        // If the block is cached in local, wo shouldn't cache it again.
--- End diff --

@viirya Thanks for the review. If the storage level of rdd2 is not 
`StorageLevel.NONE`, it will already be cached by the method `RDD.getOrCompute`. So I 
think we shouldn't cache it again, because `blockManager.getOrElseUpdate` calls 
the same method as `blockManager.putIterator`.
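
A hedged sketch of the guard being discussed, using only the public `RDD.getStorageLevel` (the PR's actual check also looks at checkpoint materialization, which is internal); the helper name and the caller-supplied caching strategy are hypothetical.

    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Only cache manually when Spark itself would not have cached the block:
    // for a persisted RDD, RDD.iterator goes through RDD.getOrCompute, which already stores it.
    def iterateMaybeCaching[U](
        rdd: RDD[U],
        partition: Partition,
        context: TaskContext)(cacheLocally: Iterator[U] => Iterator[U]): Iterator[U] = {
      val it = rdd.iterator(partition, context)
      if (rdd.getStorageLevel != StorageLevel.NONE) {
        it // already handled by RDD.getOrCompute, don't put the block a second time
      } else {
        cacheLocally(it) // unpersisted RDD: apply the caller-supplied caching strategy
      }
    }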


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117423372
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
+        rdd: RDD[U],
+        partition: Partition,
+        context: TaskContext,
+        level: StorageLevel): Iterator[U] = {
+      getLocalValues() match {
+        case Some(result) =>
+          return result
+        case None => if (holdReadLock) {
+          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
+        }
+      }
+
+      val iterator = rdd.iterator(partition, context)
+      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
+        // If the block is cached in local, wo shouldn't cache it again.
--- End diff --

Even if `getStorageLevel` is not `StorageLevel.NONE`, we still can't guarantee 
the block will be successfully cached.


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-18 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117393923
  
--- Diff: core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala ---
@@ -198,8 +198,12 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     // write files to disk so we can read them later.
     sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
     val aRdd = sc.textFile(cartFilePath, numPartitions)
+    aRdd.cache()
+    aRdd.count()
--- End diff --

There is a very strange issue. If we cache both `aRdd` and `tmpRdd`, the test 
passes on both this PR and the master branch. But if we cache only `tmpRdd`, it 
fails on both branches. So for now I cache both here temporarily. I will look 
into the details of the problem; it may be a bug. If I have misunderstood 
something, please point it out to me.


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-18 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r117393634
  
--- Diff: core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala ---
@@ -198,8 +198,12 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     // write files to disk so we can read them later.
     sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
     val aRdd = sc.textFile(cartFilePath, numPartitions)
+    aRdd.cache()
+    aRdd.count()
 
     val tmpRdd = sc.textFile(tmpFilePath, numPartitions)
+    tmpRdd.cache()
+    tmpRdd.count()
--- End diff --

Because we cache the RDD in the `CartesianRDD.compute` method, we should count 
the bytes read from memory here.


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

2017-05-17 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17936#discussion_r116959739
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
@@ -71,9 +72,85 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    var cachedInLocal = false
+    var holdReadLock = false
+
+    // Try to get data from the local, otherwise it will be cached to the local.
+    def getOrElseCache(
+        rdd: RDD[U],
+        partition: Partition,
+        context: TaskContext,
+        level: StorageLevel): Iterator[U] = {
+      getLocalValues() match {
+        case Some(result) =>
+          return result
+        case None => // do nothing
+      }
+
+      val iterator = rdd.iterator(partition, context)
+      // Keep read lock, because next we need read it. And don't tell master.
+      blockManager.putIterator[U](blockId2, iterator, level, false, true) match {
+        case true =>
--- End diff --

BTW, I think `match` is confusing overkill for booleans. Just use if/else.
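
For illustration, the kind of rewrite being suggested, with hypothetical names; the point is only the shape of the code, not the logic.

    // Before: pattern matching on the Boolean result of the put.
    //   stored match { case true => ...; case false => ... }
    // After: the same decision as a plain if/else.
    def describePut(stored: Boolean): String =
      if (stored) "block cached locally, read it back from the block manager"
      else "caching failed, fall back to the computed iterator"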

