Repository: spark
Updated Branches:
  refs/heads/master 07b314a57 -> 1b2c2162a


[STREAMING][MINOR] More contextual information in logs + minor code improvements

Please review and merge at your convenience. Thanks!

Author: Jacek Laskowski <ja...@japila.pl>

Closes #10595 from jaceklaskowski/streaming-minor-fixes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b2c2162
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b2c2162
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b2c2162

Branch: refs/heads/master
Commit: 1b2c2162af4d5d2d950af94571e69273b49bf913
Parents: 07b314a
Author: Jacek Laskowski <ja...@japila.pl>
Authored: Thu Jan 7 21:12:57 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jan 7 21:12:57 2016 +0000

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../storage/ShuffleBlockFetcherIterator.scala   |  4 +-
 .../spark/network/client/StreamCallback.java    |  4 +-
 .../spark/network/client/TransportClient.java   |  2 +-
 .../apache/spark/network/server/RpcHandler.java |  2 +-
 .../spark/streaming/StreamingContext.scala      | 12 +--
 .../spark/streaming/dstream/DStream.scala       | 86 ++++++++++----------
 .../spark/streaming/dstream/InputDStream.scala  |  3 +-
 .../dstream/ReceiverInputDStream.scala          |  4 +-
 .../receiver/ReceivedBlockHandler.scala         |  2 +-
 .../spark/streaming/receiver/Receiver.scala     |  4 +-
 .../streaming/receiver/ReceiverSupervisor.scala |  8 +-
 .../spark/streaming/scheduler/JobSet.scala      |  8 +-
 14 files changed, 69 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 92438ba..6b01a10 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -747,7 +747,7 @@ class DAGScheduler(
   }
 
   /**
-   * Check for waiting or failed stages which are now eligible for resubmission.
+   * Check for waiting stages which are now eligible for resubmission.
    * Ordinarily run on every iteration of the event loop.
    */
   private def submitWaitingStages() {

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 5c80ac1..4479e68 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -59,7 +59,7 @@ private[spark] class BlockResult(
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that #initialize() must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable.
  */
 private[spark] class BlockManager(
     executorId: String,
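
For context on the hunk above: it replaces a bare Javadoc-style `#initialize()` reference with Scaladoc's `[[...]]` link syntax, which the doc tool resolves into a link to the member. A minimal sketch of the convention, using a hypothetical Service class (not Spark code):

    /**
     * A tiny service, mirroring the note above: [[start()]] must be called
     * before the instance is usable.
     */
    class Service {
      private var started = false
      def start(): Unit = { started = true }
      def ping(): String = if (started) "pong" else sys.error("call start() first")
    }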

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 0d0448f..037bec1 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.Utils
 * This creates an iterator of (BlockID, InputStream) tuples so the caller can handle blocks
  * in a pipelined fashion as they are received.
  *
- * The implementation throttles the remote fetches to they don't exceed maxBytesInFlight to avoid
+ * The implementation throttles the remote fetches so they don't exceed maxBytesInFlight to avoid
  * using too much memory.
  *
  * @param context [[TaskContext]], used for metrics update
@@ -329,7 +329,7 @@ final class ShuffleBlockFetcherIterator(
 }
 
 /**
- * Helper class that ensures a ManagedBuffer is release upon InputStream.close()
+ * Helper class that ensures a ManagedBuffer is released upon InputStream.close()
  */
 private class BufferReleasingInputStream(
     private val delegate: InputStream,

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java b/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java
index 51d34ca..29e6a30 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
- * Callback for streaming data. Stream data will be offered to the {@link onData(String, ByteBuffer)}
- * method as it arrives. Once all the stream data is received, {@link onComplete(String)} will be
+ * Callback for streaming data. Stream data will be offered to the {@link #onData(String, ByteBuffer)}
+ * method as it arrives. Once all the stream data is received, {@link #onComplete(String)} will be
  * called.
  * <p>
 * The network library guarantees that a single thread will call these methods at a time, but

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index c49ca4d..e15f096 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -288,7 +288,7 @@ public class TransportClient implements Closeable {
   /**
    * Removes any state associated with the given RPC.
    *
-   * @param requestId The RPC id returned by {@link #sendRpc(byte[], RpcResponseCallback)}.
+   * @param requestId The RPC id returned by {@link #sendRpc(ByteBuffer, RpcResponseCallback)}.
    */
   public void removeRpcRequest(long requestId) {
     handler.removeRpcRequest(requestId);

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
index c6ed0f4..a99c301 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
@@ -57,7 +57,7 @@ public abstract class RpcHandler {
 
   /**
   * Receives an RPC message that does not expect a reply. The default implementation will
-   * call "{@link receive(TransportClient, byte[], RpcResponseCallback)}" and log a warning if
+   * call "{@link #receive(TransportClient, ByteBuffer, RpcResponseCallback)}" and log a warning if
    * any of the callback methods are called.
    *
   * @param client A channel client which enables the handler to make requests back to the sender

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a5ab666..ca0a21f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -226,7 +226,7 @@ class StreamingContext private[streaming] (
   * Set the context to periodically checkpoint the DStream operations for driver
    * fault-tolerance.
   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
-   *                  Note that this must be a fault-tolerant file system like HDFS for
+   *                  Note that this must be a fault-tolerant file system like HDFS.
    */
   def checkpoint(directory: String) {
     if (directory != null) {
@@ -274,7 +274,7 @@ class StreamingContext private[streaming] (
   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    *
-   * @deprecated As of 1.0.0", replaced by `receiverStream`.
+   * @deprecated As of 1.0.0 replaced by `receiverStream`.
    */
   @deprecated("Use receiverStream", "1.0.0")
  def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
@@ -285,7 +285,7 @@ class StreamingContext private[streaming] (
 
   /**
    * Create an input stream with any arbitrary user implemented receiver.
-   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+   * Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
@@ -549,7 +549,7 @@ class StreamingContext private[streaming] (
 
     // Verify whether the DStream checkpoint is serializable
     if (isCheckpointingEnabled) {
-      val checkpoint = new Checkpoint(this, Time.apply(0))
+      val checkpoint = new Checkpoint(this, Time(0))
       try {
         Checkpoint.serialize(checkpoint, conf)
       } catch {
@@ -575,9 +575,9 @@ class StreamingContext private[streaming] (
    *
   * Return the current state of the context. The context can be in three possible states -
    *
-   *  - StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
+   *  - StreamingContextState.INITIALIZED - The context has been created, but not started yet.
   *    Input DStreams, transformations and output operations can be created on the context.
-   *  - StreamingContextState.ACTIVE - The context has been started, and been not stopped.
+   *  - StreamingContextState.ACTIVE - The context has been started, and not stopped.
   *    Input DStreams, transformations and output operations cannot be created on the context.
   *  - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
    */
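
For context on the `Time.apply(0)` -> `Time(0)` simplification in the StreamingContext hunk above: Scala rewrites `Foo(args)` into `Foo.apply(args)` when `Foo` is an object, so the two spellings compile identically. A minimal, standalone sketch with a stand-in Time case class (not Spark's actual one):

    case class Time(millis: Long)

    object TimeApplyDemo {
      def main(args: Array[String]): Unit = {
        val sugared  = Time(0)        // desugars to Time.apply(0)
        val explicit = Time.apply(0)  // the spelling the diff removes
        println(sugared == explicit)  // true: case classes compare by value
      }
    }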

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index c59348a..1dfb4e7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -103,7 +103,7 @@ abstract class DStream[T: ClassTag] (
   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
 
-  private[streaming] def isInitialized = (zeroTime != null)
+  private[streaming] def isInitialized = zeroTime != null
 
  // Duration for which the DStream requires its parent DStream to remember each RDD created
   private[streaming] def parentRememberDuration = rememberDuration
@@ -189,15 +189,15 @@ abstract class DStream[T: ClassTag] (
    */
   private[streaming] def initialize(time: Time) {
     if (zeroTime != null && zeroTime != time) {
-      throw new SparkException("ZeroTime is already initialized to " + zeroTime
-        + ", cannot initialize it again to " + time)
+      throw new SparkException(s"ZeroTime is already initialized to $zeroTime"
+        + s", cannot initialize it again to $time")
     }
     zeroTime = time
 
    // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
     if (mustCheckpoint && checkpointDuration == null) {
      checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
-      logInfo("Checkpoint interval automatically set to " + checkpointDuration)
+      logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
     }
 
     // Set the minimum value of the rememberDuration if not already set
@@ -234,7 +234,7 @@ abstract class DStream[T: ClassTag] (
 
     require(
       !mustCheckpoint || checkpointDuration != null,
-      "The checkpoint interval for " + this.getClass.getSimpleName + " has not 
been set." +
+      s"The checkpoint interval for ${this.getClass.getSimpleName} has not 
been set." +
         " Please use DStream.checkpoint() to set the interval."
     )
 
@@ -245,53 +245,53 @@ abstract class DStream[T: ClassTag] (
 
     require(
       checkpointDuration == null || checkpointDuration >= slideDuration,
-      "The checkpoint interval for " + this.getClass.getSimpleName + " has 
been set to " +
-        checkpointDuration + " which is lower than its slide time (" + 
slideDuration + "). " +
-        "Please set it to at least " + slideDuration + "."
+      s"The checkpoint interval for ${this.getClass.getSimpleName} has been 
set to " +
+        s"$checkpointDuration which is lower than its slide time 
($slideDuration). " +
+        s"Please set it to at least $slideDuration."
     )
 
     require(
      checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
-      "The checkpoint interval for " + this.getClass.getSimpleName + " has 
been set to " +
-        checkpointDuration + " which not a multiple of its slide time (" + 
slideDuration + "). " +
-        "Please set it to a multiple of " + slideDuration + "."
+      s"The checkpoint interval for ${this.getClass.getSimpleName} has been 
set to " +
+        s" $checkpointDuration which not a multiple of its slide time 
($slideDuration). " +
+        s"Please set it to a multiple of $slideDuration."
     )
 
     require(
       checkpointDuration == null || storageLevel != StorageLevel.NONE,
-      "" + this.getClass.getSimpleName + " has been marked for checkpointing 
but the storage " +
+      s"${this.getClass.getSimpleName} has been marked for checkpointing but 
the storage " +
         "level has not been set to enable persisting. Please use 
DStream.persist() to set the " +
         "storage level to use memory for better checkpointing performance."
     )
 
     require(
       checkpointDuration == null || rememberDuration > checkpointDuration,
-      "The remember duration for " + this.getClass.getSimpleName + " has been 
set to " +
-        rememberDuration + " which is not more than the checkpoint interval (" 
+
-        checkpointDuration + "). Please set it to higher than " + 
checkpointDuration + "."
+      s"The remember duration for ${this.getClass.getSimpleName} has been set 
to " +
+        s" $rememberDuration which is not more than the checkpoint interval" +
+        s" ($checkpointDuration). Please set it to higher than 
$checkpointDuration."
     )
 
     dependencies.foreach(_.validateAtStart())
 
-    logInfo("Slide time = " + slideDuration)
-    logInfo("Storage level = " + storageLevel)
-    logInfo("Checkpoint interval = " + checkpointDuration)
-    logInfo("Remember duration = " + rememberDuration)
-    logInfo("Initialized and validated " + this)
+    logInfo(s"Slide time = $slideDuration")
+    logInfo(s"Storage level = ${storageLevel.description}")
+    logInfo(s"Checkpoint interval = $checkpointDuration")
+    logInfo(s"Remember duration = $rememberDuration")
+    logInfo(s"Initialized and validated $this")
   }
 
   private[streaming] def setContext(s: StreamingContext) {
     if (ssc != null && ssc != s) {
-      throw new SparkException("Context is already set in " + this + ", cannot 
set it again")
+      throw new SparkException(s"Context must not be set again for $this")
     }
     ssc = s
-    logInfo("Set context for " + this)
+    logInfo(s"Set context for $this")
     dependencies.foreach(_.setContext(ssc))
   }
 
   private[streaming] def setGraph(g: DStreamGraph) {
     if (graph != null && graph != g) {
-      throw new SparkException("Graph is already set in " + this + ", cannot 
set it again")
+      throw new SparkException(s"Graph must not be set again for $this")
     }
     graph = g
     dependencies.foreach(_.setGraph(graph))
@@ -300,7 +300,7 @@ abstract class DStream[T: ClassTag] (
   private[streaming] def remember(duration: Duration) {
    if (duration != null && (rememberDuration == null || duration > rememberDuration)) {
       rememberDuration = duration
-      logInfo("Duration for remembering RDDs set to " + rememberDuration + " 
for " + this)
+      logInfo(s"Duration for remembering RDDs set to $rememberDuration for 
$this")
     }
     dependencies.foreach(_.remember(parentRememberDuration))
   }
@@ -310,11 +310,11 @@ abstract class DStream[T: ClassTag] (
     if (!isInitialized) {
       throw new SparkException (this + " has not been initialized")
    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
-      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
-        " and slideDuration is " + slideDuration + " and difference is " + 
(time - zeroTime))
+      logInfo(s"Time $time is invalid as zeroTime is $zeroTime" +
+        s" , slideDuration is $slideDuration and difference is ${time - 
zeroTime}")
       false
     } else {
-      logDebug("Time " + time + " is valid")
+      logDebug(s"Time $time is valid")
       true
     }
   }
@@ -452,20 +452,20 @@ abstract class DStream[T: ClassTag] (
       oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
     generatedRDDs --= oldRDDs.keys
     if (unpersistData) {
-      logDebug("Unpersisting old RDDs: " + 
oldRDDs.values.map(_.id).mkString(", "))
+      logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", 
")}")
       oldRDDs.values.foreach { rdd =>
         rdd.unpersist(false)
         // Explicitly remove blocks of BlockRDD
         rdd match {
           case b: BlockRDD[_] =>
-            logInfo("Removing blocks of RDD " + b + " of time " + time)
+            logInfo(s"Removing blocks of RDD $b of time $time")
             b.removeBlocks()
           case _ =>
         }
       }
     }
-    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
-      (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+    logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " +
+      s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}")
     dependencies.foreach(_.clearMetadata(time))
   }
 
@@ -477,10 +477,10 @@ abstract class DStream[T: ClassTag] (
    * this method to save custom checkpoint data.
    */
   private[streaming] def updateCheckpointData(currentTime: Time) {
-    logDebug("Updating checkpoint data for time " + currentTime)
+    logDebug(s"Updating checkpoint data for time $currentTime")
     checkpointData.update(currentTime)
     dependencies.foreach(_.updateCheckpointData(currentTime))
-    logDebug("Updated checkpoint data for time " + currentTime + ": " + 
checkpointData)
+    logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
   }
 
   private[streaming] def clearCheckpointData(time: Time) {
@@ -509,13 +509,13 @@ abstract class DStream[T: ClassTag] (
 
   @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
-    logDebug(this.getClass().getSimpleName + ".writeObject used")
+    logDebug(s"${this.getClass().getSimpleName}.writeObject used")
     if (graph != null) {
       graph.synchronized {
         if (graph.checkpointInProgress) {
           oos.defaultWriteObject()
         } else {
-          val msg = "Object of " + this.getClass.getName + " is being 
serialized " +
+          val msg = s"Object of ${this.getClass.getName} is being serialized " 
+
             " possibly as a part of closure of an RDD operation. This is 
because " +
             " the DStream object is being referred to from within the closure. 
" +
             " Please rewrite the RDD operation inside this DStream to avoid 
this. " +
@@ -532,7 +532,7 @@ abstract class DStream[T: ClassTag] (
 
   @throws(classOf[IOException])
  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
-    logDebug(this.getClass().getSimpleName + ".readObject used")
+    logDebug(s"${this.getClass().getSimpleName}.readObject used")
     ois.defaultReadObject()
     generatedRDDs = new HashMap[Time, RDD[T]] ()
   }
@@ -756,7 +756,7 @@ abstract class DStream[T: ClassTag] (
         val firstNum = rdd.take(num + 1)
         // scalastyle:off println
         println("-------------------------------------------")
-        println("Time: " + time)
+        println(s"Time: $time")
         println("-------------------------------------------")
         firstNum.take(num).foreach(println)
         if (firstNum.length > num) println("...")
@@ -903,21 +903,19 @@ abstract class DStream[T: ClassTag] (
     val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
       toTime
     } else {
-      logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
-          + slideDuration + ")")
-        toTime.floor(slideDuration, zeroTime)
+      logWarning(s"toTime ($toTime) is not a multiple of slideDuration 
($slideDuration)")
+      toTime.floor(slideDuration, zeroTime)
     }
 
    val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) {
       fromTime
     } else {
-      logWarning("fromTime (" + fromTime + ") is not a multiple of 
slideDuration ("
-      + slideDuration + ")")
+      logWarning(s"fromTime ($fromTime) is not a multiple of slideDuration 
($slideDuration)")
       fromTime.floor(slideDuration, zeroTime)
     }
 
-    logInfo("Slicing from " + fromTime + " to " + toTime +
-      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+    logInfo(s"Slicing from $fromTime to $toTime" +
+      s" (aligned to $alignedFromTime and $alignedToTime)")
 
     alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
       if (time >= zeroTime) getOrCompute(time) else None
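
Most of the DStream.scala changes above replace `+`-concatenated log messages with `s"..."` string interpolation. A minimal sketch of the two styles, with illustrative values rather than Spark code:

    object InterpolationDemo {
      def main(args: Array[String]): Unit = {
        val time = 3000L
        val zeroTime = 0L
        val slideDuration = 1000L
        // Old style: concatenation chains are noisy and easy to mis-space
        println("Time " + time + " is invalid as zeroTime is " + zeroTime +
          " and slideDuration is " + slideDuration)
        // New style: $name and ${expr} are resolved by the compiler
        println(s"Time $time is invalid as zeroTime is $zeroTime," +
          s" slideDuration is $slideDuration and difference is ${time - zeroTime}")
      }
    }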

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 95994c9..d60f418 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -28,7 +28,8 @@ import org.apache.spark.util.Utils
 
 /**
 * This is the abstract base class for all input streams. This class provides methods
- * start() and stop() which is called by Spark Streaming system to start and stop receiving data.
+ * start() and stop() which are called by Spark Streaming system to start and stop
+ * receiving data, respectively.
 * Input streams that can generate RDDs from new data by running a service/thread only on
 * the driver node (that is, without running a receiver on worker nodes), can be
 * implemented by directly inheriting this InputDStream. For example,

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index a18551f..565b137 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
 * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
  * that has to start a receiver on worker nodes to receive external data.
  * Specific implementations of ReceiverInputDStream must
- * define `the getReceiver()` function that gets the receiver object of type
+ * define [[getReceiver]] function that gets the receiver object of type
  * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
  * to the workers to receive data.
  * @param ssc_ Streaming context that will execute this input stream
@@ -121,7 +121,7 @@ abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
         }
         if (validBlockIds.size != blockIds.size) {
           logWarning("Some blocks could not be recovered as they were not 
found in memory. " +
-            "To prevent such data loss, enabled Write Ahead Log (see 
programming guide " +
+            "To prevent such data loss, enable Write Ahead Log (see 
programming guide " +
             "for more details.")
         }
         new BlockRDD[T](ssc.sc, validBlockIds)

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 43c605a..faa5aca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -69,7 +69,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
 
  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
 
-    var numRecords = None: Option[Long]
+    var numRecords: Option[Long] = None
 
     val putResult: Seq[(BlockId, BlockStatus)] = block match {
       case ArrayBufferBlock(arrayBuffer) =>
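
The `numRecords` hunk above is purely stylistic: both spellings declare a `var` of type `Option[Long]` initialized to `None`; annotating the variable simply reads more clearly than ascribing a type to the initializer. A minimal sketch:

    object OptionVarDemo {
      def main(args: Array[String]): Unit = {
        var before = None: Option[Long]  // old style: type ascription on the value
        var after: Option[Long] = None   // new style: annotation on the variable
        before = Some(1L)
        after = Some(2L)
        println(before.get + after.get)  // 3
      }
    }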

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index b081524..639f425 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -103,7 +103,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
 
   /**
   * This method is called by the system when the receiver is stopped. All resources
-   * (threads, buffers, etc.) setup in `onStart()` must be cleaned up in this method.
+   * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
    */
   def onStop()
 
@@ -273,7 +273,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   /** Get the attached supervisor. */
   private[streaming] def supervisor: ReceiverSupervisor = {
     assert(_supervisor != null,
-      "A ReceiverSupervisor have not been attached to the receiver yet. Maybe 
you are starting " +
+      "A ReceiverSupervisor has not been attached to the receiver yet. Maybe 
you are starting " +
         "some computation in the receiver before the Receiver.onStart() has 
been called.")
     _supervisor
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index c42a9ac..d0195fb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -143,10 +143,10 @@ private[streaming] abstract class ReceiverSupervisor(
   def startReceiver(): Unit = synchronized {
     try {
       if (onReceiverStart()) {
-        logInfo("Starting receiver")
+        logInfo(s"Starting receiver $streamId")
         receiverState = Started
         receiver.onStart()
-        logInfo("Called receiver onStart")
+        logInfo(s"Called receiver $streamId onStart")
       } else {
         // The driver refused us
         stop("Registered unsuccessfully because Driver refused to start 
receiver " + streamId, None)
@@ -218,11 +218,9 @@ private[streaming] abstract class ReceiverSupervisor(
     stopLatch.await()
     if (stoppingError != null) {
       logError("Stopped receiver with error: " + stoppingError)
+      throw stoppingError
     } else {
       logInfo("Stopped receiver without error")
     }
-    if (stoppingError != null) {
-      throw stoppingError
-    }
   }
 }
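
The final hunk above folds the duplicated `stoppingError != null` test into the existing if/else, so the stored error is logged and rethrown in a single branch. A simplified, standalone sketch of the resulting control flow, with hypothetical names rather than Spark's actual class:

    object AwaitTerminationDemo {
      @volatile var stoppingError: Throwable = null

      def awaitTermination(): Unit = {
        if (stoppingError != null) {
          println(s"Stopped receiver with error: $stoppingError")
          throw stoppingError  // rethrown in the same branch that logs it
        } else {
          println("Stopped receiver without error")
        }
      }

      def main(args: Array[String]): Unit = {
        awaitTermination()  // no error stored: prints the "without error" line
      }
    }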

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2c2162/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index f763003..6e7232a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -59,17 +59,15 @@ case class JobSet(
 
   // Time taken to process all the jobs from the time they were submitted
   // (i.e. including the time they wait in the streaming scheduler queue)
-  def totalDelay: Long = {
-    processingEndTime - time.milliseconds
-  }
+  def totalDelay: Long = processingEndTime - time.milliseconds
 
   def toBatchInfo: BatchInfo = {
     BatchInfo(
       time,
       streamIdToInputInfo,
       submissionTime,
-      if (processingStartTime >= 0) Some(processingStartTime) else None,
-      if (processingEndTime >= 0) Some(processingEndTime) else None,
+      if (hasStarted) Some(processingStartTime) else None,
+      if (hasCompleted) Some(processingEndTime) else None,
       jobs.map { job => (job.outputOpId, job.toOutputOperationInfo) }.toMap
     )
   }
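
The JobSet hunk above swaps the repeated `>= 0` sentinel checks for named predicates. Assuming `hasStarted` and `hasCompleted` are defined over the same sentinel values elsewhere in JobSet (they are not shown in this diff), behaviour is unchanged and intent is clearer. A simplified sketch:

    object JobSetDemo {
      var processingStartTime: Long = -1L  // sentinel: -1 means "not started"
      var processingEndTime: Long = -1L    // sentinel: -1 means "not completed"

      // Assumed definitions mirroring the checks the diff replaces
      def hasStarted: Boolean = processingStartTime >= 0
      def hasCompleted: Boolean = processingEndTime >= 0

      def main(args: Array[String]): Unit = {
        processingStartTime = 1452200000000L
        println(if (hasStarted) Some(processingStartTime) else None)  // Some(...)
        println(if (hasCompleted) Some(processingEndTime) else None)  // None
      }
    }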

