spark git commit: [SPARK-9015] [BUILD] Clean project import in Scala IDE
Repository: spark Updated Branches: refs/heads/master 4ea6480a3 - b536d5dc6 [SPARK-9015] [BUILD] Clean project import in scala ide Cleanup maven for a clean import in scala-ide / eclipse. * remove groovy plugin which is really not needed at all * add-source from build-helper-maven-plugin is not needed as recent version of scala-maven-plugin do it automatically * add lifecycle-mapping plugin to hide a few useless warnings from ide Author: Jan Prach jen...@gmail.com Closes #7375 from jendap/clean-project-import-in-scala-ide and squashes the following commits: c4b4c0f [Jan Prach] fix whitespaces 5a83e07 [Jan Prach] Revert remove java compiler warnings from java tests 312007e [Jan Prach] scala-maven-plugin itself add scala sources by default f47d856 [Jan Prach] remove spark-1.4-staging repository c8a54db [Jan Prach] remove java compiler warnings from java tests 999a068 [Jan Prach] remove some maven warnings in scala ide 80fbdc5 [Jan Prach] remove groovy and gmavenplus plugin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b536d5dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b536d5dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b536d5dc Branch: refs/heads/master Commit: b536d5dc6c2c712270b8130ddd9945dff19a27d9 Parents: 4ea6480 Author: Jan Prach jen...@gmail.com Authored: Thu Jul 16 18:42:41 2015 +0100 Committer: Sean Owen so...@cloudera.com Committed: Thu Jul 16 18:42:41 2015 +0100 -- pom.xml | 130 -- repl/pom.xml | 2 - sql/core/pom.xml | 1 - sql/hive/pom.xml | 1 - tools/pom.xml| 4 -- 5 files changed, 53 insertions(+), 85 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b536d5dc/pom.xml -- diff --git a/pom.xml b/pom.xml index aa49e2a..c5c6558 100644 --- a/pom.xml +++ b/pom.xml @@ -152,7 +152,6 @@ aws.kinesis.client.version1.2.1/aws.kinesis.client.version commons.httpclient.version4.3.2/commons.httpclient.version commons.math3.version3.4.1/commons.math3.version - test_classpath_file${project.build.directory}/spark-test-classpath.txt/test_classpath_file scala.version2.10.4/scala.version scala.binary.version2.10/scala.binary.version jline.version${scala.version}/jline.version @@ -283,18 +282,6 @@ enabledfalse/enabled /snapshots /repository -!-- TODO: This can be deleted after Spark 1.4 is posted -- -repository - idspark-1.4-staging/id - nameSpark 1.4 RC4 Staging Repository/name - urlhttps://repository.apache.org/content/repositories/orgapachespark-1112/url - releases -enabledtrue/enabled - /releases - snapshots -enabledfalse/enabled - /snapshots -/repository /repositories pluginRepositories pluginRepository @@ -319,17 +306,6 @@ version1.0.0/version /dependency !-- - This depndency has been added to provided scope as it is needed for executing build - specific groovy scripts using gmaven+ and not required for downstream project building - with spark. --- -dependency - groupIdorg.codehaus.groovy/groupId - artifactIdgroovy-all/artifactId - version2.3.7/version - scopeprovided/scope -/dependency -!-- This is needed by the scalatest plugin, and so is declared here to be available in all child modules, just as scalatest is run in all children -- @@ -1412,6 +1388,58 @@ artifactIdmaven-deploy-plugin/artifactId version2.8.2/version /plugin +!-- This plugin's configuration is used to store Eclipse m2e settings only. -- +!-- It has no influence on the Maven build itself. 
--> +<plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> +<lifecycleMappingMetadata> + <pluginExecutions> +<pluginExecution> + <pluginExecutionFilter> +<groupId>org.apache.maven.plugins</groupId> +<artifactId>maven-dependency-plugin</artifactId> +<versionRange>[2.8,)</versionRange> +<goals> + <goal>build-classpath</goal> +</goals> + </pluginExecutionFilter> + <action> +<ignore></ignore> + </action> +</pluginExecution> +<pluginExecution> + <pluginExecutionFilter> +<groupId>org.apache.maven.plugins</groupId> +<artifactId>maven-jar-plugin</artifactId> +
spark git commit: [SPARK-6941] [SQL] Provide a better error message when inserting into an RDD-based table
Repository: spark Updated Branches: refs/heads/master b536d5dc6 - 43dac2c88 [SPARK-6941] [SQL] Provide a better error message to when inserting into RDD based table JIRA: https://issues.apache.org/jira/browse/SPARK-6941 Author: Yijie Shen henry.yijies...@gmail.com Closes #7342 from yijieshen/SPARK-6941 and squashes the following commits: f82cbe7 [Yijie Shen] reorder import dd67e40 [Yijie Shen] resolve comments 09518af [Yijie Shen] fix import order in DataframeSuite 0c635d4 [Yijie Shen] make match more specific 9df388d [Yijie Shen] move check into PreWriteCheck 847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43dac2c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43dac2c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43dac2c8 Branch: refs/heads/master Commit: 43dac2c880d6f310a958531aee0bb4ac1d9b7025 Parents: b536d5d Author: Yijie Shen henry.yijies...@gmail.com Authored: Thu Jul 16 10:52:09 2015 -0700 Committer: Yin Huai yh...@databricks.com Committed: Thu Jul 16 10:52:09 2015 -0700 -- .../org/apache/spark/sql/sources/rules.scala| 9 +++- .../org/apache/spark/sql/DataFrameSuite.scala | 55 ++-- 2 files changed, 60 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/43dac2c8/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index a3fd7f1..40ee048 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException} import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog} import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias} import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.DataType @@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan = // The relation in l is not an InsertableRelation. failAnalysis(s$l does not allow insertion.) + case logical.InsertIntoTable(t, _, _, _, _) = +if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) { + failAnalysis(sInserting into an RDD-based table is not allowed.) +} else { + // OK +} + case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) = // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. 
http://git-wip-us.apache.org/repos/asf/spark/blob/43dac2c8/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f592a99..23244fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -17,19 +17,23 @@ package org.apache.spark.sql +import java.io.File + import scala.language.postfixOps +import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint} - +import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils} -class DataFrameSuite extends QueryTest { +class DataFrameSuite extends QueryTest with SQLTestUtils { import org.apache.spark.sql.TestData._ lazy val ctx = org.apache.spark.sql.test.TestSQLContext import ctx.implicits._ + def sqlContext: SQLContext = ctx + test(analysis error should be eagerly reported) { val oldSetting = ctx.conf.dataFrameEagerAnalysis // Eager analysis. @@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest { assert(f.getMessage.contains(column3)) assert(!f.getMessage.contains(column2)) } + + test(SPARK-6941: Better error message for inserting into RDD-based Table) { +withTempDir { dir = + + val tempParquetFile = new File(dir, tmp_parquet) + val tempJsonFile = new
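A minimal sketch of the behavior this check introduces, loosely mirroring the new test above and assuming a spark-shell-style `sqlContext` (table and column names are illustrative). A temp table backed by a local collection resolves to a `LocalRelation`, not an `InsertableRelation`, so `PreWriteCheck` now rejects the insert during analysis with the clearer message:

```
import org.apache.spark.sql.AnalysisException
import sqlContext.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("key", "value")   // backed by a LocalRelation
df.registerTempTable("rdd_based")

val insertion = Seq((3, "c")).toDF("key", "value")
try {
  insertion.write.insertInto("rdd_based")
} catch {
  case e: AnalysisException =>
    // The new, explicit message from PreWriteCheck:
    assert(e.getMessage.contains("Inserting into an RDD-based table is not allowed."))
}
```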
spark git commit: [SPARK-8807] [SPARKR] Add between operator in SparkR
Repository: spark Updated Branches: refs/heads/master e27212317 - 0a795336d [SPARK-8807] [SPARKR] Add between operator in SparkR JIRA: https://issues.apache.org/jira/browse/SPARK-8807 Add between operator in SparkR. Author: Liang-Chi Hsieh vii...@appier.com Closes #7356 from viirya/add_r_between and squashes the following commits: 7f51b44 [Liang-Chi Hsieh] Add test for non-numeric column. c6a25c5 [Liang-Chi Hsieh] Add between function. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a795336 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a795336 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a795336 Branch: refs/heads/master Commit: 0a795336df20c7ec969366e613286f0c060a4eeb Parents: e272123 Author: Liang-Chi Hsieh vii...@appier.com Authored: Wed Jul 15 23:36:57 2015 -0700 Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu Committed: Wed Jul 15 23:36:57 2015 -0700 -- R/pkg/NAMESPACE | 1 + R/pkg/R/column.R | 17 + R/pkg/R/generics.R | 4 R/pkg/inst/tests/test_sparkSQL.R | 12 4 files changed, 34 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a795336/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 7f85722..331307c 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -77,6 +77,7 @@ exportMethods(abs, atan, atan2, avg, + between, cast, cbrt, ceiling, http://git-wip-us.apache.org/repos/asf/spark/blob/0a795336/R/pkg/R/column.R -- diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R index 8e4b0f5..2892e14 100644 --- a/R/pkg/R/column.R +++ b/R/pkg/R/column.R @@ -187,6 +187,23 @@ setMethod(substr, signature(x = Column), column(jc) }) +#' between +#' +#' Test if the column is between the lower bound and upper bound, inclusive. +#' +#' @rdname column +#' +#' @param bounds lower and upper bounds +setMethod(between, signature(x = Column), + function(x, bounds) { +if (is.vector(bounds) length(bounds) == 2) { + jc - callJMethod(x@jc, between, bounds[1], bounds[2]) + column(jc) +} else { + stop(bounds should be a vector of lower and upper bounds) +} + }) + #' Casts the column to a different data type. #' #' @rdname column http://git-wip-us.apache.org/repos/asf/spark/blob/0a795336/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index fad9d71..ebe6fbd 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -569,6 +569,10 @@ setGeneric(avg, function(x, ...) 
{ standardGeneric(avg) }) #' @rdname column #' @export +setGeneric(between, function(x, bounds) { standardGeneric(between) }) + +#' @rdname column +#' @export setGeneric(cast, function(x, dataType) { standardGeneric(cast) }) #' @rdname column http://git-wip-us.apache.org/repos/asf/spark/blob/0a795336/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 76f74f8..cdfe648 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -638,6 +638,18 @@ test_that(column functions, { c7 - floor(c) + log(c) + log10(c) + log1p(c) + rint(c) c8 - sign(c) + sin(c) + sinh(c) + tan(c) + tanh(c) c9 - toDegrees(c) + toRadians(c) + + df - jsonFile(sqlContext, jsonPath) + df2 - select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20))) + expect_equal(collect(df2)[[2, 1]], TRUE) + expect_equal(collect(df2)[[2, 2]], FALSE) + expect_equal(collect(df2)[[3, 1]], FALSE) + expect_equal(collect(df2)[[3, 2]], TRUE) + + df3 - select(df, between(df$name, c(Apache, Spark))) + expect_equal(collect(df3)[[1, 1]], TRUE) + expect_equal(collect(df3)[[2, 1]], FALSE) + expect_equal(collect(df3)[[3, 1]], TRUE) }) test_that(column binary mathfunctions, { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
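The R `between` wrapper above delegates through `callJMethod` to the JVM-side `Column.between`; a rough Scala equivalent of the new R usage, assuming a spark-shell-style `sqlContext` and an illustrative JSON input with an `age` column:

```
import org.apache.spark.sql.functions.col

val df = sqlContext.read.json("people.json")          // illustrative path
// Inclusive range test, same semantics as between(df$age, c(20, 30)) in R
val inRange = df.select(col("age").between(20, 30))
inRange.show()
```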
spark git commit: [SPARK-8972] [SQL] Incorrect result for rollup
Repository: spark Updated Branches: refs/heads/master ba3309684 - e27212317 [SPARK-8972] [SQL] Incorrect result for rollup We don't support the complex expression keys in the rollup/cube, and we even will not report it if we have the complex group by keys, that will cause very confusing/incorrect result. e.g. `SELECT key%100 FROM src GROUP BY key %100 with ROLLUP` This PR adds an additional project during the analyzing for the complex GROUP BY keys, and that projection will be the child of `Expand`, so to `Expand`, the GROUP BY KEY are always the simple key(attribute names). Author: Cheng Hao hao.ch...@intel.com Closes #7343 from chenghao-intel/expand and squashes the following commits: 1ebbb59 [Cheng Hao] update the comment 827873f [Cheng Hao] update as feedback 34def69 [Cheng Hao] Add more unit test and comments c695760 [Cheng Hao] fix bug of incorrect result for rollup Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2721231 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2721231 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2721231 Branch: refs/heads/master Commit: e27212317c7341852c52d9a85137b8f94cb0d935 Parents: ba33096 Author: Cheng Hao hao.ch...@intel.com Authored: Wed Jul 15 23:35:27 2015 -0700 Committer: Yin Huai yh...@databricks.com Committed: Wed Jul 15 23:35:27 2015 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 42 +-- ...r CUBE #1-0-63b61fb3f0e74226001ad279be440864 | 6 +++ ...r CUBE #2-0-7a511f02a16f0af4f810b1666cfcd896 | 10 ...oupingSet-0-8c14c24670a4b06c440346277ce9cf1c | 10 ...Rollup #1-0-a78e3dbf242f240249e36b3d3fd0926a | 6 +++ ...Rollup #2-0-bf180c9d1a18f61b9d9f31bb0115cf89 | 10 ...Rollup #3-0-9257085d123728730be96b6d9fbb84ce | 10 .../sql/hive/execution/HiveQuerySuite.scala | 54 8 files changed, 145 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e2721231/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 891408e..df8e7f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -194,16 +194,52 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case a if !a.childrenResolved = a // be sure all of the children are resolved. case a: Cube = GroupingSets(bitmasks(a), a.groupByExprs, a.child, a.aggregations) case a: Rollup = GroupingSets(bitmasks(a), a.groupByExprs, a.child, a.aggregations) case x: GroupingSets = val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)() +// We will insert another Projection if the GROUP BY keys contains the +// non-attribute expressions. And the top operators can references those +// expressions by its alias. +// e.g. 
SELECT key%5 as c1 FROM src GROUP BY key%5 == +// SELECT a as c1 FROM (SELECT key%5 AS a FROM src) GROUP BY a + +// find all of the non-attribute expressions in the GROUP BY keys +val nonAttributeGroupByExpressions = new ArrayBuffer[Alias]() + +// The pair of (the original GROUP BY key, associated attribute) +val groupByExprPairs = x.groupByExprs.map(_ match { + case e: NamedExpression = (e, e.toAttribute) + case other = { +val alias = Alias(other, other.toString)() +nonAttributeGroupByExpressions += alias // add the non-attributes expression alias +(other, alias.toAttribute) + } +}) + +// substitute the non-attribute expressions for aggregations. +val aggregation = x.aggregations.map(expr = expr.transformDown { + case e = groupByExprPairs.find(_._1.semanticEquals(e)).map(_._2).getOrElse(e) +}.asInstanceOf[NamedExpression]) + +// substitute the group by expressions. +val newGroupByExprs = groupByExprPairs.map(_._2) + +val child = if (nonAttributeGroupByExpressions.length 0) { + // insert additional projection if contains the + // non-attribute expressions in the GROUP BY keys + Project(x.child.output ++ nonAttributeGroupByExpressions, x.child) +} else { + x.child +} + Aggregate( - x.groupByExprs :+ VirtualColumn.groupingIdAttribute, - x.aggregations, -
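A sketch of the query class this fixes, assuming a HiveContext-backed `sqlContext` with the usual `src(key, value)` test table. Before the fix, a non-attribute key such as `key % 100` under ROLLUP could silently produce wrong results; the analyzer now pushes the complex key into a projection so that `Expand` only ever sees simple attribute keys:

```
// Conceptually: SELECT key % 100 FROM src GROUP BY key % 100 WITH ROLLUP
//   ==>         SELECT a FROM (SELECT key % 100 AS a FROM src) GROUP BY a WITH ROLLUP
val rolledUp = sqlContext.sql(
  "SELECT key % 100, count(1) FROM src GROUP BY key % 100 WITH ROLLUP")
rolledUp.show()
```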
spark git commit: [SPARK-8893] Add runtime checks against non-positive number of partitions
Repository: spark Updated Branches: refs/heads/master 0a795336d - 011551620 [SPARK-8893] Add runtime checks against non-positive number of partitions https://issues.apache.org/jira/browse/SPARK-8893 What does `sc.parallelize(1 to 3).repartition(p).collect` return? I would expect `Array(1, 2, 3)` regardless of `p`. But if `p` 1, it returns `Array()`. I think instead it should throw an `IllegalArgumentException`. I think the case is pretty clear for `p` 0. But the behavior for `p` = 0 is also error prone. In fact that's how I found this strange behavior. I used `rdd.repartition(a/b)` with positive `a` and `b`, but `a/b` was rounded down to zero and the results surprised me. I'd prefer an exception instead of unexpected (corrupt) results. Author: Daniel Darabos darabos.dan...@gmail.com Closes #7285 from darabos/patch-1 and squashes the following commits: decba82 [Daniel Darabos] Allow repartitioning empty RDDs to zero partitions. 97de852 [Daniel Darabos] Allow zero partition count in HashPartitioner f6ba5fb [Daniel Darabos] Use require() for simpler syntax. d5e3df8 [Daniel Darabos] Require positive number of partitions in HashPartitioner 897c628 [Daniel Darabos] Require positive maxPartitions in CoalescedRDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01155162 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01155162 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01155162 Branch: refs/heads/master Commit: 011551620faa87107a787530f074af3d9be7e695 Parents: 0a79533 Author: Daniel Darabos darabos.dan...@gmail.com Authored: Thu Jul 16 08:16:54 2015 +0100 Committer: Sean Owen so...@cloudera.com Committed: Thu Jul 16 08:16:54 2015 +0100 -- core/src/main/scala/org/apache/spark/Partitioner.scala | 2 ++ core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 5 - 2 files changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/01155162/core/src/main/scala/org/apache/spark/Partitioner.scala -- diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 82889bc..ad68512 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -76,6 +76,8 @@ object Partitioner { * produce an unexpected or incorrect result. */ class HashPartitioner(partitions: Int) extends Partitioner { + require(partitions = 0, sNumber of partitions ($partitions) cannot be negative.) + def numPartitions: Int = partitions def getPartition(key: Any): Int = key match { http://git-wip-us.apache.org/repos/asf/spark/blob/01155162/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 663eebb..90d9735 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -69,7 +69,7 @@ private[spark] case class CoalescedRDDPartition( * the preferred location of each new partition overlaps with as many preferred locations of its * parent partitions * @param prev RDD to be coalesced - * @param maxPartitions number of desired partitions in the coalesced RDD + * @param maxPartitions number of desired partitions in the coalesced RDD (must be positive) * @param balanceSlack used to trade-off balance and locality. 
1.0 is all locality, 0 is all balance */ private[spark] class CoalescedRDD[T: ClassTag]( @@ -78,6 +78,9 @@ private[spark] class CoalescedRDD[T: ClassTag]( balanceSlack: Double = 0.10) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies + require(maxPartitions 0 || maxPartitions == prev.partitions.length, +sNumber of partitions ($maxPartitions) must be positive.) + override def getPartitions: Array[Partition] = { val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
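A small sketch of the new fail-fast behavior, assuming a spark-shell-style `sc`. Previously calls like these silently yielded `Array()`; the bad partition counts are now rejected by the added `require` checks:

```
val rdd = sc.parallelize(1 to 3)

try {
  rdd.repartition(-1).collect()   // HashPartitioner now rejects negative partition counts
} catch {
  case e: IllegalArgumentException => println(e.getMessage)
}

try {
  rdd.coalesce(0).collect()       // CoalescedRDD requires a positive target for a non-empty parent
} catch {
  case e: IllegalArgumentException => println(e.getMessage)
}
```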
spark git commit: [SPARK-8995] [SQL] cast date strings like '2015-01-01 12:15:31' to date
Repository: spark Updated Branches: refs/heads/master 011551620 - 4ea6480a3 [SPARK-8995] [SQL] cast date strings like '2015-01-01 12:15:31' to date Jira https://issues.apache.org/jira/browse/SPARK-8995 In PR #6981we noticed that we cannot cast date strings that contains a time, like '2015-03-18 12:39:40' to date. Besides it's not possible to cast a string like '18:03:20' to a timestamp. If a time is passed without a date, today is inferred as date. Author: Tarek Auel tarek.a...@googlemail.com Author: Tarek Auel tarek.a...@gmail.com Closes #7353 from tarekauel/SPARK-8995 and squashes the following commits: 14f333b [Tarek Auel] [SPARK-8995] added tests for daylight saving time ca1ae69 [Tarek Auel] [SPARK-8995] style fix d20b8b4 [Tarek Auel] [SPARK-8995] bug fix: distinguish between 0 and null ef05753 [Tarek Auel] [SPARK-8995] added check for year = 1000 01c9ff3 [Tarek Auel] [SPARK-8995] support for time strings 34ec573 [Tarek Auel] fixed style 71622c0 [Tarek Auel] improved timestamp and date parsing 0e30c0a [Tarek Auel] Hive compatibility cfbaed7 [Tarek Auel] fixed wrong checks 71f89c1 [Tarek Auel] [SPARK-8995] minor style fix f7452fa [Tarek Auel] [SPARK-8995] removed old timestamp parsing 30e5aec [Tarek Auel] [SPARK-8995] date and timestamp cast c1083fb [Tarek Auel] [SPARK-8995] cast date strings like '2015-01-01 12:15:31' to date or timestamp Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ea6480a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ea6480a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ea6480a Branch: refs/heads/master Commit: 4ea6480a3ba4ca7e09089c9b99d4a855894b9015 Parents: 0115516 Author: Tarek Auel tarek.a...@googlemail.com Authored: Thu Jul 16 08:26:39 2015 -0700 Committer: Davies Liu davies@gmail.com Committed: Thu Jul 16 08:26:39 2015 -0700 -- .../spark/sql/catalyst/expressions/Cast.scala | 17 +- .../spark/sql/catalyst/util/DateTimeUtils.scala | 198 + .../sql/catalyst/expressions/CastSuite.scala| 144 .../sql/catalyst/util/DateTimeUtilsSuite.scala | 218 +++ 4 files changed, 562 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4ea6480a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index ab02add..83d5b3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -167,17 +167,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w // TimestampConverter private[this] def castToTimestamp(from: DataType): Any = Any = from match { case StringType = - buildCast[UTF8String](_, utfs = { -// Throw away extra if more than 9 decimal places -val s = utfs.toString -val periodIdx = s.indexOf(.) 
-var n = s -if (periodIdx != -1 n.length() - periodIdx 9) { - n = n.substring(0, periodIdx + 10) -} -try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n)) -catch { case _: java.lang.IllegalArgumentException = null } - }) + buildCast[UTF8String](_, utfs = DateTimeUtils.stringToTimestamp(utfs).orNull) case BooleanType = buildCast[Boolean](_, b = if (b) 1L else 0) case LongType = @@ -220,10 +210,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w // DateConverter private[this] def castToDate(from: DataType): Any = Any = from match { case StringType = - buildCast[UTF8String](_, s = -try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString)) -catch { case _: java.lang.IllegalArgumentException = null } - ) + buildCast[UTF8String](_, s = DateTimeUtils.stringToDate(s).orNull) case TimestampType = // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. http://git-wip-us.apache.org/repos/asf/spark/blob/4ea6480a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index c1ddee3..53c32a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++
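A short sketch of the casts this enables, assuming a spark-shell-style `sqlContext` (the literal values are illustrative). A timestamp-style string can now be cast to a date, and a bare time string can be cast to a timestamp with today's date inferred:

```
import sqlContext.implicits._
import org.apache.spark.sql.functions._

val df = Seq(Tuple1("2015-03-18 12:39:40")).toDF("s").select(
  $"s".cast("date").as("d"),                    // previously this cast yielded null
  lit("18:03:20").cast("timestamp").as("t"))    // previously not castable; today's date is now inferred
df.show()
```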
spark git commit: [SPARK-8646] PySpark does not run on YARN if master not provided in command line
Repository: spark Updated Branches: refs/heads/master 57e9b13bf - 49351c7f5 [SPARK-8646] PySpark does not run on YARN if master not provided in command line andrewor14 davies vanzin can you take a look at this? thanks Author: Lianhui Wang lianhuiwan...@gmail.com Closes #7438 from lianhuiwang/SPARK-8646 and squashes the following commits: cb3f12d [Lianhui Wang] add whitespace 6d874a6 [Lianhui Wang] support pyspark for yarn-client Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49351c7f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49351c7f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49351c7f Branch: refs/heads/master Commit: 49351c7f597c67950cc65e5014a89fad31b9a6f7 Parents: 57e9b13 Author: Lianhui Wang lianhuiwan...@gmail.com Authored: Thu Jul 16 19:31:14 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Jul 16 19:31:45 2015 -0700 -- python/pyspark/context.py | 5 + yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/49351c7f/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index d746672..43bde5a 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -152,6 +152,11 @@ class SparkContext(object): self.master = self._conf.get(spark.master) self.appName = self._conf.get(spark.app.name) self.sparkHome = self._conf.get(spark.home, None) + +# Let YARN know it's a pyspark app, so it distributes needed libraries. +if self.master == yarn-client: +self._conf.set(spark.yarn.isPython, true) + for (k, v) in self._conf.getAll(): if k.startswith(spark.executorEnv.): varName = k[len(spark.executorEnv.):] http://git-wip-us.apache.org/repos/asf/spark/blob/49351c7f/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f86b6d1..b74ea9a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -616,7 +616,7 @@ private[spark] class Client( val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) val pySparkArchives = - if (sys.props.getOrElse(spark.yarn.isPython, false).toBoolean) { + if (sparkConf.getBoolean(spark.yarn.isPython, false)) { findPySparkArchives() } else { Nil - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
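Why the one-line change in `Client.scala` matters, in a minimal sketch: a value set programmatically on `SparkConf` (as `context.py` now does for `spark.yarn.isPython`) is not a JVM system property, so the old `sys.props` lookup never saw it:

```
import org.apache.spark.SparkConf

val conf = new SparkConf(false).set("spark.yarn.isPython", "true")
conf.getBoolean("spark.yarn.isPython", false)                    // true  -- new lookup sees it
sys.props.getOrElse("spark.yarn.isPython", "false").toBoolean    // false -- old lookup misses it
```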
spark git commit: [SPARK-8119] HeartbeatReceiver should replace executors, not kill
Repository: spark Updated Branches: refs/heads/master d86bbb4e2 - 96aa3340f [SPARK-8119] HeartbeatReceiver should replace executors, not kill **Symptom.** If an executor in an application times out, `HeartbeatReceiver` attempts to kill it. After this happens, however, the application never gets an executor back even when there are cluster resources available. **Cause.** The issue is that `sc.killExecutor` automatically assumes that the application wishes to adjust its resource requirements permanently downwards. This is not the intention in `HeartbeatReceiver`, however, which simply wants a replacement for the expired executor. **Fix.** Differentiate between the intention to kill and the intention to replace an executor with a fresh one. More details can be found in the commit message. Author: Andrew Or and...@databricks.com Closes #7107 from andrewor14/heartbeat-no-kill and squashes the following commits: 1cd2cd7 [Andrew Or] Add regression test for SPARK-8119 25a347d [Andrew Or] Reuse more code in scheduler backend 31ebd40 [Andrew Or] Differentiate between kill and replace Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96aa3340 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96aa3340 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96aa3340 Branch: refs/heads/master Commit: 96aa3340f41d8de4560caec97e8f3de23252c792 Parents: d86bbb4 Author: Andrew Or and...@databricks.com Authored: Thu Jul 16 19:39:54 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Jul 16 19:39:54 2015 -0700 -- .../org/apache/spark/HeartbeatReceiver.scala| 4 +- .../scala/org/apache/spark/SparkContext.scala | 40 - .../cluster/CoarseGrainedSchedulerBackend.scala | 40 +++-- .../apache/spark/HeartbeatReceiverSuite.scala | 147 --- 4 files changed, 194 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/96aa3340/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala -- diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 221b1da..43dd4a1 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -181,7 +181,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - sc.killExecutor(executorId) + // Note: we want to get an executor back after expiring this one, + // so do not simply call `sc.killExecutor` here (SPARK-8119) + sc.killAndReplaceExecutor(executorId) } }) } http://git-wip-us.apache.org/repos/asf/spark/blob/96aa3340/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bd1cc33..d00c012 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1419,6 +1419,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: * Request that the cluster manager kill the specified executors. + * + * Note: This is an indication to the cluster manager that the application wishes to adjust + * its resource usage downwards. 
If the application wishes to replace the executors it kills + * through this method with new ones, it should follow up explicitly with a call to + * {{SparkContext#requestExecutors}}. + * * This is currently only supported in YARN mode. Return whether the request is received. */ @DeveloperApi @@ -1436,12 +1442,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: - * Request that cluster manager the kill the specified executor. - * This is currently only supported in Yarn mode. Return whether the request is received. + * Request that the cluster manager kill the specified executor. + * + * Note: This is an indication to the cluster manager that the application wishes to adjust + * its resource usage downwards. If the application wishes to replace the executor it kills + * through this method with a new one, it should follow up explicitly with a call to + * {{SparkContext#requestExecutors}}. + * +
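A conceptual sketch of the two intents this change separates (the executor id is illustrative). `killExecutor` remains the `@DeveloperApi` call that permanently lowers the application's executor target; the new `killAndReplaceExecutor` keeps the target unchanged so a replacement is requested, and it is used internally by `HeartbeatReceiver` (it may not be exposed as a public API):

```
sc.killExecutor("3")              // shrink: the app asks for fewer executors from now on
// sc.killAndReplaceExecutor("3") // replace: target count unchanged, a fresh executor is requested
```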
spark git commit: [SPARK-6284] [MESOS] Add mesos role, principal and secret
Repository: spark Updated Branches: refs/heads/master 49351c7f5 - d86bbb4e2 [SPARK-6284] [MESOS] Add mesos role, principal and secret Mesos supports framework authentication and role to be set per framework, which the role is used to identify the framework's role which impacts the sharing weight of resource allocation and optional authentication information to allow the framework to be connected to the master. Author: Timothy Chen tnac...@gmail.com Closes #4960 from tnachen/mesos_fw_auth and squashes the following commits: 0f9f03e [Timothy Chen] Fix review comments. 8f9488a [Timothy Chen] Fix rebase f7fc2a9 [Timothy Chen] Add mesos role, auth and secret. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d86bbb4e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d86bbb4e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d86bbb4e Branch: refs/heads/master Commit: d86bbb4e286f16f77ba125452b07827684eafeed Parents: 49351c7 Author: Timothy Chen tnac...@gmail.com Authored: Thu Jul 16 19:36:45 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Jul 16 19:37:15 2015 -0700 -- .../mesos/CoarseMesosSchedulerBackend.scala | 35 +++--- .../cluster/mesos/MesosClusterScheduler.scala | 28 ++--- .../cluster/mesos/MesosSchedulerBackend.scala | 118 ++--- .../cluster/mesos/MesosSchedulerUtils.scala | 126 --- .../CoarseMesosSchedulerBackendSuite.scala | 19 ++- .../mesos/MesosSchedulerBackendSuite.scala | 106 +++- docs/running-on-mesos.md| 22 7 files changed, 358 insertions(+), 96 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index cbade13..b7fde0d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -18,8 +18,8 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File -import java.util.{List = JList, Collections} import java.util.concurrent.locks.ReentrantLock +import java.util.{Collections, List = JList} import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} @@ -27,12 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet} import com.google.common.collect.HashBiMap import org.apache.mesos.Protos.{TaskInfo = MesosTaskInfo, _} import org.apache.mesos.{Scheduler = MScheduler, _} -import org.apache.mesos.Protos.{TaskInfo = MesosTaskInfo, _} -import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} import org.apache.spark.rpc.RpcAddress import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils +import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} /** * A SchedulerBackend that runs tasks on Mesos, but uses coarse-grained tasks, where it holds @@ -69,7 +68,7 @@ private[spark] class CoarseMesosSchedulerBackend( /** * The total number of executors we aim to have. Undefined when not using dynamic allocation - * and before the ExecutorAllocatorManager calls [[doRequesTotalExecutors]]. 
+ * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]]. */ private var executorLimitOption: Option[Int] = None @@ -103,8 +102,9 @@ private[spark] class CoarseMesosSchedulerBackend( override def start() { super.start() -val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build() -startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo) +val driver = createSchedulerDriver( + master, CoarseMesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf) +startScheduler(driver) } def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = { @@ -224,24 +224,29 @@ private[spark] class CoarseMesosSchedulerBackend( taskIdToSlaveId(taskId) = slaveId slaveIdsWithExecutors += slaveId coresByTaskId(taskId) = cpusToUse - val task = MesosTaskInfo.newBuilder() + // Gather cpu resources from the available resources and use them in the task. + val (remainingResources, cpuResourcesToUse) = +partitionResources(offer.getResourcesList, cpus, cpusToUse)
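A sketch of how an application might supply the new Mesos framework settings. The docs diff is truncated above, so treat the exact property keys below as an assumption (they follow the usual `spark.mesos.*` naming); the values are placeholders:

```
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://master-host:5050")
  .setAppName("mesos-auth-example")
  .set("spark.mesos.role", "spark")              // resource-sharing role
  .set("spark.mesos.principal", "spark-fw")      // framework authentication principal
  .set("spark.mesos.secret", "s3cret")           // framework authentication secret
val sc = new SparkContext(conf)
```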
spark git commit: [SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue.
Repository: spark Updated Branches: refs/heads/master fec10f0c6 - 031d7d414 [SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue. Author: jerryshao saisai.s...@intel.com Author: Saisai Shao saisai.s...@intel.com Closes #5060 from jerryshao/SPARK-6304 and squashes the following commits: 89b01f5 [jerryshao] Update the unit test to add more cases 275d252 [jerryshao] Address the comments 7cc146d [jerryshao] Address the comments 2624723 [jerryshao] Fix rebase conflict 45befaa [Saisai Shao] Update the unit test bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/031d7d41 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/031d7d41 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/031d7d41 Branch: refs/heads/master Commit: 031d7d41430ec1f3c3353e33eab4821a9bcd58a5 Parents: fec10f0 Author: jerryshao saisai.s...@intel.com Authored: Thu Jul 16 16:55:46 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Jul 16 16:55:46 2015 -0700 -- .../org/apache/spark/streaming/Checkpoint.scala | 2 + .../spark/streaming/CheckpointSuite.scala | 45 +++- 2 files changed, 46 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/031d7d41/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 5279331..65d4e93 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -48,6 +48,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) // Reload properties for the checkpoint application since user wants to set a reload property // or spark had changed its value and user wants to set it back. val propertiesToReload = List( + spark.driver.host, + spark.driver.port, spark.master, spark.yarn.keytab, spark.yarn.principal) http://git-wip-us.apache.org/repos/asf/spark/blob/031d7d41/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 6a94928..d308ac0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -191,8 +191,51 @@ class CheckpointSuite extends TestSuiteBase { } } + // This tests if spark.driver.host and spark.driver.port is set by user, can be recovered + // with correct value. 
+ test(get correct spark.driver.[host|port] from checkpoint) { +val conf = Map(spark.driver.host - localhost, spark.driver.port - ) +conf.foreach(kv = System.setProperty(kv._1, kv._2)) +ssc = new StreamingContext(master, framework, batchDuration) +val originalConf = ssc.conf +assert(originalConf.get(spark.driver.host) === localhost) +assert(originalConf.get(spark.driver.port) === ) + +val cp = new Checkpoint(ssc, Time(1000)) +ssc.stop() + +// Serialize/deserialize to simulate write to storage and reading it back +val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) + +val newCpConf = newCp.createSparkConf() +assert(newCpConf.contains(spark.driver.host)) +assert(newCpConf.contains(spark.driver.port)) +assert(newCpConf.get(spark.driver.host) === localhost) +assert(newCpConf.get(spark.driver.port) === ) + +// Check if all the parameters have been restored +ssc = new StreamingContext(null, newCp, null) +val restoredConf = ssc.conf +assert(restoredConf.get(spark.driver.host) === localhost) +assert(restoredConf.get(spark.driver.port) === ) +ssc.stop() + +// If spark.driver.host and spark.driver.host is not set in system property, these two +// parameters should not be presented in the newly recovered conf. +conf.foreach(kv = System.clearProperty(kv._1)) +val newCpConf1 = newCp.createSparkConf() +assert(!newCpConf1.contains(spark.driver.host)) +assert(!newCpConf1.contains(spark.driver.port)) + +// Spark itself will dispatch a random, not-used port for spark.driver.port if it is not set +// explicitly. +ssc = new StreamingContext(null, newCp, null) +val restoredConf1 = ssc.conf +assert(restoredConf1.get(spark.driver.host) ===
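A minimal sketch of the recovery path this fixes, with an illustrative checkpoint directory. On restart, `createSparkConf()` now reloads `spark.driver.host` and `spark.driver.port` from the current environment (alongside `spark.master` and the YARN keytab/principal) rather than reusing the values serialized into the checkpoint:

```
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val ssc = new StreamingContext("local[2]", "checkpoint-example", Seconds(1))
  ssc.checkpoint("/tmp/checkpoint-example")   // illustrative path
  ssc
}

// Either creates a fresh context or restores one whose conf carries the
// driver host/port of the *current* process, not of the old driver.
val ssc = StreamingContext.getOrCreate("/tmp/checkpoint-example", createContext _)
```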
spark git commit: [SPARK-8644] Include call site in SparkException stack traces thrown by job failures
Repository: spark Updated Branches: refs/heads/master 031d7d414 - 57e9b13bf [SPARK-8644] Include call site in SparkException stack traces thrown by job failures Example exception (new part at bottom, clearly demarcated): ``` org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: uh-oh! at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38$$anonfun$apply$mcJ$sp$2.apply(DAGSchedulerSuite.scala:880) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38$$anonfun$apply$mcJ$sp$2.apply(DAGSchedulerSuite.scala:880) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1640) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1777) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1777) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1298) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1289) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1288) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1288) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:755) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:755) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:755) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1509) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1470) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1459) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:560) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1744) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1762) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1777) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1791) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply$mcJ$sp(DAGSchedulerSuite.scala:880) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply(DAGSchedulerSuite.scala:880) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply(DAGSchedulerSuite.scala:880) at 
org.scalatest.Assertions$class.intercept(Assertions.scala:997) at org.scalatest.FunSuite.intercept(FunSuite.scala:1555) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply$mcV$sp(DAGSchedulerSuite.scala:879) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply(DAGSchedulerSuite.scala:878) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply(DAGSchedulerSuite.scala:878) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at