Repository: spark
Updated Branches:
  refs/heads/master 927e22eff -> 74267beb3


[SPARK-13758][STREAMING][CORE] enhance exception message to avoid misleading

We have a recoverable Spark Streaming job with checkpointing enabled. It runs 
correctly the first time, but throws the following exception when it is 
restarted and recovers from the checkpoint.
```
org.apache.spark.SparkException: RDD transformations and actions can only be 
invoked by the driver, not inside of other transformations; for example, 
rdd1.map(x => rdd2.values.count() * x) is invalid because the values 
transformation and count action cannot be performed inside of the rdd1.map 
transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
        at org.apache.spark.rdd.RDD.union(RDD.scala:565)
        at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
        at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
```

According to the exception, I invoked transformations and actions inside other 
transformations, but I did not. The real cause is that I used an external RDD 
in a DStream operation. The external RDD's data is not stored in the 
checkpoint, so during recovery the _sc field of this RDD is initialized to null 
and the exception above is thrown. The error message is therefore misleading: 
it says nothing about the real issue.

Here is the code to reproduce it.

```scala
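import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Repo {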

  def createContext(ip: String, port: Int, checkpointDirectory: String): StreamingContext = {

    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))

    val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
    words.foreachRDD((rdd: RDD[String]) => {
      val res = rdd.map(word => (word, word.length)).collect()
      println("words: " + res.mkString(", "))

      cached = cached.union(rdd)
      cached.checkpoint()
      println("cached words: " + cached.collect.mkString(", "))
    })
    ssc
  }

  def main(args: Array[String]): Unit = {

    val ip = "localhost"
    val port = 9999
    val dir = "/home/maowei/tmp"

    val ssc = StreamingContext.getOrCreate(dir,
      () => {
        createContext(ip, port, dir)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
```
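
The null _sc described above can be illustrated without Spark. The following 
is only a minimal sketch, assuming that the context field is marked @transient 
(as _sc is in RDD.scala) and that checkpoint recovery goes through Java 
serialization. FakeContext, FakeRdd, and TransientDemo are hypothetical names 
used for illustration; they are not Spark classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkContext; deliberately not Serializable.
class FakeContext

// Hypothetical stand-in for an RDD. The context field is @transient, so Java
// serialization skips it and it is null on the deserialized copy.
class FakeRdd(@transient private val _sc: FakeContext) extends Serializable {
  def sc: FakeContext = {
    if (_sc == null) {
      throw new IllegalStateException("This RDD lacks a SparkContext.")
    }
    _sc
  }
}

object TransientDemo {
  // Serialize and deserialize a value in memory, standing in for a
  // checkpoint write followed by a recovery (assumption, see lead-in).
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val fresh = new FakeRdd(new FakeContext)
    println(s"fresh object has a context: ${fresh.sc != null}")

    // The object referenced from the closure survives the round trip,
    // but its transient context does not.
    val recovered = roundTrip(fresh)
    val lostContext =
      try { recovered.sc; false } catch { case _: IllegalStateException => true }
    println(s"recovered object lost its context: $lostContext")
  }
}
```

In the reproduction above, `cached` plays the role of `fresh`: it is created 
outside the DStream operations and only referenced from the foreachRDD closure.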

Author: mwws <wei....@intel.com>

Closes #11595 from mwws/SPARK-MissleadingLog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74267beb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74267beb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74267beb

Branch: refs/heads/master
Commit: 74267beb3546d316c659499a9ff577437541f072
Parents: 927e22e
Author: mwws <wei....@intel.com>
Authored: Thu Mar 10 15:45:06 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Mar 10 15:45:06 2016 +0000

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74267beb/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e5fdebc..8bf4489 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -85,10 +85,14 @@ abstract class RDD[T: ClassTag](
   private def sc: SparkContext = {
     if (_sc == null) {
       throw new SparkException(
-        "RDD transformations and actions can only be invoked by the driver, not inside of other " +
-        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because " +
-        "the values transformation and count action cannot be performed inside of the rdd1.map " +
-        "transformation. For more information, see SPARK-5063.")
+        "This RDD lacks a SparkContext. It could happen in the following cases: \n(1) RDD " +
+        "transformations and actions are NOT invoked by the driver, but inside of other " +
+        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
+        "because the values transformation and count action cannot be performed inside of the " +
+        "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
+        "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
+        "an RDD not defined by the streaming job is used in DStream operations. For more " +
+        "information, See SPARK-13758.")
     }
     _sc
   }

