spark git commit: [SPARK-4764] Ensure that files are fetched atomically
Repository: spark
Updated Branches:
  refs/heads/master 8817fc7fe -> ab2abcb5e

[SPARK-4764] Ensure that files are fetched atomically

tempFile is created in the same directory as targetFile, so that the move from tempFile to targetFile is always atomic.

Author: Christophe Préaud <christophe.pre...@kelkoo.com>

Closes #2855 from preaudc/master and squashes the following commits:

9ba89ca [Christophe Préaud] Ensure that files are fetched atomically
54419ae [Christophe Préaud] Merge remote-tracking branch 'upstream/master'
c6a5590 [Christophe Préaud] Revert commit 8ea871f8130b2490f1bad7374a819bf56f0ccbbd
7456a33 [Christophe Préaud] Merge remote-tracking branch 'upstream/master'
8ea871f [Christophe Préaud] Ensure that files are fetched atomically

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab2abcb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab2abcb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab2abcb5

Branch: refs/heads/master
Commit: ab2abcb5ef925f15fa0e08d34a79b94a7b6578ef
Parents: 8817fc7
Author: Christophe Préaud <christophe.pre...@kelkoo.com>
Authored: Mon Dec 8 11:44:54 2014 -0800
Committer: Josh Rosen <rosenvi...@gmail.com>
Committed: Mon Dec 8 11:44:54 2014 -0800

 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/ab2abcb5/core/src/main/scala/org/apache/spark/util/Utils.scala

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 336b079..9c04e45 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -425,8 +425,7 @@ private[spark] object Utils extends Logging {
       conf: SparkConf,
       securityMgr: SecurityManager,
       hadoopConf: Configuration) {
-    val tempDir = getLocalDir(conf)
-    val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
+    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
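[Editor's note] The reason the temp file must live in the target directory is that a move across filesystems degrades into copy-then-delete, during which readers can observe a half-written file; a rename within one directory is atomic on POSIX filesystems. A minimal sketch of the pattern (not the actual Utils.fetchFile code; fetchAtomically and downloadTo are illustrative names):

    import java.io.File
    import java.nio.file.{Files, StandardCopyOption}

    // Sketch: download into a temp file created in the *target* directory,
    // then rename. Concurrent readers see either the old file or the
    // complete new one, never a partial download.
    def fetchAtomically(targetDir: File, filename: String)(downloadTo: File => Unit): File = {
      val tempFile = File.createTempFile("fetch", null, targetDir) // same dir as target
      val targetFile = new File(targetDir, filename)
      downloadTo(tempFile) // write the full contents to the temp file first
      Files.move(tempFile.toPath, targetFile.toPath, StandardCopyOption.ATOMIC_MOVE)
      targetFile
    }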
spark git commit: [SPARK-4764] Ensure that files are fetched atomically
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 8ee2d1873 -> 16bc77b7d

[SPARK-4764] Ensure that files are fetched atomically

tempFile is created in the same directory as targetFile, so that the move from tempFile to targetFile is always atomic.

Author: Christophe Préaud <christophe.pre...@kelkoo.com>

Closes #2855 from preaudc/master and squashes the following commits:

9ba89ca [Christophe Préaud] Ensure that files are fetched atomically
54419ae [Christophe Préaud] Merge remote-tracking branch 'upstream/master'
c6a5590 [Christophe Préaud] Revert commit 8ea871f8130b2490f1bad7374a819bf56f0ccbbd
7456a33 [Christophe Préaud] Merge remote-tracking branch 'upstream/master'
8ea871f [Christophe Préaud] Ensure that files are fetched atomically

(cherry picked from commit ab2abcb5ef925f15fa0e08d34a79b94a7b6578ef)
Signed-off-by: Josh Rosen <rosenvi...@gmail.com>

Conflicts:
	core/src/main/scala/org/apache/spark/util/Utils.scala

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16bc77b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16bc77b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16bc77b7

Branch: refs/heads/branch-1.1
Commit: 16bc77b7d91d9b21b9be45d346c47443386295d9
Parents: 8ee2d18
Author: Christophe Préaud <christophe.pre...@kelkoo.com>
Authored: Mon Dec 8 11:44:54 2014 -0800
Committer: Josh Rosen <rosenvi...@gmail.com>
Committed: Mon Dec 8 11:50:10 2014 -0800

 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/16bc77b7/core/src/main/scala/org/apache/spark/util/Utils.scala

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ef11ed3..20bf36e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -378,8 +378,7 @@ private[spark] object Utils extends Logging {
    */
   def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
     val filename = url.split("/").last
-    val tempDir = getLocalDir(conf)
-    val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
+    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)
spark git commit: [SPARK-4774] [SQL] Makes HiveFromSpark more portable
Repository: spark
Updated Branches:
  refs/heads/master ab2abcb5e -> d6a972b3e

[SPARK-4774] [SQL] Makes HiveFromSpark more portable

HiveFromSpark read the kv1.txt file from SPARK_HOME/examples/src/main/resources/kv1.txt, which assumed you had a source tree checked out. Now we copy the kv1.txt file to a temporary file and delete it when the JVM shuts down. This allows us to run this example outside of a Spark source tree.

Author: Kostas Sakellis <kos...@cloudera.com>

Closes #3628 from ksakellis/kostas-spark-4774 and squashes the following commits:

6770f83 [Kostas Sakellis] [SPARK-4774] [SQL] Makes HiveFromSpark more portable

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6a972b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6a972b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6a972b3

Branch: refs/heads/master
Commit: d6a972b3e4dc35a2d95df47d256462b325f4bda6
Parents: ab2abcb
Author: Kostas Sakellis <kos...@cloudera.com>
Authored: Mon Dec 8 15:44:18 2014 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Dec 8 15:44:18 2014 -0800

 .../apache/spark/examples/sql/hive/HiveFromSpark.scala | 13 +++--
 1 file changed, 11 insertions(+), 2 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/d6a972b3/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 138923c..5725da1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.examples.sql.hive
 
+import com.google.common.io.{ByteStreams, Files}
+
+import java.io.File
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.HiveContext
@@ -24,10 +28,15 @@ import org.apache.spark.sql.hive.HiveContext
 object HiveFromSpark {
   case class Record(key: Int, value: String)
 
+  // Copy kv1.txt file from classpath to temporary directory
+  val kv1Stream = HiveFromSpark.getClass.getResourceAsStream("/kv1.txt")
+  val kv1File = File.createTempFile("kv1", "txt")
+  kv1File.deleteOnExit()
+  ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))
+
   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("HiveFromSpark")
     val sc = new SparkContext(sparkConf)
-    val path = s"${System.getenv("SPARK_HOME")}/examples/src/main/resources/kv1.txt"
 
     // A hive context adds support for finding tables in the MetaStore and writing queries
     // using HiveQL. Users who do not have an existing Hive deployment can still create a
@@ -37,7 +46,7 @@ object HiveFromSpark {
     import hiveContext._
 
     sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-    sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE src")
+    sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
 
     // Queries are expressed in HiveQL
     println("Result of 'SELECT *': ")
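[Editor's note] The resource-copy idiom here is generally useful whenever an example must run from a packaged jar but feed a file-path-only API (like Hive's LOAD DATA LOCAL INPATH). A hedged sketch of the same idea with an explicit check for a missing resource, using java.nio instead of Guava (resourceToTempFile is an illustrative name, not part of the patch):

    import java.io.File
    import java.nio.file.{Files, StandardCopyOption}

    // Sketch: materialize a classpath resource as a real file on disk so it
    // can be handed to APIs that only accept filesystem paths.
    def resourceToTempFile(resource: String, prefix: String, suffix: String): File = {
      val in = getClass.getResourceAsStream(resource)
      require(in != null, s"resource $resource not found on classpath")
      val tmp = File.createTempFile(prefix, suffix)
      tmp.deleteOnExit() // clean up when the JVM exits, as the patch does
      try Files.copy(in, tmp.toPath, StandardCopyOption.REPLACE_EXISTING) finally in.close()
      tmp
    }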
spark git commit: [SPARK-4774] [SQL] Makes HiveFromSpark more portable
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 6b9e8b081 -> 9ed5641a5

[SPARK-4774] [SQL] Makes HiveFromSpark more portable

HiveFromSpark read the kv1.txt file from SPARK_HOME/examples/src/main/resources/kv1.txt, which assumed you had a source tree checked out. Now we copy the kv1.txt file to a temporary file and delete it when the JVM shuts down. This allows us to run this example outside of a Spark source tree.

Author: Kostas Sakellis <kos...@cloudera.com>

Closes #3628 from ksakellis/kostas-spark-4774 and squashes the following commits:

6770f83 [Kostas Sakellis] [SPARK-4774] [SQL] Makes HiveFromSpark more portable

(cherry picked from commit d6a972b3e4dc35a2d95df47d256462b325f4bda6)
Signed-off-by: Michael Armbrust <mich...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ed5641a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ed5641a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ed5641a

Branch: refs/heads/branch-1.2
Commit: 9ed5641a5a4425278283896928efa4e382fb74d8
Parents: 6b9e8b0
Author: Kostas Sakellis <kos...@cloudera.com>
Authored: Mon Dec 8 15:44:18 2014 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Dec 8 15:44:30 2014 -0800

 .../apache/spark/examples/sql/hive/HiveFromSpark.scala | 13 +++--
 1 file changed, 11 insertions(+), 2 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/9ed5641a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 138923c..5725da1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.examples.sql.hive
 
+import com.google.common.io.{ByteStreams, Files}
+
+import java.io.File
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.HiveContext
@@ -24,10 +28,15 @@ import org.apache.spark.sql.hive.HiveContext
 object HiveFromSpark {
   case class Record(key: Int, value: String)
 
+  // Copy kv1.txt file from classpath to temporary directory
+  val kv1Stream = HiveFromSpark.getClass.getResourceAsStream("/kv1.txt")
+  val kv1File = File.createTempFile("kv1", "txt")
+  kv1File.deleteOnExit()
+  ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))
+
   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("HiveFromSpark")
     val sc = new SparkContext(sparkConf)
-    val path = s"${System.getenv("SPARK_HOME")}/examples/src/main/resources/kv1.txt"
 
     // A hive context adds support for finding tables in the MetaStore and writing queries
     // using HiveQL. Users who do not have an existing Hive deployment can still create a
@@ -37,7 +46,7 @@ object HiveFromSpark {
     import hiveContext._
 
     sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-    sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE src")
+    sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
 
     // Queries are expressed in HiveQL
     println("Result of 'SELECT *': ")
spark git commit: [SPARK-4750] Dynamic allocation - synchronize kills
Repository: spark
Updated Branches:
  refs/heads/master d6a972b3e -> 65f929d5b

[SPARK-4750] Dynamic allocation - synchronize kills

Simple omission on my part.

Author: Andrew Or <and...@databricks.com>

Closes #3612 from andrewor14/dynamic-allocation-synchronization and squashes the following commits:

1f03b60 [Andrew Or] Synchronize kills

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65f929d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65f929d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65f929d5

Branch: refs/heads/master
Commit: 65f929d5b3a50a73cd6397bd4b72c3e7d94c99d7
Parents: d6a972b
Author: Andrew Or <and...@databricks.com>
Authored: Mon Dec 8 16:02:33 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 8 16:02:33 2014 -0800

 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/65f929d5/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 88b196a..29cd344 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -334,7 +334,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
    * Request that the cluster manager kill the specified executors.
    * Return whether the kill request is acknowledged.
    */
-  final def killExecutors(executorIds: Seq[String]): Boolean = {
+  final def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
     val filteredExecutorIds = new ArrayBuffer[String]
     executorIds.foreach { id =>
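[Editor's note] The fix works because the scheduler backend's executor-request bookkeeping is mutated from several threads, and every other mutator already synchronizes on the backend instance; this one method was missing the lock. A minimal sketch of the invariant being restored (ExecutorBookkeeping and its fields are illustrative, not the real class):

    import scala.collection.mutable

    // Sketch: all reads and writes of shared bookkeeping go through methods
    // that synchronize on the same object (here, the instance), so requests
    // and kills can never interleave mid-update.
    class ExecutorBookkeeping {
      private val pendingToRemove = mutable.HashSet[String]()
      private var numPendingRequests = 0

      final def requestExecutors(n: Int): Unit = synchronized {
        numPendingRequests += n
      }

      // Before SPARK-4750, the kill path was the one unsynchronized mutator.
      final def killExecutors(ids: Seq[String]): Unit = synchronized {
        pendingToRemove ++= ids
      }
    }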
spark git commit: SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not serializable
Repository: spark
Updated Branches:
  refs/heads/master 65f929d5b -> e829bfa1a

SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not serializable

My original 'fix' didn't fix at all. Now, there's a unit test to check whether it works. Of the two options to really fix it -- copy the `Map` to a `java.util.HashMap`, or copy and modify Scala's implementation in `Wrappers.MapWrapper` -- I went with the latter.

Author: Sean Owen <so...@cloudera.com>

Closes #3587 from srowen/SPARK-3926 and squashes the following commits:

8586bb9 [Sean Owen] Remove unneeded no-arg constructor, and add additional note about copied code in LICENSE
7bb0e66 [Sean Owen] Make SerializableMapWrapper actually serialize, and add unit test

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e829bfa1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e829bfa1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e829bfa1

Branch: refs/heads/master
Commit: e829bfa1ab9b68f44c489d26efb042f793fd9362
Parents: 65f929d
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Dec 8 16:13:03 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 8 16:13:03 2014 -0800

 LICENSE                                        |  3 +-
 .../org/apache/spark/api/java/JavaUtils.scala  | 62 +++-
 .../java/org/apache/spark/JavaAPISuite.java    | 13
 3 files changed, 75 insertions(+), 3 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/e829bfa1/LICENSE

diff --git a/LICENSE b/LICENSE
index 3c667bf..0a42d38 100644
--- a/LICENSE
+++ b/LICENSE
@@ -646,7 +646,8 @@ THE SOFTWARE.
 
 For Scala Interpreter classes (all .scala files in repl/src/main/scala
-except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
+except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
+and for SerializableMapWrapper in JavaUtils.scala:
 
 Copyright (c) 2002-2013 EPFL

http://git-wip-us.apache.org/repos/asf/spark/blob/e829bfa1/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index b52d0a5..86e9493 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.api.java
 
 import com.google.common.base.Optional
 
-import scala.collection.convert.Wrappers.MapWrapper
+import java.{util => ju}
+import scala.collection.mutable
 
 private[spark] object JavaUtils {
   def optionToOptional[T](option: Option[T]): Optional[T] =
@@ -32,7 +33,64 @@ private[spark] object JavaUtils {
   def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
     new SerializableMapWrapper(underlying)
 
+  // Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
+  // but implements java.io.Serializable. It can't just be subclassed to make it
+  // Serializable since the MapWrapper class has no no-arg constructor. This class
+  // doesn't need a no-arg constructor though.
   class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
-    extends MapWrapper(underlying) with java.io.Serializable
+    extends ju.AbstractMap[A, B] with java.io.Serializable { self =>
+
+    override def size = underlying.size
+
+    override def get(key: AnyRef): B = try {
+      underlying get key.asInstanceOf[A] match {
+        case None => null.asInstanceOf[B]
+        case Some(v) => v
+      }
+    } catch {
+      case ex: ClassCastException => null.asInstanceOf[B]
+    }
+
+    override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
+      def size = self.size
+
+      def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
+        val ui = underlying.iterator
+        var prev: Option[A] = None
+
+        def hasNext = ui.hasNext
+
+        def next() = {
+          val (k, v) = ui.next
+          prev = Some(k)
+          new ju.Map.Entry[A, B] {
+            import scala.util.hashing.byteswap32
+            def getKey = k
+            def getValue = v
+            def setValue(v1: B) = self.put(k, v1)
+            override def hashCode = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
+            override def equals(other: Any) = other match {
+              case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
+              case _ => false
+            }
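[Editor's note] A quick way to see what this commit fixes is to round-trip the map returned by collectAsMap() through Java serialization -- what happens whenever the map is captured in a task closure. A hedged sketch mirroring the intent of the new JavaAPISuite test, not its exact code (roundTrip is an illustrative helper):

    import java.io._

    // Sketch: the map returned by JavaPairRDD.collectAsMap() must survive
    // Java serialization; before this fix, writeObject threw
    // NotSerializableException on the wrapper class.
    def roundTrip[T](obj: T): T = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(obj)
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      in.readObject().asInstanceOf[T]
    }

    // e.g. val m = javaPairRdd.collectAsMap()
    //      assert(roundTrip(m).get("someKey") == m.get("someKey"))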
spark git commit: SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not serializable
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 16bc77b7d -> fe7d7a983

SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not serializable

My original 'fix' didn't fix at all. Now, there's a unit test to check whether it works. Of the two options to really fix it -- copy the `Map` to a `java.util.HashMap`, or copy and modify Scala's implementation in `Wrappers.MapWrapper` -- I went with the latter.

Author: Sean Owen <so...@cloudera.com>

Closes #3587 from srowen/SPARK-3926 and squashes the following commits:

8586bb9 [Sean Owen] Remove unneeded no-arg constructor, and add additional note about copied code in LICENSE
7bb0e66 [Sean Owen] Make SerializableMapWrapper actually serialize, and add unit test

(cherry picked from commit e829bfa1ab9b68f44c489d26efb042f793fd9362)
Signed-off-by: Josh Rosen <joshro...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe7d7a98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe7d7a98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe7d7a98

Branch: refs/heads/branch-1.1
Commit: fe7d7a9834c953d155b3157242b722c67972eec5
Parents: 16bc77b
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Dec 8 16:13:03 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 8 16:14:28 2014 -0800

 LICENSE                                        |  3 +-
 .../org/apache/spark/api/java/JavaUtils.scala  | 62 +++-
 .../java/org/apache/spark/JavaAPISuite.java    | 13
 3 files changed, 75 insertions(+), 3 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/fe7d7a98/LICENSE

diff --git a/LICENSE b/LICENSE
index e9a1153..ad120d8 100644
--- a/LICENSE
+++ b/LICENSE
@@ -363,7 +363,8 @@ THE SOFTWARE.
 
 For Scala Interpreter classes (all .scala files in repl/src/main/scala
-except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
+except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
+and for SerializableMapWrapper in JavaUtils.scala:
 
 Copyright (c) 2002-2013 EPFL

http://git-wip-us.apache.org/repos/asf/spark/blob/fe7d7a98/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index b52d0a5..86e9493 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.api.java
 
 import com.google.common.base.Optional
 
-import scala.collection.convert.Wrappers.MapWrapper
+import java.{util => ju}
+import scala.collection.mutable
 
 private[spark] object JavaUtils {
   def optionToOptional[T](option: Option[T]): Optional[T] =
@@ -32,7 +33,64 @@ private[spark] object JavaUtils {
   def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
     new SerializableMapWrapper(underlying)
 
+  // Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
+  // but implements java.io.Serializable. It can't just be subclassed to make it
+  // Serializable since the MapWrapper class has no no-arg constructor. This class
+  // doesn't need a no-arg constructor though.
   class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
-    extends MapWrapper(underlying) with java.io.Serializable
+    extends ju.AbstractMap[A, B] with java.io.Serializable { self =>
+
+    override def size = underlying.size
+
+    override def get(key: AnyRef): B = try {
+      underlying get key.asInstanceOf[A] match {
+        case None => null.asInstanceOf[B]
+        case Some(v) => v
+      }
+    } catch {
+      case ex: ClassCastException => null.asInstanceOf[B]
+    }
+
+    override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
+      def size = self.size
+
+      def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
+        val ui = underlying.iterator
+        var prev: Option[A] = None
+
+        def hasNext = ui.hasNext
+
+        def next() = {
+          val (k, v) = ui.next
+          prev = Some(k)
+          new ju.Map.Entry[A, B] {
+            import scala.util.hashing.byteswap32
+            def getKey = k
+            def getValue = v
+            def setValue(v1: B) = self.put(k, v1)
+            override def hashCode = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
+            override def equals(other: Any) = other match {
+              case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
+              case _ => false
+            }
spark git commit: SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio doc...
Repository: spark
Updated Branches:
  refs/heads/master e829bfa1a -> cda94d15e

SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio doc...

...umented default is incorrect for YARN

Author: Sandy Ryza <sa...@cloudera.com>

Closes #3624 from sryza/sandy-spark-4770 and squashes the following commits:

bd81a3a [Sandy Ryza] SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio documented default is incorrect for YARN

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cda94d15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cda94d15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cda94d15

Branch: refs/heads/master
Commit: cda94d15ea2a70ed3f0651ba2766b1e2f80308c1
Parents: e829bfa
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Dec 8 16:28:36 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 8 16:28:36 2014 -0800

 docs/configuration.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/cda94d15/docs/configuration.md

diff --git a/docs/configuration.md b/docs/configuration.md
index 4b4bbea..d50b046 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -939,11 +939,11 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
-  <td>0</td>
+  <td>0.0 for Mesos and Standalone mode, 0.8 for YARN</td>
   <td>
     The minimum ratio of registered resources (registered resources / total expected resources)
     (resources are executors in yarn mode, CPU cores in standalone mode)
-    to wait for before scheduling begins. Specified as a double between 0 and 1.
+    to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
     Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time
     it will wait before scheduling begins is controlled by config
     <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>.
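[Editor's note] The ratio is an ordinary application-level setting, so the corrected YARN default of 0.8 can be overridden per job. A hedged sketch (not from the commit; app name and value are illustrative) of lowering it so scheduling starts as soon as any executor registers:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: lower the ratio so scheduling starts before 80% of the
    // requested YARN executors have registered. 0.0 disables the wait
    // (subject to spark.scheduler.maxRegisteredResourcesWaitingTime).
    val conf = new SparkConf()
      .setAppName("EagerStart")
      .set("spark.scheduler.minRegisteredResourcesRatio", "0.0")
    val sc = new SparkContext(conf)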
spark git commit: SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio doc...
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 9ed5641a5 -> f4160324c

SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio doc...

...umented default is incorrect for YARN

Author: Sandy Ryza <sa...@cloudera.com>

Closes #3624 from sryza/sandy-spark-4770 and squashes the following commits:

bd81a3a [Sandy Ryza] SPARK-4770. [DOC] [YARN] spark.scheduler.minRegisteredResourcesRatio documented default is incorrect for YARN

(cherry picked from commit cda94d15ea2a70ed3f0651ba2766b1e2f80308c1)
Signed-off-by: Josh Rosen <joshro...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4160324
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4160324
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4160324

Branch: refs/heads/branch-1.2
Commit: f4160324c55b4d168421af5473ce306bc03a77bb
Parents: 9ed5641
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Dec 8 16:28:36 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 8 16:28:57 2014 -0800

 docs/configuration.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/f4160324/docs/configuration.md

diff --git a/docs/configuration.md b/docs/configuration.md
index cdd9f1e..55d41c0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -939,11 +939,11 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
-  <td>0</td>
+  <td>0.0 for Mesos and Standalone mode, 0.8 for YARN</td>
   <td>
     The minimum ratio of registered resources (registered resources / total expected resources)
     (resources are executors in yarn mode, CPU cores in standalone mode)
-    to wait for before scheduling begins. Specified as a double between 0 and 1.
+    to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
     Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time
     it will wait before scheduling begins is controlled by config
     <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>.
spark git commit: [SQL] remove unnecessary import in spark-sql
Repository: spark
Updated Branches:
  refs/heads/master cda94d15e -> 944384363

[SQL] remove unnecessary import in spark-sql

Author: Jacky Li <jacky.li...@huawei.com>

Closes #3630 from jackylk/remove and squashes the following commits:

150e7e0 [Jacky Li] remove unnecessary import

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94438436
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94438436
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94438436

Branch: refs/heads/master
Commit: 944384363d390a133529c08a1d0ac70aa8e778b5
Parents: cda94d1
Author: Jacky Li <jacky.li...@huawei.com>
Authored: Mon Dec 8 17:27:46 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 8 17:27:46 2014 -0800

 sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala      | 2 +-
 .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala   | 4 +---
 .../org/apache/spark/sql/execution/GeneratedAggregate.scala       | 1 -
 .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala     | 3 ---
 .../org/apache/spark/sql/types/util/DataTypeConversions.scala     | 2 +-
 5 files changed, 3 insertions(+), 9 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/94438436/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index c6d4dab..95d73c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql
 
 import java.util.{Map => JMap, List => JList}
-import java.io.StringWriter
+
 
 import scala.collection.JavaConversions._

http://git-wip-us.apache.org/repos/asf/spark/blob/94438436/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ed6b95d..d2d8cb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -19,13 +19,11 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataType, StructType, Row, SQLContext}
+import org.apache.spark.sql.{StructType, Row, SQLContext}
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.ScalaReflection.Schema
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.types.UserDefinedType
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/94438436/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 18afc5d..7c3bf94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.trees._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._

http://git-wip-us.apache.org/repos/asf/spark/blob/94438436/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 81c60e0..4cd8e7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -22,14 +22,11 @@
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.{ScalaReflection, trees}
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
spark git commit: [SPARK-4769] [SQL] CTAS does not work when reading from temporary tables
Repository: spark
Updated Branches:
  refs/heads/master 944384363 -> 51b1fe142

[SPARK-4769] [SQL] CTAS does not work when reading from temporary tables

This is the code refactor and follow ups for #2570

Author: Cheng Hao <hao.ch...@intel.com>

Closes #3336 from chenghao-intel/createtbl and squashes the following commits:

3563142 [Cheng Hao] remove the unused variable
e215187 [Cheng Hao] eliminate the compiling warning
4f97f14 [Cheng Hao] fix bug in unittest
5d58812 [Cheng Hao] revert the API changes
b85b620 [Cheng Hao] fix the regression of temp tabl not found in CTAS

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51b1fe14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51b1fe14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51b1fe14

Branch: refs/heads/master
Commit: 51b1fe1426ffecac6c4644523633ea1562ff9a4e
Parents: 9443843
Author: Cheng Hao <hao.ch...@intel.com>
Authored: Mon Dec 8 17:39:12 2014 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Dec 8 17:39:12 2014 -0800

 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 26 ++-
 .../apache/spark/sql/hive/HiveStrategies.scala  | 14 ---
 .../hive/execution/CreateTableAsSelect.scala    | 16
 .../sql/hive/execution/SQLQuerySuite.scala      |  9 +++
 4 files changed, 49 insertions(+), 16 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/51b1fe14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 91a1577..6086563 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
    * For example, because of a CREATE TABLE X AS statement.
    */
   object CreateTables extends Rule[LogicalPlan] {
+    import org.apache.hadoop.hive.ql.Context
+    import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
+
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
+      case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
+        // Get the CreateTableDesc from Hive SemanticAnalyzer
+        val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
+          None
+        } else {
+          val sa = new SemanticAnalyzer(hive.hiveconf) {
+            override def analyzeInternal(ast: ASTNode) {
+              // A hack to intercept the SemanticAnalyzer.analyzeInternal,
+              // to ignore the SELECT clause of the CTAS
+              val method = classOf[SemanticAnalyzer].getDeclaredMethod(
+                "analyzeCreateTable", classOf[ASTNode], classOf[QB])
+              method.setAccessible(true)
+              method.invoke(this, ast, this.getQB)
+            }
+          }
+
+          sa.analyze(extra, new Context(hive.hiveconf))
+          Some(sa.getQB().getTableDesc)
+        }
+
+        CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/51b1fe14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index edf291f..5f02e95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.hadoop.hive.ql.parse.ASTNode
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
         execution.InsertIntoHiveTable(
           table, partition, planLater(child), overwrite)(hiveContext) :: Nil
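[Editor's note] The regression this fixes is easy to picture: a CTAS whose source is a temp table registered only in Spark's catalog, which Hive's analyzer alone cannot resolve. A hedged sketch of the scenario (table and variable names are illustrative, loosely following the style of the new SQLQuerySuite test; assumes an existing SparkContext `sc`):

    import org.apache.spark.sql.hive.HiveContext

    // Sketch: before this fix, the CTAS below failed because the whole
    // statement was handed to Hive's SemanticAnalyzer, which cannot see
    // Spark's in-memory temp tables. The patch lets Hive analyze only the
    // CREATE TABLE part and leaves the SELECT to Spark SQL's own resolver.
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    val rdd = sc.parallelize(Seq("""{"a": 1, "b": "x"}"""))
    jsonRDD(rdd).registerTempTable("jt") // exists only in Spark's catalog
    sql("CREATE TABLE test_ctas AS SELECT a, b FROM jt") // now resolves `jt`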
spark git commit: [SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'
Repository: spark
Updated Branches:
  refs/heads/master 51b1fe142 -> bcb5cdad6

[SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'

Since `sequenceNumberToProcessor` and `stopped` are both protected by the lock `sequenceNumberToProcessor`, `ConcurrentHashMap` and `volatile` are unnecessary. So this PR updated them accordingly.

Author: zsxwing <zsxw...@gmail.com>

Closes #3634 from zsxwing/SPARK-3154 and squashes the following commits:

0d087ac [zsxwing] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcb5cdad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcb5cdad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcb5cdad

Branch: refs/heads/master
Commit: bcb5cdad614d4fce43725dfec3ce88172d2f8c11
Parents: 51b1fe1
Author: zsxwing <zsxw...@gmail.com>
Authored: Mon Dec 8 23:54:15 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Dec 8 23:54:15 2014 -0800

 .../flume/sink/SparkAvroCallbackHandler.scala | 23 ++--
 1 file changed, 12 insertions(+), 11 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/bcb5cdad/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala

diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 3c656a3..4373be4 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.spark.streaming.flume.sink
 
-import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import org.apache.flume.Channel
 import org.apache.commons.lang.RandomStringUtils
@@ -47,8 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
       .setNameFormat("Spark Sink Processor Thread - %d").build()))
-  private val sequenceNumberToProcessor =
-    new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+  // Protected by `sequenceNumberToProcessor`
+  private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the
   // restart.
@@ -58,8 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)
 
-
-  @volatile private var stopped = false
+  // Protected by `sequenceNumberToProcessor`
+  private var stopped = false
   @volatile private var isTest = false
   private var testLatch: CountDownLatch = null
 
@@ -131,7 +131,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
    * @param success Whether the batch was successful or not.
    */
   private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
+    removeAndGetProcessor(sequenceNumber).foreach(processor => {
       processor.batchProcessed(success)
     })
   }
@@ -139,10 +139,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   /**
    * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
    * @param sequenceNumber
-   * @return The transaction processor for the corresponding batch. Note that this instance is no
-   *         longer tracked and the caller is responsible for that txn processor.
+   * @return An `Option` of the transaction processor for the corresponding batch. Note that this
+   *         instance is no longer tracked and the caller is responsible for that txn processor.
    */
-  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
+  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
+      Option[TransactionProcessor] = {
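[Editor's note] The rule of thumb behind this commit: if every access to a field happens while holding one lock, neither a concurrent collection nor @volatile adds anything, because synchronized provides both mutual exclusion and the visibility guarantees volatile would. A minimal sketch of the pattern the commit converges on (Registry and its members are illustrative, not the sink's real logic):

    import scala.collection.mutable

    // Sketch: one lock object guards both the map and the flag, so plain
    // (non-volatile, non-concurrent) fields are safe.
    class Registry {
      private val processors = mutable.HashMap[String, AnyRef]() // guarded by `processors`
      private var stopped = false                                // guarded by `processors`

      def register(id: String, p: AnyRef): Boolean = processors.synchronized {
        if (stopped) false else { processors(id) = p; true }
      }

      def shutdown(): Unit = processors.synchronized {
        stopped = true
        processors.clear()
      }
    }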