Repository: spark
Updated Branches:
  refs/heads/master 91a577d27 -> c1bc4f439


[SPARK-10227] fatal warnings with sbt on Scala 2.11

The bulk of the changes concern the `transient` annotation on class parameters. Often 
the compiler doesn't generate a field for these parameters, so the transient 
annotation would be unnecessary.
But if the class parameters are used in methods, then fields are created, so it 
is safer to keep the annotations.
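
For illustration, a minimal sketch of the pattern applied throughout (the class and
member names are hypothetical, not taken from the patch): with fatal warnings on
Scala 2.11, `@transient` on a constructor parameter that never becomes a field is
reported as discarded, so the annotation is dropped; where the parameter is used in
a method body, a field does exist, and the parameter is made an explicit
`@transient private val` to keep it out of serialization.

    // Hypothetical sketch, assuming the Spark core API is on the classpath.
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    class Example(
        sc: SparkContext,                       // only used in the constructor body below,
                                                // so no field is generated and @transient
                                                // would trigger the 2.11 warning
        @transient private val rdd: RDD[Int])   // used in a method, so a field is generated;
      extends Serializable {                    // @transient keeps it out of serialization

      val appName: String = sc.appName          // constructor-time use of sc
      def numPartitions: Int = rdd.partitions.length   // method use of rdd needs the field
    }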

The remainder are fixes for some potential bugs and for deprecated syntax.
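
As a quick illustration of the deprecated-syntax bucket (a sketch of two cases that
appear in the diff below, not the patched code itself): octal character escapes such
as '\0' are deprecated on 2.11 and become unicode escapes, and completing a
Promise[Unit] with an empty argument list relied on the deprecated auto-insertion of
the Unit value.

    // Sketch only; mirrors the kind of changes made in the diff below.
    import scala.concurrent.Promise

    val nullChar: Char = '\u0000'   // instead of the deprecated octal escape '\0'

    val done = Promise[Unit]()
    done.success((): Unit)          // instead of done.success(), which warns on 2.11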

Author: Luc Bourlier <luc.bourl...@typesafe.com>

Closes #8433 from skyluc/issue/sbt-2.11.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1bc4f43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1bc4f43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1bc4f43

Branch: refs/heads/master
Commit: c1bc4f439f54625c01a585691e5293cd9961eb0c
Parents: 91a577d
Author: Luc Bourlier <luc.bourl...@typesafe.com>
Authored: Wed Sep 9 09:57:58 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Sep 9 09:57:58 2015 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |  2 +-
 .../scala/org/apache/spark/Dependency.scala     |  2 +-
 .../scala/org/apache/spark/Partitioner.scala    |  4 +-
 .../org/apache/spark/SparkHadoopWriter.scala    |  2 +-
 .../org/apache/spark/api/python/PythonRDD.scala |  4 +-
 .../apache/spark/input/PortableDataStream.scala |  4 +-
 .../netty/NettyBlockTransferService.scala       |  2 +-
 .../org/apache/spark/rdd/BinaryFileRDD.scala    |  6 +--
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |  4 +-
 .../org/apache/spark/rdd/CartesianRDD.scala     |  4 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  2 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  8 ++--
 .../apache/spark/rdd/LocalCheckpointRDD.scala   |  2 +-
 .../spark/rdd/LocalRDDCheckpointData.scala      |  2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 18 ++++----
 .../spark/rdd/ParallelCollectionRDD.scala       |  4 +-
 .../apache/spark/rdd/PartitionPruningRDD.scala  |  6 +--
 .../spark/rdd/PartitionwiseSampledRDD.scala     |  4 +-
 .../apache/spark/rdd/RDDCheckpointData.scala    |  2 +-
 .../spark/rdd/ReliableCheckpointRDD.scala       |  2 +-
 .../spark/rdd/ReliableRDDCheckpointData.scala   |  2 +-
 .../org/apache/spark/rdd/SqlNewHadoopRDD.scala  |  6 +--
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |  4 +-
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  |  2 +-
 .../apache/spark/rdd/ZippedWithIndexRDD.scala   |  2 +-
 .../org/apache/spark/rpc/RpcEndpointRef.scala   |  2 +-
 .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala  | 21 ++++++---
 .../org/apache/spark/scheduler/ResultTask.scala |  2 +-
 .../shuffle/unsafe/UnsafeShuffleManager.scala   |  2 +-
 .../org/apache/spark/util/ClosureCleaner.scala  |  2 +-
 .../apache/spark/util/ShutdownHookManager.scala |  4 +-
 .../util/logging/RollingFileAppender.scala      | 46 ++++++++++----------
 .../streaming/flume/FlumeInputDStream.scala     |  2 +-
 .../flume/FlumePollingInputDStream.scala        |  2 +-
 .../kafka/DirectKafkaInputDStream.scala         |  4 +-
 .../streaming/kafka/KafkaInputDStream.scala     |  2 +-
 .../spark/streaming/mqtt/MQTTInputDStream.scala |  2 +-
 .../streaming/twitter/TwitterInputDStream.scala |  2 +-
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  4 +-
 .../scala/org/apache/spark/graphx/Graph.scala   |  6 +--
 .../org/apache/spark/graphx/VertexRDD.scala     |  4 +-
 .../org/apache/spark/mllib/rdd/RandomRDD.scala  | 12 ++---
 .../org/apache/spark/repl/SparkILoop.scala      |  2 +-
 .../expressions/stringExpressions.scala         |  2 +-
 .../spark/sql/types/AbstractDataType.scala      |  2 +-
 .../execution/datasources/WriterContainer.scala | 10 ++---
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../org/apache/spark/sql/hive/TableReader.scala |  8 ++--
 .../hive/execution/ScriptTransformation.scala   |  2 +-
 .../spark/sql/hive/hiveWriterContainers.scala   | 10 ++---
 .../org/apache/spark/streaming/Checkpoint.scala |  2 +-
 .../streaming/api/python/PythonDStream.scala    | 12 ++---
 .../streaming/dstream/FileInputDStream.scala    |  2 +-
 .../spark/streaming/dstream/InputDStream.scala  |  2 +-
 .../dstream/PluggableInputDStream.scala         |  2 +-
 .../streaming/dstream/QueueInputDStream.scala   |  4 +-
 .../streaming/dstream/RawInputDStream.scala     |  2 +-
 .../dstream/ReceiverInputDStream.scala          |  2 +-
 .../streaming/dstream/SocketInputDStream.scala  |  2 +-
 .../rdd/WriteAheadLogBackedBlockRDD.scala       | 22 +++++-----
 60 files changed, 158 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala 
b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c39c866..5592b75 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
  * @tparam T partial data that can be added in
  */
 class Accumulable[R, T] private[spark] (
-    @transient initialValue: R,
+    initialValue: R,
     param: AccumulableParam[R, T],
     val name: Option[String],
     internal: Boolean)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala 
b/core/src/main/scala/org/apache/spark/Dependency.scala
index fc8cdde..cfeeb39 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -66,7 +66,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends 
Dependency[T] {
  */
 @DeveloperApi
 class ShuffleDependency[K, V, C](
-    @transient _rdd: RDD[_ <: Product2[K, V]],
+    @transient private val _rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
     val serializer: Option[Serializer] = None,
     val keyOrdering: Option[Ordering[K]] = None,

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 29e581b..e4df7af 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -104,8 +104,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
  * the value of `partitions`.
  */
 class RangePartitioner[K : Ordering : ClassTag, V](
-    @transient partitions: Int,
-    @transient rdd: RDD[_ <: Product2[K, V]],
+    partitions: Int,
+    rdd: RDD[_ <: Product2[K, V]],
     private var ascending: Boolean = true)
   extends Partitioner {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala 
b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f5dd36c..ae5926d 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.SerializableJobConf
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
 private[spark]
-class SparkHadoopWriter(@transient jobConf: JobConf)
+class SparkHadoopWriter(jobConf: JobConf)
   extends Logging
   with SparkHadoopMapRedUtil
   with Serializable {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b4d152b..69da180 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, 
Utils}
 import scala.util.control.NonFatal
 
 private[spark] class PythonRDD(
-    @transient parent: RDD[_],
+    parent: RDD[_],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -785,7 +785,7 @@ class BytesToString extends 
org.apache.spark.api.java.function.Function[Array[By
  * Internal class that acts as an `AccumulatorParam` for Python accumulators. 
Inside, it
  * collects a list of pickled strings that we pass to Python through a socket.
  */
-private class PythonAccumulatorParam(@transient serverHost: String, 
serverPort: Int)
+private class PythonAccumulatorParam(@transient private val serverHost: 
String, serverPort: Int)
   extends AccumulatorParam[JList[Array[Byte]]] {
 
   Utils.checkHost(serverHost, "Expected hostname")

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala 
b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index a5ad472..e2ffc3b 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -131,8 +131,8 @@ private[spark] class StreamInputFormat extends 
StreamFileInputFormat[PortableDat
  */
 @Experimental
 class PortableDataStream(
-    @transient isplit: CombineFileSplit,
-    @transient context: TaskAttemptContext,
+    isplit: CombineFileSplit,
+    context: TaskAttemptContext,
     index: Integer)
   extends Serializable {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 4b851bc..70a42f9 100644
--- 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -137,7 +137,7 @@ class NettyBlockTransferService(conf: SparkConf, 
securityManager: SecurityManage
       new RpcResponseCallback {
         override def onSuccess(response: Array[Byte]): Unit = {
           logTrace(s"Successfully uploaded block $blockId")
-          result.success()
+          result.success((): Unit)
         }
         override def onFailure(e: Throwable): Unit = {
           logError(s"Error while uploading block $blockId", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 1f755db..6fec00d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BinaryFileRDD[T](
     inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
     keyClass: Class[String],
     valueClass: Class[T],
-    @transient conf: Configuration,
+    conf: Configuration,
     minPartitions: Int)
   extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, 
conf) {
 
@@ -36,10 +36,10 @@ private[spark] class BinaryFileRDD[T](
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(conf)
+        configurable.setConf(getConf)
       case _ =>
     }
-    val jobContext = newJobContext(conf, jobId)
+    val jobContext = newJobContext(getConf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 9220302..fc1710f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, 
idx: Int) extends P
 }
 
 private[spark]
-class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val 
blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: 
Array[BlockId])
   extends RDD[T](sc, Nil) {
 
   @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, 
SparkEnv.get)
@@ -64,7 +64,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, 
@transient val blockIds
    */
   private[spark] def removeBlocks() {
     blockIds.foreach { blockId =>
-      sc.env.blockManager.master.removeBlock(blockId)
+      sparkContext.env.blockManager.master.removeBlock(blockId)
     }
     _isValid = false
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index c1d6971..18e8cdd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
 private[spark]
 class CartesianPartition(
     idx: Int,
-    @transient rdd1: RDD[_],
-    @transient rdd2: RDD[_],
+    @transient private val rdd1: RDD[_],
+    @transient private val rdd2: RDD[_],
     s1Index: Int,
     s2Index: Int
   ) extends Partition {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 72fe215..b036462 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -29,7 +29,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) 
extends Partition
 /**
  * An RDD that recovers checkpointed data from storage.
  */
-private[spark] abstract class CheckpointRDD[T: ClassTag](@transient sc: 
SparkContext)
+private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
   extends RDD[T](sc, Nil) {
 
   // CheckpointRDD should not be checkpointed again

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e1f8719..8f2655d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -51,7 +51,7 @@ import org.apache.spark.storage.StorageLevel
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
  */
-private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: 
InputSplit)
+private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit)
   extends Partition {
 
   val inputSplit = new SerializableWritable[InputSplit](s)
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, 
@transient s: InputSp
  */
 @DeveloperApi
 class HadoopRDD[K, V](
-    @transient sc: SparkContext,
+    sc: SparkContext,
     broadcastedConf: Broadcast[SerializableConfiguration],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -109,7 +109,7 @@ class HadoopRDD[K, V](
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   if (initLocalJobConfFuncOpt.isDefined) {
-    sc.clean(initLocalJobConfFuncOpt.get)
+    sparkContext.clean(initLocalJobConfFuncOpt.get)
   }
 
   def this(
@@ -137,7 +137,7 @@ class HadoopRDD[K, V](
   // used to build JobTracker ID
   private val createTime = new Date()
 
-  private val shouldCloneJobConf = 
sc.conf.getBoolean("spark.hadoop.cloneConf", false)
+  private val shouldCloneJobConf = 
sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
   // Returns a JobConf that will be used on slaves to obtain input splits for 
Hadoop reads.
   protected def getJobConf(): JobConf = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index daa5779..bfe1919 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.storage.RDDBlockId
  * @param numPartitions the number of partitions in the checkpointed RDD
  */
 private[spark] class LocalCheckpointRDD[T: ClassTag](
-    @transient sc: SparkContext,
+    sc: SparkContext,
     rddId: Int,
     numPartitions: Int)
   extends CheckpointRDD[T](sc) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala 
b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
index d6fad89..c115e0f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
  * is written to the local, ephemeral block storage that lives in each 
executor. This is useful
  * for use cases where RDDs build up long lineages that need to be truncated 
often (e.g. GraphX).
  */
-private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient rdd: 
RDD[T])
+private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private 
val rdd: RDD[T])
   extends RDDCheckpointData[T](rdd) with Logging {
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6a9c004..174979a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.storage.StorageLevel
 private[spark] class NewHadoopPartition(
     rddId: Int,
     val index: Int,
-    @transient rawSplit: InputSplit with Writable)
+    rawSplit: InputSplit with Writable)
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -68,14 +68,14 @@ class NewHadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient private val _conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
-  // private val serializableConf = new SerializableWritable(conf)
+  private val confBroadcast = sc.broadcast(new 
SerializableConfiguration(_conf))
+  // private val serializableConf = new SerializableWritable(_conf)
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -88,10 +88,10 @@ class NewHadoopRDD[K, V](
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(conf)
+        configurable.setConf(_conf)
       case _ =>
     }
-    val jobContext = newJobContext(conf, jobId)
+    val jobContext = newJobContext(_conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -262,7 +262,7 @@ private[spark] class WholeTextFileRDD(
     inputFormatClass: Class[_ <: WholeTextFileInputFormat],
     keyClass: Class[String],
     valueClass: Class[String],
-    @transient conf: Configuration,
+    conf: Configuration,
     minPartitions: Int)
   extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, 
valueClass, conf) {
 
@@ -270,10 +270,10 @@ private[spark] class WholeTextFileRDD(
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(conf)
+        configurable.setConf(getConf)
       case _ =>
     }
-    val jobContext = newJobContext(conf, jobId)
+    val jobContext = newJobContext(getConf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index e2394e2..582fa93 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -83,8 +83,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
 }
 
 private[spark] class ParallelCollectionRDD[T: ClassTag](
-    @transient sc: SparkContext,
-    @transient data: Seq[T],
+    sc: SparkContext,
+    @transient private val data: Seq[T],
     numSlices: Int,
     locationPrefs: Map[Int, Seq[String]])
     extends RDD[T](sc, Nil) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index a00f4c1..d6a37e8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -32,7 +32,7 @@ private[spark] class PartitionPruningRDDPartition(idx: Int, 
val parentSplit: Par
  * Represents a dependency between the PartitionPruningRDD and its parent. In 
this
  * case, the child RDD contains a subset of partitions of the parents'.
  */
-private[spark] class PruneDependency[T](rdd: RDD[T], @transient 
partitionFilterFunc: Int => Boolean)
+private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int 
=> Boolean)
   extends NarrowDependency[T](rdd) {
 
   @transient
@@ -55,8 +55,8 @@ private[spark] class PruneDependency[T](rdd: RDD[T], 
@transient partitionFilterF
  */
 @DeveloperApi
 class PartitionPruningRDD[T: ClassTag](
-    @transient prev: RDD[T],
-    @transient partitionFilterFunc: Int => Boolean)
+    prev: RDD[T],
+    partitionFilterFunc: Int => Boolean)
   extends RDD[T](prev.context, List(new PruneDependency(prev, 
partitionFilterFunc))) {
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index a637d6f..3b1acac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -47,8 +47,8 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, 
val seed: Long)
 private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     prev: RDD[T],
     sampler: RandomSampler[T, U],
-    @transient preservesPartitioning: Boolean,
-    @transient seed: Long = Utils.random.nextLong)
+    preservesPartitioning: Boolean,
+    @transient private val seed: Long = Utils.random.nextLong)
   extends RDD[U](prev) {
 
   @transient override val partitioner = if (preservesPartitioning) 
prev.partitioner else None

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 0e43520..429514b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -36,7 +36,7 @@ private[spark] object CheckpointState extends Enumeration {
  * as well as, manages the post-checkpoint state by providing the updated 
partitions,
  * iterator and preferred locations of the checkpointed RDD.
  */
-private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: 
RDD[T])
+private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient 
private val rdd: RDD[T])
   extends Serializable {
 
   import CheckpointState._

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 35d8b0b..1c3b5da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.{SerializableConfiguration, 
Utils}
  * An RDD that reads from checkpoint files previously written to reliable 
storage.
  */
 private[spark] class ReliableCheckpointRDD[T: ClassTag](
-    @transient sc: SparkContext,
+    sc: SparkContext,
     val checkpointPath: String)
   extends CheckpointRDD[T](sc) {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala 
b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 1df8eef..e9f6060 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.SerializableConfiguration
  * An implementation of checkpointing that writes the RDD data to reliable 
storage.
  * This allows drivers to be restarted on failure with previously computed 
state.
  */
-private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: 
RDD[T])
+private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private 
val rdd: RDD[T])
   extends RDDCheckpointData[T](rdd) with Logging {
 
   // The directory to which the associated RDD has been checkpointed to

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index fa3fecc..9babe56 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.{SerializableConfiguration, 
ShutdownHookManager, Ut
 private[spark] class SqlNewHadoopPartition(
     rddId: Int,
     val index: Int,
-    @transient rawSplit: InputSplit with Writable)
+    rawSplit: InputSplit with Writable)
   extends SparkPartition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -61,9 +61,9 @@ private[spark] class SqlNewHadoopPartition(
  * changes based on [[org.apache.spark.rdd.HadoopRDD]].
  */
 private[spark] class SqlNewHadoopRDD[V: ClassTag](
-    @transient sc : SparkContext,
+    sc : SparkContext,
     broadcastedConf: Broadcast[SerializableConfiguration],
-    @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+    @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
     initLocalJobFuncOpt: Option[Job => Unit],
     inputFormatClass: Class[_ <: InputFormat[Void, V]],
     valueClass: Class[V])

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 3986645..66cf436 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -37,9 +37,9 @@ import org.apache.spark.util.Utils
  */
 private[spark] class UnionPartition[T: ClassTag](
     idx: Int,
-    @transient rdd: RDD[T],
+    @transient private val rdd: RDD[T],
     val parentRddIndex: Int,
-    @transient parentRddPartitionIndex: Int)
+    @transient private val parentRddPartitionIndex: Int)
   extends Partition {
 
   var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index b3c6439..70bf04d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
 
 private[spark] class ZippedPartitionsPartition(
     idx: Int,
-    @transient rdds: Seq[RDD[_]],
+    @transient private val rdds: Seq[RDD[_]],
     @transient val preferredLocations: Seq[String])
   extends Partition {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index e277ae2..32931d5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val 
startIndex: Long)
  * @tparam T parent RDD item type
  */
 private[spark]
-class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, 
Long)](prev) {
+class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, 
Long)](prev) {
 
   /** The start index of each partition. */
   @transient private val startIndices: Array[Long] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala 
b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 7409ac8..f25710b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkException, Logging, SparkConf}
 /**
  * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
  */
-private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
+private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   extends Serializable with Logging {
 
   private[this] val maxRetries = RpcUtils.numRetries(conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index fc17542..ad67e1c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -87,9 +87,9 @@ private[spark] class AkkaRpcEnv private[akka] (
 
   override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
     @volatile var endpointRef: AkkaRpcEndpointRef = null
-    // Use lazy because the Actor needs to use `endpointRef`.
+    // Use a deferred function because the Actor needs to use `endpointRef`.
     // So `actorRef` should be created after assigning `endpointRef`.
-    lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+    val actorRef = () => actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
 
       assert(endpointRef != null)
 
@@ -272,13 +272,20 @@ private[akka] class ErrorMonitor extends Actor with 
ActorLogReceive with Logging
 }
 
 private[akka] class AkkaRpcEndpointRef(
-    @transient defaultAddress: RpcAddress,
-    @transient _actorRef: => ActorRef,
-    @transient conf: SparkConf,
-    @transient initInConstructor: Boolean = true)
+    @transient private val defaultAddress: RpcAddress,
+    @transient private val _actorRef: () => ActorRef,
+    conf: SparkConf,
+    initInConstructor: Boolean)
   extends RpcEndpointRef(conf) with Logging {
 
-  lazy val actorRef = _actorRef
+  def this(
+      defaultAddress: RpcAddress,
+      _actorRef: ActorRef,
+      conf: SparkConf) = {
+    this(defaultAddress, () => _actorRef, conf, true)
+  }
+
+  lazy val actorRef = _actorRef()
 
   override lazy val address: RpcAddress = {
     val akkaAddress = actorRef.path.address

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index c4dc080..fb69372 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -44,7 +44,7 @@ private[spark] class ResultTask[T, U](
     stageAttemptId: Int,
     taskBinary: Broadcast[Array[Byte]],
     partition: Partition,
-    @transient locs: Seq[TaskLocation],
+    locs: Seq[TaskLocation],
     val outputId: Int,
     internalAccumulators: Seq[Accumulator[Long]])
   extends Task[U](stageId, stageAttemptId, partition.index, 
internalAccumulators)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
 
b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index df7bbd6..75f22f6 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -159,7 +159,7 @@ private[spark] class UnsafeShuffleManager(conf: SparkConf) 
extends ShuffleManage
       mapId: Int,
       context: TaskContext): ShuffleWriter[K, V] = {
     handle match {
-      case unsafeShuffleHandle: UnsafeShuffleHandle[K, V] =>
+      case unsafeShuffleHandle: UnsafeShuffleHandle[K @unchecked, V 
@unchecked] =>
         numMapsForShufflesThatUsedNewPath.putIfAbsent(handle.shuffleId, 
unsafeShuffleHandle.numMaps)
         val env = SparkEnv.get
         new UnsafeShuffleWriter(

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 150d82b..1b49dca 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -94,7 +94,7 @@ private[spark] object ClosureCleaner extends Logging {
     if (cls.isPrimitive) {
       cls match {
         case java.lang.Boolean.TYPE => new java.lang.Boolean(false)
-        case java.lang.Character.TYPE => new java.lang.Character('\0')
+        case java.lang.Character.TYPE => new java.lang.Character('\u0000')
         case java.lang.Void.TYPE =>
           // This should not happen because `Foo(void x) {}` does not compile.
           throw new IllegalStateException("Unexpected void parameter in 
constructor")

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 61ff9b8..db4a8b3 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -217,7 +217,9 @@ private [util] class SparkShutdownHookManager {
     }
     Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) 
match {
       case Success(shmClass) =>
-        val fsPriority = 
classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+        val fsPriority = classOf[FileSystem]
+          .getField("SHUTDOWN_HOOK_PRIORITY")
+          .get(null) // static field, the value is not used
           .asInstanceOf[Int]
         val shm = shmClass.getMethod("get").invoke(null)
         shm.getClass().getMethod("addShutdownHook", classOf[Runnable], 
classOf[Int])

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala 
b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 7138b4b..1e8476c 100644
--- 
a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -79,32 +79,30 @@ private[spark] class RollingFileAppender(
     val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
     val rolloverFile = new File(
       activeFile.getParentFile, activeFile.getName + 
rolloverSuffix).getAbsoluteFile
-    try {
-      logDebug(s"Attempting to rollover file $activeFile to file 
$rolloverFile")
-      if (activeFile.exists) {
-        if (!rolloverFile.exists) {
-          Files.move(activeFile, rolloverFile)
-          logInfo(s"Rolled over $activeFile to $rolloverFile")
-        } else {
-          // In case the rollover file name clashes, make a unique file name.
-          // The resultant file names are long and ugly, so this is used only
-          // if there is a name collision. This can be avoided by the using
-          // the right pattern such that name collisions do not occur.
-          var i = 0
-          var altRolloverFile: File = null
-          do {
-            altRolloverFile = new File(activeFile.getParent,
-              s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
-            i += 1
-          } while (i < 10000 && altRolloverFile.exists)
-
-          logWarning(s"Rollover file $rolloverFile already exists, " +
-            s"rolled over $activeFile to file $altRolloverFile")
-          Files.move(activeFile, altRolloverFile)
-        }
+    logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
+    if (activeFile.exists) {
+      if (!rolloverFile.exists) {
+        Files.move(activeFile, rolloverFile)
+        logInfo(s"Rolled over $activeFile to $rolloverFile")
       } else {
-        logWarning(s"File $activeFile does not exist")
+        // In case the rollover file name clashes, make a unique file name.
+        // The resultant file names are long and ugly, so this is used only
+        // if there is a name collision. This can be avoided by the using
+        // the right pattern such that name collisions do not occur.
+        var i = 0
+        var altRolloverFile: File = null
+        do {
+          altRolloverFile = new File(activeFile.getParent,
+            s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
+          i += 1
+        } while (i < 10000 && altRolloverFile.exists)
+
+        logWarning(s"Rollover file $rolloverFile already exists, " +
+          s"rolled over $activeFile to file $altRolloverFile")
+        Files.move(activeFile, altRolloverFile)
       }
+    } else {
+      logWarning(s"File $activeFile does not exist")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 2bf99cb..c8780aa 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -43,7 +43,7 @@ import org.jboss.netty.handler.codec.compression._
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
-  @transient ssc_ : StreamingContext,
+  ssc_ : StreamingContext,
   host: String,
   port: Int,
   storageLevel: StorageLevel,

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 0bc4620..3b936d8 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -46,7 +46,7 @@ import org.apache.spark.streaming.flume.sink._
  * @tparam T Class type of the object of this stream
  */
 private[streaming] class FlumePollingInputDStream[T: ClassTag](
-    @transient _ssc: StreamingContext,
+    _ssc: StreamingContext,
     val addresses: Seq[InetSocketAddress],
     val maxBatchSize: Int,
     val parallelism: Int,

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 1000094..8a08747 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -58,7 +58,7 @@ class DirectKafkaInputDStream[
   U <: Decoder[K]: ClassTag,
   T <: Decoder[V]: ClassTag,
   R: ClassTag](
-    @transient ssc_ : StreamingContext,
+    ssc_ : StreamingContext,
     val kafkaParams: Map[String, String],
     val fromOffsets: Map[TopicAndPartition, Long],
     messageHandler: MessageAndMetadata[K, V] => R
@@ -79,7 +79,7 @@ class DirectKafkaInputDStream[
   override protected[streaming] val rateController: Option[RateController] = {
     if (RateController.isBackPressureEnabled(ssc.conf)) {
       Some(new DirectKafkaRateController(id,
-        RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
+        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
     } else {
       None
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 04b2dc1..38730fe 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -48,7 +48,7 @@ class KafkaInputDStream[
   V: ClassTag,
   U <: Decoder[_]: ClassTag,
   T <: Decoder[_]: ClassTag](
-    @transient ssc_ : StreamingContext,
+    ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     useReliableReceiver: Boolean,

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
 
b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 7c2f18c..116c170 100644
--- 
a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ 
b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class MQTTInputDStream(
-    @transient ssc_ : StreamingContext,
+    ssc_ : StreamingContext,
     brokerUrl: String,
     topic: String,
     storageLevel: StorageLevel

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
 
b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 7cf02d8..d7de74b 100644
--- 
a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ 
b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.receiver.Receiver
 */
 private[streaming]
 class TwitterInputDStream(
-    @transient ssc_ : StreamingContext,
+    ssc_ : StreamingContext,
     twitterAuth: Option[Authorization],
     filters: Seq[String],
     storageLevel: StorageLevel

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 4611a3a..ee7302a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -38,8 +38,8 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl
  * `impl.ReplicatedVertexView`.
  */
 abstract class EdgeRDD[ED](
-    @transient sc: SparkContext,
-    @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
+    sc: SparkContext,
+    deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
 
   // scalastyle:off structural.type
   private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] 
forSome { type VD }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index db73a8a..869caa3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -46,7 +46,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () 
extends Serializab
    * @note vertex ids are unique.
    * @return an RDD containing the vertices in this graph
    */
-  @transient val vertices: VertexRDD[VD]
+  val vertices: VertexRDD[VD]
 
   /**
    * An RDD containing the edges and their associated attributes.  The entries 
in the RDD contain
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () 
extends Serializab
    * along with their vertex data.
    *
    */
-  @transient val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the 
vertex data associated with
@@ -77,7 +77,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () 
extends Serializab
    * val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 
else 0).sum
    * }}}
    */
-  @transient val triplets: RDD[EdgeTriplet[VD, ED]]
+  val triplets: RDD[EdgeTriplet[VD, ED]]
 
   /**
    * Caches the vertices and edges associated with this graph at the specified 
storage level,

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index a9f04b5..1ef7a78 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -55,8 +55,8 @@ import org.apache.spark.graphx.impl.VertexRDDImpl
  * @tparam VD the vertex attribute associated with each vertex in the set.
  */
 abstract class VertexRDD[VD](
-    @transient sc: SparkContext,
-    @transient deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) 
{
+    sc: SparkContext,
+    deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {
 
   implicit protected def vdTag: ClassTag[VD]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
index 910eff9..f8cea7e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
@@ -35,11 +35,11 @@ private[mllib] class RandomRDDPartition[T](override val 
index: Int,
 }
 
 // These two classes are necessary since Range objects in Scala cannot have 
size > Int.MaxValue
-private[mllib] class RandomRDD[T: ClassTag](@transient sc: SparkContext,
+private[mllib] class RandomRDD[T: ClassTag](sc: SparkContext,
     size: Long,
     numPartitions: Int,
-    @transient rng: RandomDataGenerator[T],
-    @transient seed: Long = Utils.random.nextLong) extends RDD[T](sc, Nil) {
+    @transient private val rng: RandomDataGenerator[T],
+    @transient private val seed: Long = Utils.random.nextLong) extends 
RDD[T](sc, Nil) {
 
   require(size > 0, "Positive RDD size required.")
   require(numPartitions > 0, "Positive number of partitions required")
@@ -56,12 +56,12 @@ private[mllib] class RandomRDD[T: ClassTag](@transient sc: 
SparkContext,
   }
 }
 
-private[mllib] class RandomVectorRDD(@transient sc: SparkContext,
+private[mllib] class RandomVectorRDD(sc: SparkContext,
     size: Long,
     vectorSize: Int,
     numPartitions: Int,
-    @transient rng: RandomDataGenerator[Double],
-    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+    @transient private val rng: RandomDataGenerator[Double],
+    @transient private val seed: Long = Utils.random.nextLong) extends 
RDD[Vector](sc, Nil) {
 
   require(size > 0, "Positive RDD size required.")
   require(numPartitions > 0, "Positive number of partitions required")

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git 
a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index bf609ff..33d2625 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -118,5 +118,5 @@ object SparkILoop {
       }
     }
   }
-  def run(lines: List[String]): String = run(lines map (_ + "\n") mkString)
+  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 48d02bb..a09d5b6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -255,7 +255,7 @@ object StringTranslate {
     val dict = new HashMap[Character, Character]()
     var i = 0
     while (i < matching.length()) {
-      val rep = if (i < replace.length()) replace.charAt(i) else '\0'
+      val rep = if (i < replace.length()) replace.charAt(i) else '\u0000'
       if (null == dict.get(matching.charAt(i))) {
         dict.put(matching.charAt(i), rep)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
index e0667c6..1d2d007 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
@@ -126,7 +126,7 @@ protected[sql] object AnyDataType extends AbstractDataType {
  */
 protected[sql] abstract class AtomicType extends DataType {
   private[sql] type InternalType
-  @transient private[sql] val tag: TypeTag[InternalType]
+  private[sql] val tag: TypeTag[InternalType]
   private[sql] val ordering: Ordering[InternalType]
 
   @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
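
Dropping `@transient` from the abstract `tag` declaration is consistent with how Scala treats abstract members: no backing field is generated for them, so the annotation has nothing to attach to, and it belongs on the concrete definition if the value should stay out of serialized instances. A sketch with illustrative types (not the Catalyst ones):

    abstract class HasTag extends Serializable {
      // Abstract declaration: no backing field exists here, so a @transient
      // annotation on this line would have no valid target.
      val tag: String
    }

    class ConcreteTag extends HasTag {
      // The field lives in the concrete class; mark it transient (and lazy, so
      // it can be recomputed on first use after deserialization) here instead.
      @transient override lazy val tag: String = "recomputed on demand"
    }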

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 879fd69..9a573db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.SerializableConfiguration
 
 private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
-    @transient job: Job,
+    @transient private val job: Job,
     isAppend: Boolean)
   extends SparkHadoopMapReduceUtil
   with Logging
@@ -222,8 +222,8 @@ private[sql] abstract class BaseWriterContainer(
  * A writer that writes all of the rows in a partition to a single file.
  */
 private[sql] class DefaultWriterContainer(
-    @transient relation: HadoopFsRelation,
-    @transient job: Job,
+    relation: HadoopFsRelation,
+    job: Job,
     isAppend: Boolean)
   extends BaseWriterContainer(relation, job, isAppend) {
 
@@ -286,8 +286,8 @@ private[sql] class DefaultWriterContainer(
 * writer externally sorts the remaining rows and then writes out them out one file at a time.
  */
 private[sql] class DynamicPartitionWriterContainer(
-    @transient relation: HadoopFsRelation,
-    @transient job: Job,
+    relation: HadoopFsRelation,
+    job: Job,
     partitionColumns: Seq[Attribute],
     dataColumns: Seq[Attribute],
     inputSchema: Seq[Attribute],
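
In the two concrete containers the parameters only flow through to the `BaseWriterContainer` constructor, so no fields are generated for them there and the single `@transient private val job` in the base class is enough. A generic sketch of that shape (names are illustrative):

    class BaseContainer(@transient private val job: AnyRef) extends Serializable {
      // `job` is referenced from a method, so the (transient) field lives here.
      def describeJob: String = String.valueOf(job)
    }

    // The subclass only forwards `job` to the superclass constructor; no field
    // is generated here, so annotating the parameter would be a no-op.
    class DefaultContainer(job: AnyRef, isAppend: Boolean) extends BaseContainer(job)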

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b8da084..0a5569b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -767,7 +767,7 @@ private[hive] case class InsertIntoHiveTable(
 private[hive] case class MetastoreRelation
     (databaseName: String, tableName: String, alias: Option[String])
     (val table: HiveTable)
-    (@transient sqlContext: SQLContext)
+    (@transient private val sqlContext: SQLContext)
   extends LeafNode with MultiInstanceRelation with FileRelation {
 
   override def equals(other: Any): Boolean = other match {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index dc35569..e35468a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -54,10 +54,10 @@ private[hive] sealed trait TableReader {
  */
 private[hive]
 class HadoopTableReader(
-    @transient attributes: Seq[Attribute],
-    @transient relation: MetastoreRelation,
-    @transient sc: HiveContext,
-    @transient hiveExtraConf: HiveConf)
+    @transient private val attributes: Seq[Attribute],
+    @transient private val relation: MetastoreRelation,
+    @transient private val sc: HiveContext,
+    hiveExtraConf: HiveConf)
   extends TableReader with Logging {
 
   // Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index c7651da..32bddba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -53,7 +53,7 @@ case class ScriptTransformation(
     script: String,
     output: Seq[Attribute],
     child: SparkPlan,
-    ioschema: HiveScriptIOSchema)(@transient sc: HiveContext)
+    ioschema: HiveScriptIOSchema)(@transient private val sc: HiveContext)
   extends UnaryNode {
 
   override def otherCopyArgs: Seq[HiveContext] = sc :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8dc796b..29a6f08 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.SerializableJobConf
  * It is based on [[SparkHadoopWriter]].
  */
 private[hive] class SparkHiveWriterContainer(
-    @transient jobConf: JobConf,
+    jobConf: JobConf,
     fileSinkConf: FileSinkDesc)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -163,7 +163,7 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer {
 }
 
 private[spark] class SparkHiveDynamicPartitionWriterContainer(
-    @transient jobConf: JobConf,
+    jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
@@ -194,10 +194,10 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
     // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
     // load it with loadDynamicPartitions/loadPartition/loadTable.
-    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
-    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+    val oldMarker = conf.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+    conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
     super.commitJob()
-    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+    conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
   }
 
   override def getLocalFileWriter(row: InternalRow, schema: StructType)
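
With `jobConf` now a constructor-only parameter, `commitJob` cannot reference it without the compiler turning it into a plain (non-transient) field holding a non-serializable `JobConf`; the body instead goes through `conf`, which, judging from the `SerializableJobConf` import shown above, wraps the same configuration in a serialization-friendly holder. A rough sketch of that wrapper pattern with hypothetical stand-in types:

    // Stand-ins only: `RawConf` plays the non-serializable Hadoop JobConf and
    // `SerializableConf` plays the wrapper kept as a field of the container.
    class RawConf { var marker: Boolean = true }

    class SerializableConf(@transient var value: RawConf) extends Serializable
    // (a real wrapper would also restore `value` in readObject; elided here)

    class WriterContainer(jobConf: RawConf) extends Serializable {
      protected val conf = new SerializableConf(jobConf)

      def commitJob(): Unit = {
        // Reading through conf.value instead of jobConf keeps jobConf a
        // constructor-only parameter and out of the serialized object graph.
        val oldMarker = conf.value.marker
        conf.value.marker = false
        conf.value.marker = oldMarker
      }
    }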

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 27024ec..8a6050f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.scheduler.JobGenerator
 
 
 private[streaming]
-class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
+class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
   extends Logging with Serializable {
   val master = ssc.sc.master
   val framework = ssc.sc.appName

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 2c37364..dfc5694 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -170,7 +170,7 @@ private[python] object PythonDStream {
  */
 private[python] abstract class PythonDStream(
     parent: DStream[_],
-    @transient pfunc: PythonTransformFunction)
+    pfunc: PythonTransformFunction)
   extends DStream[Array[Byte]] (parent.ssc) {
 
   val func = new TransformFunction(pfunc)
@@ -187,7 +187,7 @@ private[python] abstract class PythonDStream(
  */
 private[python] class PythonTransformedDStream (
     parent: DStream[_],
-    @transient pfunc: PythonTransformFunction)
+    pfunc: PythonTransformFunction)
   extends PythonDStream(parent, pfunc) {
 
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
@@ -206,7 +206,7 @@ private[python] class PythonTransformedDStream (
 private[python] class PythonTransformed2DStream(
     parent: DStream[_],
     parent2: DStream[_],
-    @transient pfunc: PythonTransformFunction)
+    pfunc: PythonTransformFunction)
   extends DStream[Array[Byte]] (parent.ssc) {
 
   val func = new TransformFunction(pfunc)
@@ -230,7 +230,7 @@ private[python] class PythonTransformed2DStream(
  */
 private[python] class PythonStateDStream(
     parent: DStream[Array[Byte]],
-    @transient reduceFunc: PythonTransformFunction)
+    reduceFunc: PythonTransformFunction)
   extends PythonDStream(parent, reduceFunc) {
 
   super.persist(StorageLevel.MEMORY_ONLY)
@@ -252,8 +252,8 @@ private[python] class PythonStateDStream(
  */
 private[python] class PythonReducedWindowedDStream(
     parent: DStream[Array[Byte]],
-    @transient preduceFunc: PythonTransformFunction,
-    @transient pinvReduceFunc: PythonTransformFunction,
+    preduceFunc: PythonTransformFunction,
+    @transient private val pinvReduceFunc: PythonTransformFunction,
     _windowDuration: Duration,
     _slideDuration: Duration)
   extends PythonDStream(parent, preduceFunc) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index c358f5b..40208a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -70,7 +70,7 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
  */
 private[streaming]
 class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
-    @transient ssc_ : StreamingContext,
+    ssc_ : StreamingContext,
     directory: String,
     filter: Path => Boolean = FileInputDStream.defaultFilter,
     newFilesOnly: Boolean = true,

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index a6c4cd2..95994c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
  *
  * @param ssc_ Streaming context that will execute this input stream
  */
-abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
+abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
   extends DStream[T](ssc_) {
 
   private[streaming] var lastValidTime: Time = null

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 186e1bf..002aac9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -23,7 +23,7 @@ import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class PluggableInputDStream[T: ClassTag](
-  @transient ssc_ : StreamingContext,
+  ssc_ : StreamingContext,
   receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
 
   def getReceiver(): Receiver[T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index bab78a3..a268504 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -27,7 +27,7 @@ import org.apache.spark.streaming.{Time, StreamingContext}
 
 private[streaming]
 class QueueInputDStream[T: ClassTag](
-    @transient ssc: StreamingContext,
+    ssc: StreamingContext,
     val queue: Queue[RDD[T]],
     oneAtATime: Boolean,
     defaultRDD: RDD[T]
@@ -57,7 +57,7 @@ class QueueInputDStream[T: ClassTag](
       if (oneAtATime) {
         Some(buffer.head)
       } else {
-        Some(new UnionRDD(ssc.sc, buffer.toSeq))
+        Some(new UnionRDD(context.sc, buffer.toSeq))
       }
     } else if (defaultRDD != null) {
       Some(defaultRDD)
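
The switch from `ssc.sc` to `context.sc` goes hand in hand with dropping `@transient` on the first changed line of this hunk: if `compute` kept referencing the constructor parameter, the compiler would have to retain it as an ordinary field of the DStream, so the code reaches the streaming context through the inherited accessor instead. A generic sketch of the same trap in plain Scala (illustrative names):

    class Context { val id = 42 }                     // stands in for the streaming context

    abstract class Stream(@transient private var ctx: Context) extends Serializable {
      def context: Context = ctx                      // the transient field lives in the base class
    }

    class QueueStream(ctx: Context) extends Stream(ctx) {
      // Using `context` keeps `ctx` a constructor-only parameter here; writing
      // `ctx.id` would force a second, non-transient field for the context.
      def describe(): String = s"context ${context.id}"
    }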

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index e2925b9..5a9eda7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.receiver.Receiver
  */
 private[streaming]
 class RawInputDStream[T: ClassTag](
-    @transient ssc_ : StreamingContext,
+    ssc_ : StreamingContext,
     host: String,
     port: Int,
     storageLevel: StorageLevel

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 6c139f3..87c20af 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.{StreamingContext, Time}
  * @param ssc_ Streaming context that will execute this input stream
  * @tparam T Class type of the object of this stream
  */
-abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
+abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 5ce5b7a..de84e0c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class SocketInputDStream[T: ClassTag](
-    @transient ssc_ : StreamingContext,
+    ssc_ : StreamingContext,
     host: String,
     port: Int,
     bytesToObjects: InputStream => Iterator[T],

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bc4f43/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index e081ffe..f811784 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -61,7 +61,7 @@ class WriteAheadLogBackedBlockRDDPartition(
  *
  *
  * @param sc SparkContext
- * @param blockIds Ids of the blocks that contains this RDD's data
+ * @param _blockIds Ids of the blocks that contains this RDD's data
 * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
 * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
 *                         executors). If not, then block lookups by the block ids will be skipped.
@@ -73,23 +73,23 @@ class WriteAheadLogBackedBlockRDDPartition(
  */
 private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
-    @transient sc: SparkContext,
-    @transient blockIds: Array[BlockId],
+    sc: SparkContext,
+    @transient private val _blockIds: Array[BlockId],
     @transient val walRecordHandles: Array[WriteAheadLogRecordHandle],
-    @transient isBlockIdValid: Array[Boolean] = Array.empty,
+    @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
     storeInBlockManager: Boolean = false,
     storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
-  extends BlockRDD[T](sc, blockIds) {
+  extends BlockRDD[T](sc, _blockIds) {
 
   require(
-    blockIds.length == walRecordHandles.length,
-    s"Number of block Ids (${blockIds.length}) must be " +
+    _blockIds.length == walRecordHandles.length,
+    s"Number of block Ids (${_blockIds.length}) must be " +
       s" same as number of WAL record handles (${walRecordHandles.length})")
 
   require(
-    isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
+    isBlockIdValid.isEmpty || isBlockIdValid.length == _blockIds.length,
     s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be 
" +
-      s" same as number of block Ids (${blockIds.length})")
+      s" same as number of block Ids (${_blockIds.length})")
 
   // Hadoop configuration is not serializable, so broadcast it as a serializable.
   @transient private val hadoopConfig = sc.hadoopConfiguration
@@ -99,9 +99,9 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
 
   override def getPartitions: Array[Partition] = {
     assertValid()
-    Array.tabulate(blockIds.length) { i =>
+    Array.tabulate(_blockIds.length) { i =>
       val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
-      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
+      new WriteAheadLogBackedBlockRDDPartition(i, _blockIds(i), isValid, walRecordHandles(i))
     }
   }
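
The rename to `_blockIds` (rather than simply adding `private val`) is presumably needed because the parameter is also passed up to `BlockRDD`, which already exposes a `blockIds` member of its own; a same-named private val in the subclass would collide with it. A small illustration of that shape, with illustrative types:

    class Base(@transient val blockIds: Array[Int]) extends Serializable

    class Derived(
        @transient private val _blockIds: Array[Int],  // renamed to avoid clashing with Base.blockIds
        factor: Int)
      extends Base(_blockIds) {

      // Methods use the subclass's own transient copy, as getPartitions does above.
      def scaledCount: Int = _blockIds.length * factor
    }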
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
