Repository: spark
Updated Branches:
  refs/heads/master a59419c27 -> 2a5161708


SPARK-1205: Clean up callSite/origin/generator.

This patch removes the `generator` field and simplifies + documents
the tracking of callsites.

There are two places where we care about call sites: when a job is run and
when an RDD is created. This patch retains both of those features but does
some slight refactoring and renaming to make things less confusing.

There was another feature of an RDD called the `generator`, which defaulted
to the user class in which the RDD was created. It was used exclusively in
the JobLogger and has been subsumed by the ability to name a job group. The
JobLogger can later be refactored to read the job group directly (this will
require some work), but for now this patch just preserves the default logged
value of the user class. I'm not sure any users ever used the ability to
override this.
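
For reference, naming a job group looks like this (a minimal sketch against
the SparkContext API; the group id and description are made up):

    // Jobs launched from this thread are tagged with the group id, which is
    // what the JobLogger could eventually read directly.
    sc.setJobGroup("nightly-etl", "Nightly ETL pipeline")
    sc.textFile("/data/events").count()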

Author: Patrick Wendell <pwend...@gmail.com>

Closes #106 from pwendell/callsite and squashes the following commits:

fc1d009 [Patrick Wendell] Compile fix
e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite
62e77ef [Patrick Wendell] Review feedback
576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a516170
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a516170
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a516170

Branch: refs/heads/master
Commit: 2a5161708f4d2f743c7bd69ed3d98bb7bff46460
Parents: a59419c
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Mon Mar 10 16:28:41 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Mon Mar 10 16:28:41 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala     | 11 +++++------
 .../scala/org/apache/spark/api/java/JavaRDD.scala |  2 --
 .../org/apache/spark/api/java/JavaRDDLike.scala   |  5 -----
 .../src/main/scala/org/apache/spark/rdd/RDD.scala | 18 ++++--------------
 .../org/apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../org/apache/spark/scheduler/JobLogger.scala    | 10 +++-------
 .../scala/org/apache/spark/scheduler/Stage.scala  |  2 +-
 .../main/scala/org/apache/spark/util/Utils.scala  |  4 ++--
 8 files changed, 16 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cdc0e5a..745e3fa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -832,13 +832,12 @@ class SparkContext(
     setLocalProperty("externalCallSite", null)
   }
 
+  /**
+   * Capture the current user callsite and return a formatted version for printing. If the user
+   * has overridden the call site, this will return the user's version.
+   */
   private[spark] def getCallSite(): String = {
-    val callSite = getLocalProperty("externalCallSite")
-    if (callSite == null) {
-      Utils.formatSparkCallSite
-    } else {
-      callSite
-    }
+    Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
   }
 
   /**
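
A rough sketch of how the override path behaves, assuming the existing
setCallSite/clearCallSite helpers on SparkContext (which back the
"externalCallSite" property above):

    sc.setCallSite("MyApp: load events")       // getCallSite() now returns this string
    val events = sc.textFile("/data/events")   // jobs/RDDs report "MyApp: load events"
    sc.clearCallSite()                         // back to Utils.formatCallSiteInfo()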

http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 91bf404..01d9357 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
 
-  def generator: String = rdd.generator
-
   override def toString = rdd.toString
 
   /** Assign a name to this RDD */

http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index af0114b..a89419b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList}
 
-import scala.Tuple2
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   def name(): String = rdd.name
 
-  /** Reset generator */
-  def setGenerator(_generator: String) = {
-    rdd.setGenerator(_generator)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe5696..4afa752 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
     this
   }
 
-  /** User-defined generator of this RDD*/
-  @transient var generator = Utils.getCallSiteInfo.firstUserClass
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    generator = _generator
-  }
-
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag](
 
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
-  /** Record user function generating this RDD. */
-  @transient private[spark] val origin = sc.getCallSite()
+  /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
+  @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
+  private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo)
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
@@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag](
   }
 
   override def toString: String = "%s%s[%d] at %s".format(
-    Option(name).map(_ + " ").getOrElse(""),
-    getClass.getSimpleName,
-    id,
-    origin)
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
 
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
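
For context, the captured CallSiteInfo comes from walking the stack to find the
boundary between Spark code and user code. A simplified, self-contained sketch
of the technique (not the exact Utils.getCallSiteInfo implementation):

    object CallSiteSketch {
      // Return "File.scala:line" for the first stack frame outside Spark itself.
      def firstUserFrame(): String = {
        val frames = Thread.currentThread.getStackTrace.filterNot { el =>
          el.getClassName.startsWith("org.apache.spark") ||
          el.getClassName.startsWith("java.lang.Thread")
        }
        frames.headOption
          .map(el => s"${el.getFileName}:${el.getLineNumber}")
          .getOrElse("<unknown>")
      }
    }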

http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dc5b25d..d83d034 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
     } else {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
-      logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
       mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
     }
     stage

http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 80f9ec7..01cbcc3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
    * @param indent Indent number before info
    */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
     val rddInfo =
-      if (rdd.getStorageLevel != StorageLevel.NONE) {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
-                rdd.origin + " " + rdd.generator
-      } else {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
-                rdd.origin + " " + rdd.generator
-      }
+      s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
+      s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
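
With the interpolated form above, a recorded line would look roughly like the
following (RDD id, name, call site, and class are illustrative):

    RDD_ID=3 MappedRDD CACHED textFile at MyApp.scala:14 com.example.MyApp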

http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a78b018..5c1fc30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
     id
   }
 
-  val name = callSite.getOrElse(rdd.origin)
+  val name = callSite.getOrElse(rdd.getCreationSite)
 
   override def toString = "Stage " + id
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ac376fc..38a275d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -719,8 +719,8 @@ private[spark] object Utils extends Logging {
     new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
   }
 
-  def formatSparkCallSite = {
-    val callSiteInfo = getCallSiteInfo
+  /** Returns a printable version of the call site info suitable for logs. */
+  def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = {
     "%s at %s:%s".format(callSiteInfo.lastSparkMethod, 
callSiteInfo.firstUserFile,
                          callSiteInfo.firstUserLine)
   }
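
Because of the default argument, both call styles work: the first captures and
formats the current call site, the second formats one captured earlier (as RDD
does with creationSiteInfo above):

    Utils.formatCallSiteInfo()                       // capture and format here
    Utils.formatCallSiteInfo(rdd.creationSiteInfo)   // format a stored call site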
