spark git commit: [SPARK-4764] Ensure that files are fetched atomically

2014-12-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 8817fc7fe -> ab2abcb5e


[SPARK-4764] Ensure that files are fetched atomically

tempFile is created in the same directory as targetFile, so that the
move from tempFile to targetFile is always atomic.
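
As a side note, here is a minimal sketch of the idea behind this fix (not the actual
Utils.fetchFile code; `download` is a hypothetical stand-in for the real HTTP/Hadoop fetch):

    import java.io.File
    import java.nio.file.{Files, StandardCopyOption}

    // Create the temp file inside targetDir itself: a rename within a single
    // directory (and therefore a single filesystem) can be atomic, whereas a
    // move from spark.local.dir could cross filesystems and degrade into a
    // non-atomic copy + delete.
    def fetchAtomically(download: File => Unit, targetDir: File, filename: String): File = {
      val tempFile = File.createTempFile("fetchFileTemp", null, targetDir)
      val targetFile = new File(targetDir, filename)
      download(tempFile)  // write the complete contents first
      Files.move(tempFile.toPath, targetFile.toPath, StandardCopyOption.ATOMIC_MOVE)
      targetFile          // readers only ever observe the finished file
    }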

Author: Christophe Préaud christophe.pre...@kelkoo.com

Closes #2855 from preaudc/master and squashes the following commits:

9ba89ca [Christophe Préaud] Ensure that files are fetched atomically
54419ae [Christophe Préaud] Merge remote-tracking branch 'upstream/master'
c6a5590 [Christophe Préaud] Revert commit 
8ea871f8130b2490f1bad7374a819bf56f0ccbbd
7456a33 [Christophe Préaud] Merge remote-tracking branch 'upstream/master'
8ea871f [Christophe Préaud] Ensure that files are fetched atomically


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab2abcb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab2abcb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab2abcb5

Branch: refs/heads/master
Commit: ab2abcb5ef925f15fa0e08d34a79b94a7b6578ef
Parents: 8817fc7
Author: Christophe Préaud christophe.pre...@kelkoo.com
Authored: Mon Dec 8 11:44:54 2014 -0800
Committer: Josh Rosen rosenvi...@gmail.com
Committed: Mon Dec 8 11:44:54 2014 -0800

--
 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ab2abcb5/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 336b079..9c04e45 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -425,8 +425,7 @@ private[spark] object Utils extends Logging {
   conf: SparkConf,
   securityMgr: SecurityManager,
   hadoopConf: Configuration) {
-    val tempDir = getLocalDir(conf)
-    val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
+    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)





spark git commit: [SPARK-4764] Ensure that files are fetched atomically

2014-12-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 8ee2d1873 -> 16bc77b7d


[SPARK-4764] Ensure that files are fetched atomically

tempFile is created in the same directory as targetFile, so that the
move from tempFile to targetFile is always atomic.

Author: Christophe Préaud christophe.pre...@kelkoo.com

Closes #2855 from preaudc/master and squashes the following commits:

9ba89ca [Christophe Préaud] Ensure that files are fetched atomically
54419ae [Christophe Préaud] Merge remote-tracking branch 'upstream/master'
c6a5590 [Christophe Préaud] Revert commit 
8ea871f8130b2490f1bad7374a819bf56f0ccbbd
7456a33 [Christophe Préaud] Merge remote-tracking branch 'upstream/master'
8ea871f [Christophe Préaud] Ensure that files are fetched atomically

(cherry picked from commit ab2abcb5ef925f15fa0e08d34a79b94a7b6578ef)
Signed-off-by: Josh Rosen rosenvi...@gmail.com

Conflicts:
core/src/main/scala/org/apache/spark/util/Utils.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16bc77b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16bc77b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16bc77b7

Branch: refs/heads/branch-1.1
Commit: 16bc77b7d91d9b21b9be45d346c47443386295d9
Parents: 8ee2d18
Author: Christophe Préaud christophe.pre...@kelkoo.com
Authored: Mon Dec 8 11:44:54 2014 -0800
Committer: Josh Rosen rosenvi...@gmail.com
Committed: Mon Dec 8 11:50:10 2014 -0800

--
 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/16bc77b7/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ef11ed3..20bf36e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -378,8 +378,7 @@ private[spark] object Utils extends Logging {
*/
   def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
     val filename = url.split("/").last
-    val tempDir = getLocalDir(conf)
-    val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
+    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)





spark git commit: [SPARK-4774] [SQL] Makes HiveFromSpark more portable

2014-12-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master ab2abcb5e -> d6a972b3e


[SPARK-4774] [SQL] Makes HiveFromSpark more portable

HiveFromSpark read the kv1.txt file from
SPARK_HOME/examples/src/main/resources/kv1.txt, which assumed you had a source
tree checked out. Now we copy the kv1.txt file to a temporary file and delete
it when the JVM shuts down. This allows us to run this example outside of a
Spark source tree.
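
For illustration, the same resource-to-temp-file idea can be sketched with just the
JDK (the commit itself uses Guava's ByteStreams and Files, as the diff below shows):

    import java.io.File
    import java.nio.file.{Files, StandardCopyOption}

    // Materialize the bundled classpath resource as a local temporary file so that
    // Hive's LOAD DATA LOCAL INPATH can read it without a source tree checked out.
    val kv1Stream = getClass.getResourceAsStream("/kv1.txt")
    val kv1File = File.createTempFile("kv1", "txt")
    kv1File.deleteOnExit()  // removed when the JVM shuts down
    Files.copy(kv1Stream, kv1File.toPath, StandardCopyOption.REPLACE_EXISTING)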

Author: Kostas Sakellis kos...@cloudera.com

Closes #3628 from ksakellis/kostas-spark-4774 and squashes the following 
commits:

6770f83 [Kostas Sakellis] [SPARK-4774] [SQL] Makes HiveFromSpark more portable


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6a972b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6a972b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6a972b3

Branch: refs/heads/master
Commit: d6a972b3e4dc35a2d95df47d256462b325f4bda6
Parents: ab2abcb
Author: Kostas Sakellis kos...@cloudera.com
Authored: Mon Dec 8 15:44:18 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Dec 8 15:44:18 2014 -0800

--
 .../apache/spark/examples/sql/hive/HiveFromSpark.scala | 13 +++--
 1 file changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d6a972b3/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
 
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 138923c..5725da1 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.examples.sql.hive
 
+import com.google.common.io.{ByteStreams, Files}
+
+import java.io.File
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.HiveContext
@@ -24,10 +28,15 @@ import org.apache.spark.sql.hive.HiveContext
 object HiveFromSpark {
   case class Record(key: Int, value: String)
 
+  // Copy kv1.txt file from classpath to temporary directory
+  val kv1Stream = HiveFromSpark.getClass.getResourceAsStream("/kv1.txt")
+  val kv1File = File.createTempFile("kv1", "txt")
+  kv1File.deleteOnExit()
+  ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))
+
   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("HiveFromSpark")
     val sc = new SparkContext(sparkConf)
-    val path = s"${System.getenv("SPARK_HOME")}/examples/src/main/resources/kv1.txt"
 
     // A hive context adds support for finding tables in the MetaStore and writing queries
     // using HiveQL. Users who do not have an existing Hive deployment can still create a
@@ -37,7 +46,7 @@ object HiveFromSpark {
 import hiveContext._
 
     sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-    sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE src")
+    sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
 
     // Queries are expressed in HiveQL
     println("Result of 'SELECT *': ")





spark git commit: [SPARK-4774] [SQL] Makes HiveFromSpark more portable

2014-12-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 6b9e8b081 -> 9ed5641a5


[SPARK-4774] [SQL] Makes HiveFromSpark more portable

HiveFromSpark read the kv1.txt file from
SPARK_HOME/examples/src/main/resources/kv1.txt, which assumed you had a source
tree checked out. Now we copy the kv1.txt file to a temporary file and delete
it when the JVM shuts down. This allows us to run this example outside of a
Spark source tree.

Author: Kostas Sakellis kos...@cloudera.com

Closes #3628 from ksakellis/kostas-spark-4774 and squashes the following 
commits:

6770f83 [Kostas Sakellis] [SPARK-4774] [SQL] Makes HiveFromSpark more portable

(cherry picked from commit d6a972b3e4dc35a2d95df47d256462b325f4bda6)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ed5641a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ed5641a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ed5641a

Branch: refs/heads/branch-1.2
Commit: 9ed5641a5a4425278283896928efa4e382fb74d8
Parents: 6b9e8b0
Author: Kostas Sakellis kos...@cloudera.com
Authored: Mon Dec 8 15:44:18 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Dec 8 15:44:30 2014 -0800

--
 .../apache/spark/examples/sql/hive/HiveFromSpark.scala | 13 +++--
 1 file changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9ed5641a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
 
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 138923c..5725da1 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.examples.sql.hive
 
+import com.google.common.io.{ByteStreams, Files}
+
+import java.io.File
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.HiveContext
@@ -24,10 +28,15 @@ import org.apache.spark.sql.hive.HiveContext
 object HiveFromSpark {
   case class Record(key: Int, value: String)
 
+  // Copy kv1.txt file from classpath to temporary directory
+  val kv1Stream = HiveFromSpark.getClass.getResourceAsStream("/kv1.txt")
+  val kv1File = File.createTempFile("kv1", "txt")
+  kv1File.deleteOnExit()
+  ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))
+
   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("HiveFromSpark")
     val sc = new SparkContext(sparkConf)
-    val path = s"${System.getenv("SPARK_HOME")}/examples/src/main/resources/kv1.txt"
 
     // A hive context adds support for finding tables in the MetaStore and writing queries
     // using HiveQL. Users who do not have an existing Hive deployment can still create a
@@ -37,7 +46,7 @@ object HiveFromSpark {
 import hiveContext._
 
     sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-    sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE src")
+    sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
 
     // Queries are expressed in HiveQL
     println("Result of 'SELECT *': ")





spark git commit: [SPARK-4750] Dynamic allocation - synchronize kills

2014-12-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master d6a972b3e -> 65f929d5b


[SPARK-4750] Dynamic allocation - synchronize kills

Simple omission on my part.

Author: Andrew Or and...@databricks.com

Closes #3612 from andrewor14/dynamic-allocation-synchronization and squashes 
the following commits:

1f03b60 [Andrew Or] Synchronize kills


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65f929d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65f929d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65f929d5

Branch: refs/heads/master
Commit: 65f929d5b3a50a73cd6397bd4b72c3e7d94c99d7
Parents: d6a972b
Author: Andrew Or and...@databricks.com
Authored: Mon Dec 8 16:02:33 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Dec 8 16:02:33 2014 -0800

--
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/65f929d5/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 88b196a..29cd344 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -334,7 +334,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
* Request that the cluster manager kill the specified executors.
* Return whether the kill request is acknowledged.
*/
-  final def killExecutors(executorIds: Seq[String]): Boolean = {
+  final def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
     val filteredExecutorIds = new ArrayBuffer[String]
     executorIds.foreach { id =>





spark git commit: SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not serializable

2014-12-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 65f929d5b -> e829bfa1a


SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not serializable

My original 'fix' didn't fix the problem at all. Now, there's a unit test to check
whether it works. Of the two options to really fix it -- copy the `Map` to a
`java.util.HashMap`, or copy and modify Scala's implementation in
`Wrappers.MapWrapper` -- I went with the latter.
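
A serialization round trip makes the difference visible; here is a rough sketch of such a
check (hypothetical helper, not the actual JavaAPISuite test):

    import java.io._

    // Serialize a value and read it back, roughly what happens when a
    // collectAsMap() result is captured in a closure and shipped to executors.
    def roundTrip[T](value: T): T = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(value)
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      in.readObject().asInstanceOf[T]
    }

    // With the previous `extends MapWrapper(underlying) with Serializable` version
    // this round trip fails, because MapWrapper itself is not Serializable and has
    // no no-arg constructor (see the new code comment in the diff below); the
    // AbstractMap-based rewrite survives it:
    // val copy = roundTrip(JavaUtils.mapAsSerializableJavaMap(Map(1 -> "a")))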

Author: Sean Owen so...@cloudera.com

Closes #3587 from srowen/SPARK-3926 and squashes the following commits:

8586bb9 [Sean Owen] Remove unneeded no-arg constructor, and add additional note 
about copied code in LICENSE
7bb0e66 [Sean Owen] Make SerializableMapWrapper actually serialize, and add 
unit test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e829bfa1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e829bfa1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e829bfa1

Branch: refs/heads/master
Commit: e829bfa1ab9b68f44c489d26efb042f793fd9362
Parents: 65f929d
Author: Sean Owen so...@cloudera.com
Authored: Mon Dec 8 16:13:03 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Dec 8 16:13:03 2014 -0800

--
 LICENSE |  3 +-
 .../org/apache/spark/api/java/JavaUtils.scala   | 62 +++-
 .../java/org/apache/spark/JavaAPISuite.java | 13 
 3 files changed, 75 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e829bfa1/LICENSE
--
diff --git a/LICENSE b/LICENSE
index 3c667bf..0a42d38 100644
--- a/LICENSE
+++ b/LICENSE
@@ -646,7 +646,8 @@ THE SOFTWARE.
 
 
 For Scala Interpreter classes (all .scala files in repl/src/main/scala
-except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
+except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
+and for SerializableMapWrapper in JavaUtils.scala:
 
 
 Copyright (c) 2002-2013 EPFL

http://git-wip-us.apache.org/repos/asf/spark/blob/e829bfa1/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index b52d0a5..86e9493 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.api.java
 
 import com.google.common.base.Optional
 
-import scala.collection.convert.Wrappers.MapWrapper
+import java.{util => ju}
+import scala.collection.mutable
 
 private[spark] object JavaUtils {
   def optionToOptional[T](option: Option[T]): Optional[T] =
@@ -32,7 +33,64 @@ private[spark] object JavaUtils {
   def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
 new SerializableMapWrapper(underlying)
 
+  // Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
+  // but implements java.io.Serializable. It can't just be subclassed to make it
+  // Serializable since the MapWrapper class has no no-arg constructor. This class
+  // doesn't need a no-arg constructor though.
   class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
-    extends MapWrapper(underlying) with java.io.Serializable
+    extends ju.AbstractMap[A, B] with java.io.Serializable { self =>
 
+    override def size = underlying.size
+
+    override def get(key: AnyRef): B = try {
+      underlying get key.asInstanceOf[A] match {
+        case None => null.asInstanceOf[B]
+        case Some(v) => v
+      }
+    } catch {
+      case ex: ClassCastException => null.asInstanceOf[B]
+    }
+
+    override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
+      def size = self.size
+
+      def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
+        val ui = underlying.iterator
+        var prev : Option[A] = None
+
+        def hasNext = ui.hasNext
+
+        def next() = {
+          val (k, v) = ui.next
+          prev = Some(k)
+          new ju.Map.Entry[A, B] {
+            import scala.util.hashing.byteswap32
+            def getKey = k
+            def getValue = v
+            def setValue(v1 : B) = self.put(k, v1)
+            override def hashCode = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
+            override def equals(other: Any) = other match {
+              case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
+              case _ => false
+}
+  

spark git commit: SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not serializable

2014-12-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 16bc77b7d -> fe7d7a983


SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not serializable

My original 'fix' didn't fix the problem at all. Now, there's a unit test to check
whether it works. Of the two options to really fix it -- copy the `Map` to a
`java.util.HashMap`, or copy and modify Scala's implementation in
`Wrappers.MapWrapper` -- I went with the latter.

Author: Sean Owen so...@cloudera.com

Closes #3587 from srowen/SPARK-3926 and squashes the following commits:

8586bb9 [Sean Owen] Remove unneeded no-arg constructor, and add additional note 
about copied code in LICENSE
7bb0e66 [Sean Owen] Make SerializableMapWrapper actually serialize, and add 
unit test

(cherry picked from commit e829bfa1ab9b68f44c489d26efb042f793fd9362)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe7d7a98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe7d7a98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe7d7a98

Branch: refs/heads/branch-1.1
Commit: fe7d7a9834c953d155b3157242b722c67972eec5
Parents: 16bc77b
Author: Sean Owen so...@cloudera.com
Authored: Mon Dec 8 16:13:03 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Dec 8 16:14:28 2014 -0800

--
 LICENSE |  3 +-
 .../org/apache/spark/api/java/JavaUtils.scala   | 62 +++-
 .../java/org/apache/spark/JavaAPISuite.java | 13 
 3 files changed, 75 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe7d7a98/LICENSE
--
diff --git a/LICENSE b/LICENSE
index e9a1153..ad120d8 100644
--- a/LICENSE
+++ b/LICENSE
@@ -363,7 +363,8 @@ THE SOFTWARE.
 
 
 For Scala Interpreter classes (all .scala files in repl/src/main/scala
-except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
+except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
+and for SerializableMapWrapper in JavaUtils.scala:
 
 
 Copyright (c) 2002-2013 EPFL

http://git-wip-us.apache.org/repos/asf/spark/blob/fe7d7a98/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index b52d0a5..86e9493 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.api.java
 
 import com.google.common.base.Optional
 
-import scala.collection.convert.Wrappers.MapWrapper
+import java.{util => ju}
+import scala.collection.mutable
 
 private[spark] object JavaUtils {
   def optionToOptional[T](option: Option[T]): Optional[T] =
@@ -32,7 +33,64 @@ private[spark] object JavaUtils {
   def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
 new SerializableMapWrapper(underlying)
 
+  // Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
+  // but implements java.io.Serializable. It can't just be subclassed to make it
+  // Serializable since the MapWrapper class has no no-arg constructor. This class
+  // doesn't need a no-arg constructor though.
   class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
-    extends MapWrapper(underlying) with java.io.Serializable
+    extends ju.AbstractMap[A, B] with java.io.Serializable { self =>
 
+    override def size = underlying.size
+
+    override def get(key: AnyRef): B = try {
+      underlying get key.asInstanceOf[A] match {
+        case None => null.asInstanceOf[B]
+        case Some(v) => v
+      }
+    } catch {
+      case ex: ClassCastException => null.asInstanceOf[B]
+    }
+
+    override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
+      def size = self.size
+
+      def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
+        val ui = underlying.iterator
+        var prev : Option[A] = None
+
+        def hasNext = ui.hasNext
+
+        def next() = {
+          val (k, v) = ui.next
+          prev = Some(k)
+          new ju.Map.Entry[A, B] {
+            import scala.util.hashing.byteswap32
+            def getKey = k
+            def getValue = v
+            def setValue(v1 : B) = self.put(k, v1)
+            override def hashCode = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
+            override def equals(other: Any) = other match {
+  

spark git commit: SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio doc...

2014-12-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master e829bfa1a -> cda94d15e


SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio doc...

...umented default is incorrect for YARN
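
For reference, the ratio can still be set explicitly regardless of the
cluster-manager-specific default; a minimal sketch with hypothetical values:

    import org.apache.spark.{SparkConf, SparkContext}

    // Wait until 80% of the requested resources (executors on YARN, cores in
    // standalone mode) have registered before scheduling tasks, but never wait
    // longer than the configured maximum.
    val conf = new SparkConf()
      .setAppName("MinRegisteredResourcesExample")
      .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
      .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30000")  // milliseconds
    val sc = new SparkContext(conf)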

Author: Sandy Ryza sa...@cloudera.com

Closes #3624 from sryza/sandy-spark-4770 and squashes the following commits:

bd81a3a [Sandy Ryza] SPARK-4770. [DOC] [YARN] 
spark.scheduler.minRegisteredResourcesRatio documented default is incorrect for 
YARN


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cda94d15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cda94d15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cda94d15

Branch: refs/heads/master
Commit: cda94d15ea2a70ed3f0651ba2766b1e2f80308c1
Parents: e829bfa
Author: Sandy Ryza sa...@cloudera.com
Authored: Mon Dec 8 16:28:36 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Dec 8 16:28:36 2014 -0800

--
 docs/configuration.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cda94d15/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index 4b4bbea..d50b046 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -939,11 +939,11 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 </tr>
   <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
-  <td>0</td>
+  <td>0.0 for Mesos and Standalone mode, 0.8 for YARN</td>
   <td>
     The minimum ratio of registered resources (registered resources / total expected resources)
     (resources are executors in yarn mode, CPU cores in standalone mode)
-    to wait for before scheduling begins. Specified as a double between 0 and 1.
+    to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
     Regardless of whether the minimum ratio of resources has been reached,
     the maximum amount of time it will wait before scheduling begins is controlled by config 
     <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>.





spark git commit: SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio doc...

2014-12-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 9ed5641a5 -> f4160324c


SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio doc...

...umented default is incorrect for YARN

Author: Sandy Ryza sa...@cloudera.com

Closes #3624 from sryza/sandy-spark-4770 and squashes the following commits:

bd81a3a [Sandy Ryza] SPARK-4770. [DOC] [YARN] 
spark.scheduler.minRegisteredResourcesRatio documented default is incorrect for 
YARN

(cherry picked from commit cda94d15ea2a70ed3f0651ba2766b1e2f80308c1)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4160324
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4160324
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4160324

Branch: refs/heads/branch-1.2
Commit: f4160324c55b4d168421af5473ce306bc03a77bb
Parents: 9ed5641
Author: Sandy Ryza sa...@cloudera.com
Authored: Mon Dec 8 16:28:36 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Dec 8 16:28:57 2014 -0800

--
 docs/configuration.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f4160324/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index cdd9f1e..55d41c0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -939,11 +939,11 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 </tr>
   <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
-  <td>0</td>
+  <td>0.0 for Mesos and Standalone mode, 0.8 for YARN</td>
   <td>
     The minimum ratio of registered resources (registered resources / total expected resources)
     (resources are executors in yarn mode, CPU cores in standalone mode)
-    to wait for before scheduling begins. Specified as a double between 0 and 1.
+    to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
     Regardless of whether the minimum ratio of resources has been reached,
     the maximum amount of time it will wait before scheduling begins is controlled by config 
     <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>.





spark git commit: [SQL] remove unnecessary import in spark-sql

2014-12-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master cda94d15e -> 944384363


[SQL] remove unnecessary import in spark-sql

Author: Jacky Li jacky.li...@huawei.com

Closes #3630 from jackylk/remove and squashes the following commits:

150e7e0 [Jacky Li] remove unnecessary import


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94438436
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94438436
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94438436

Branch: refs/heads/master
Commit: 944384363d390a133529c08a1d0ac70aa8e778b5
Parents: cda94d1
Author: Jacky Li jacky.li...@huawei.com
Authored: Mon Dec 8 17:27:46 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Dec 8 17:27:46 2014 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 2 +-
 .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala  | 4 +---
 .../org/apache/spark/sql/execution/GeneratedAggregate.scala  | 1 -
 .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala| 3 ---
 .../org/apache/spark/sql/types/util/DataTypeConversions.scala| 2 +-
 5 files changed, 3 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/94438436/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index c6d4dab..95d73c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import java.util.{Map => JMap, List => JList}
-import java.io.StringWriter
+
 
 import scala.collection.JavaConversions._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94438436/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ed6b95d..d2d8cb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -19,13 +19,11 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataType, StructType, Row, SQLContext}
+import org.apache.spark.sql.{StructType, Row, SQLContext}
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.ScalaReflection.Schema
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.types.UserDefinedType
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/94438436/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 18afc5d..7c3bf94 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.trees._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._

http://git-wip-us.apache.org/repos/asf/spark/blob/94438436/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 81c60e0..4cd8e7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -22,14 +22,11 @@ import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.{ScalaReflection, trees}
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import 

spark git commit: [SPARK-4769] [SQL] CTAS does not work when reading from temporary tables

2014-12-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 944384363 -> 51b1fe142


[SPARK-4769] [SQL] CTAS does not work when reading from temporary tables

This is the code refactor and follow ups for #2570

Author: Cheng Hao hao.ch...@intel.com

Closes #3336 from chenghao-intel/createtbl and squashes the following commits:

3563142 [Cheng Hao] remove the unused variable
e215187 [Cheng Hao] eliminate the compiling warning
4f97f14 [Cheng Hao] fix bug in unittest
5d58812 [Cheng Hao] revert the API changes
b85b620 [Cheng Hao] fix the regression of temp table not found in CTAS


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51b1fe14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51b1fe14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51b1fe14

Branch: refs/heads/master
Commit: 51b1fe1426ffecac6c4644523633ea1562ff9a4e
Parents: 9443843
Author: Cheng Hao hao.ch...@intel.com
Authored: Mon Dec 8 17:39:12 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Dec 8 17:39:12 2014 -0800

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 26 ++--
 .../apache/spark/sql/hive/HiveStrategies.scala  | 14 ---
 .../hive/execution/CreateTableAsSelect.scala| 16 
 .../sql/hive/execution/SQLQuerySuite.scala  |  9 +++
 4 files changed, 49 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51b1fe14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 91a1577..6086563 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
* For example, because of a CREATE TABLE X AS statement.
*/
   object CreateTables extends Rule[LogicalPlan] {
+import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
+
 def apply(plan: LogicalPlan): LogicalPlan = plan transform {
   // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
+      case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
+        // Get the CreateTableDesc from Hive SemanticAnalyzer
+        val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
+  None
+} else {
+  val sa = new SemanticAnalyzer(hive.hiveconf) {
+override def analyzeInternal(ast: ASTNode) {
+  // A hack to intercept the SemanticAnalyzer.analyzeInternal,
+  // to ignore the SELECT clause of the CTAS
+  val method = classOf[SemanticAnalyzer].getDeclaredMethod(
+                "analyzeCreateTable", classOf[ASTNode], classOf[QB])
+  method.setAccessible(true)
+  method.invoke(this, ast, this.getQB)
+}
+  }
+
+  sa.analyze(extra, new Context(hive.hiveconf))
+  Some(sa.getQB().getTableDesc)
+}
+
+        CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51b1fe14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index edf291f..5f02e95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.hive.ql.parse.ASTNode
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
 execution.InsertIntoHiveTable(
   table, partition, planLater(child), overwrite)(hiveContext) :: Nil
   

spark git commit: [SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'

2014-12-08 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master 51b1fe142 -> bcb5cdad6


[SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and 
remove @volatile from 'stopped'

Since `sequenceNumberToProcessor` and `stopped` are both protected by the lock
`sequenceNumberToProcessor`, `ConcurrentHashMap` and `volatile` are unnecessary.
So this PR updates them accordingly.
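
As an illustration of the locking discipline described above (a simplified sketch,
not the sink's actual code):

    import scala.collection.mutable

    // Every access to the map and to `stopped` happens inside map.synchronized,
    // so a plain mutable.HashMap suffices and `stopped` needs no @volatile: the
    // lock already provides the required atomicity and memory visibility.
    class Tracker {
      private val processors = mutable.HashMap[String, AnyRef]()
      private var stopped = false  // protected by `processors`

      def register(id: String, p: AnyRef): Boolean = processors.synchronized {
        if (stopped) false else { processors(id) = p; true }
      }

      def remove(id: String): Option[AnyRef] = processors.synchronized {
        processors.remove(id)
      }

      def shutdown(): Unit = processors.synchronized {
        stopped = true
        processors.clear()
      }
    }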

Author: zsxwing zsxw...@gmail.com

Closes #3634 from zsxwing/SPARK-3154 and squashes the following commits:

0d087ac [zsxwing] Replace ConcurrentHashMap with mutable.HashMap and remove 
@volatile from 'stopped'


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcb5cdad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcb5cdad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcb5cdad

Branch: refs/heads/master
Commit: bcb5cdad614d4fce43725dfec3ce88172d2f8c11
Parents: 51b1fe1
Author: zsxwing zsxw...@gmail.com
Authored: Mon Dec 8 23:54:15 2014 -0800
Committer: Aaron Davidson aa...@databricks.com
Committed: Mon Dec 8 23:54:15 2014 -0800

--
 .../flume/sink/SparkAvroCallbackHandler.scala   | 23 ++--
 1 file changed, 12 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bcb5cdad/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
--
diff --git 
a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
 
b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 3c656a3..4373be4 100644
--- 
a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ 
b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.spark.streaming.flume.sink
 
-import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import org.apache.flume.Channel
 import org.apache.commons.lang.RandomStringUtils
@@ -47,8 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
       .setNameFormat("Spark Sink Processor Thread - %d").build()))
-  private val sequenceNumberToProcessor =
-    new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+  // Protected by `sequenceNumberToProcessor`
+  private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the
   // restart.
@@ -58,8 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)
 
-
-  @volatile private var stopped = false
+  // Protected by `sequenceNumberToProcessor`
+  private var stopped = false
 
   @volatile private var isTest = false
   private var testLatch: CountDownLatch = null
@@ -131,7 +131,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
    * @param success Whether the batch was successful or not.
    */
   private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
+    removeAndGetProcessor(sequenceNumber).foreach(processor => {
       processor.batchProcessed(success)
     })
   }
@@ -139,10 +139,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   /**
    * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
    * @param sequenceNumber
-   * @return The transaction processor for the corresponding batch. Note that this instance is no
-   *         longer tracked and the caller is responsible for that txn processor.
+   * @return An `Option` of the transaction processor for the corresponding batch. Note that this
+   *         instance is no longer tracked and the caller is responsible for that txn processor.
    */
-  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
+  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
+  Option[TransactionProcessor] = {