Repository: spark Updated Branches: refs/heads/master a59419c27 -> 2a5161708
SPARK-1205: Clean up callSite/origin/generator. This patch removes the `generator` field and simplifies + documents the tracking of callsites. There are two places where we care about call sites, when a job is run and when an RDD is created. This patch retains both of those features but does a slight refactoring and renaming to make things less confusing. There was another feature of an rdd called the `generator` which was by default the user class that in which the RDD was created. This is used exclusively in the JobLogger. It been subsumed by the ability to name a job group. The job logger can later be refectored to read the job group directly (will require some work) but for now this just preserves the default logged value of the user class. I'm not sure any users ever used the ability to override this. Author: Patrick Wendell <pwend...@gmail.com> Closes #106 from pwendell/callsite and squashes the following commits: fc1d009 [Patrick Wendell] Compile fix e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite 62e77ef [Patrick Wendell] Review feedback 576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a516170 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a516170 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a516170 Branch: refs/heads/master Commit: 2a5161708f4d2f743c7bd69ed3d98bb7bff46460 Parents: a59419c Author: Patrick Wendell <pwend...@gmail.com> Authored: Mon Mar 10 16:28:41 2014 -0700 Committer: Patrick Wendell <pwend...@gmail.com> Committed: Mon Mar 10 16:28:41 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 11 +++++------ .../scala/org/apache/spark/api/java/JavaRDD.scala | 2 -- .../org/apache/spark/api/java/JavaRDDLike.scala | 5 ----- .../src/main/scala/org/apache/spark/rdd/RDD.scala | 18 ++++-------------- .../org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../org/apache/spark/scheduler/JobLogger.scala | 10 +++------- .../scala/org/apache/spark/scheduler/Stage.scala | 2 +- .../main/scala/org/apache/spark/util/Utils.scala | 4 ++-- 8 files changed, 16 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cdc0e5a..745e3fa 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -832,13 +832,12 @@ class SparkContext( setLocalProperty("externalCallSite", null) } + /** + * Capture the current user callsite and return a formatted version for printing. If the user + * has overridden the call site, this will return the user's version. + */ private[spark] def getCallSite(): String = { - val callSite = getLocalProperty("externalCallSite") - if (callSite == null) { - Utils.formatSparkCallSite - } else { - callSite - } + Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo()) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 91bf404..01d9357 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] = wrapRDD(rdd.subtract(other, p)) - def generator: String = rdd.generator - override def toString = rdd.toString /** Assign a name to this RDD */ http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index af0114b..a89419b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -19,7 +19,6 @@ package org.apache.spark.api.java import java.util.{Comparator, List => JList} -import scala.Tuple2 import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name - /** Reset generator */ - def setGenerator(_generator: String) = { - rdd.setGenerator(_generator) - } } http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 3fe5696..4afa752 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag]( this } - /** User-defined generator of this RDD*/ - @transient var generator = Utils.getCallSiteInfo.firstUserClass - - /** Reset generator*/ - def setGenerator(_generator: String) = { - generator = _generator - } - /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not @@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE - /** Record user function generating this RDD. */ - @transient private[spark] val origin = sc.getCallSite() + /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ + @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo + private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo) private[spark] def elementClassTag: ClassTag[T] = classTag[T] @@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag]( } override def toString: String = "%s%s[%d] at %s".format( - Option(name).map(_ + " ").getOrElse(""), - getClass.getSimpleName, - id, - origin) + Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite) def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dc5b25d..d83d034 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -279,7 +279,7 @@ class DAGScheduler( } else { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown - logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") + logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size) } stage http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 80f9ec7..01cbcc3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String) * @param indent Indent number before info */ protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) { + val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE" val rddInfo = - if (rdd.getStorageLevel != StorageLevel.NONE) { - "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " + - rdd.origin + " " + rdd.generator - } else { - "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " + - rdd.origin + " " + rdd.generator - } + s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " + + s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}" jobLogInfo(jobID, indentString(indent) + rddInfo, false) rdd.dependencies.foreach { case shufDep: ShuffleDependency[_, _] => http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index a78b018..5c1fc30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -100,7 +100,7 @@ private[spark] class Stage( id } - val name = callSite.getOrElse(rdd.origin) + val name = callSite.getOrElse(rdd.getCreationSite) override def toString = "Stage " + id http://git-wip-us.apache.org/repos/asf/spark/blob/2a516170/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ac376fc..38a275d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -719,8 +719,8 @@ private[spark] object Utils extends Logging { new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass) } - def formatSparkCallSite = { - val callSiteInfo = getCallSiteInfo + /** Returns a printable version of the call site info suitable for logs. */ + def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = { "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile, callSiteInfo.firstUserLine) }