spark git commit: [SPARK-16331][SQL] Reduce code generation time
Repository: spark
Updated Branches: refs/heads/master aa6564f37 -> 14cf61e90

[SPARK-16331][SQL] Reduce code generation time

## What changes were proposed in this pull request?

During code generation, a `LocalRelation` often has a huge `Vector` object as `data`. In the simple example below, a `LocalRelation` has a `Vector` with 100 elements of `UnsafeRow`.

```
val numRows = 100
val ds = (1 to numRows).toDS().persist()
benchmark.addCase("filter+reduce") { iter =>
  ds.filter(a => (a & 1) == 0).reduce(_ + _)
}
```

In `TreeNode.transformChildren`, all elements of the vector are unnecessarily iterated to check whether any children exist in the vector, since `Vector` is `Traversable`. This significantly increases code generation time. This patch avoids the overhead by checking the number of children before iterating over all elements; a `LocalRelation` has no children since it extends `LeafNode`.

Performance of the example above:

```
without this patch
Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.11.5
Intel(R) Core(TM) i5-5257U CPU 2.70GHz
compilationTime:     Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
filter+reduce              4426 / 4533          0.2         4426.0        1.0X

with this patch
compilationTime:     Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
filter+reduce              3117 / 3391          0.3         3116.6        1.0X
```

## How was this patch tested?

Using existing unit tests.

Author: Hiroshi Inoue

Closes #14000 from inouehrs/compilation-time-reduction.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14cf61e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14cf61e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14cf61e9

Branch: refs/heads/master
Commit: 14cf61e909598d9f6b9c3b920de7299e9bc828e0
Parents: aa6564f
Author: Hiroshi Inoue
Authored: Thu Jun 30 21:47:44 2016 -0700
Committer: Reynold Xin
Committed: Thu Jun 30 21:47:44 2016 -0700

--
 .../spark/sql/catalyst/trees/TreeNode.scala | 82 ++--
 1 file changed, 43 insertions(+), 39 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/14cf61e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 072445a..8bce404 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -315,25 +315,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   protected def transformChildren(
       rule: PartialFunction[BaseType, BaseType],
       nextOperation: (BaseType, PartialFunction[BaseType, BaseType]) => BaseType): BaseType = {
-    var changed = false
-    val newArgs = mapProductIterator {
-      case arg: TreeNode[_] if containsChild(arg) =>
-        val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
-        if (!(newChild fastEquals arg)) {
-          changed = true
-          newChild
-        } else {
-          arg
-        }
-      case Some(arg: TreeNode[_]) if containsChild(arg) =>
-        val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
-        if (!(newChild fastEquals arg)) {
-          changed = true
-          Some(newChild)
-        } else {
-          Some(arg)
-        }
-      case m: Map[_, _] => m.mapValues {
+    if (children.nonEmpty) {
+      var changed = false
+      val newArgs = mapProductIterator {
        case arg: TreeNode[_] if containsChild(arg) =>
          val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
          if (!(newChild fastEquals arg)) {
@@ -342,33 +326,53 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
          } else {
            arg
          }
-        case other => other
      }.view.force // `mapValues` is lazy and we need to force it to materialize
-      case d: DataType => d // Avoid unpacking Structs
-      case args: Traversable[_] => args.map {
-        case arg: TreeNode[_] if containsChild(arg) =>
+        case Some(arg: TreeNode[_]) if containsChild(arg) =>
          val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
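The guard the patch adds is conceptually simple. Below is a minimal, self-contained Scala sketch of the idea with simplified, illustrative names (it is not the actual `TreeNode` code): a node with no children skips the scan of its constructor arguments entirely, so a large `Traversable` held as `data` is never iterated.

```scala
// Simplified model of the optimization: bail out before touching any
// Product arguments when the node is a leaf (e.g. a LocalRelation).
abstract class Node extends Product {
  def children: Seq[Node]
  protected def withNewChildren(newChildren: Seq[Node]): Node

  def transformChildren(rule: PartialFunction[Node, Node]): Node = {
    if (children.isEmpty) {
      this  // leaf node: nothing to rewrite, and no argument traversal happens
    } else {
      // only non-leaf nodes pay for walking and rewriting their arguments
      val newChildren = children.map(c => rule.applyOrElse(c, identity[Node]))
      withNewChildren(newChildren)
    }
  }
}
```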
spark git commit: [SPARK-14608][ML] transformSchema needs better documentation
Repository: spark
Updated Branches: refs/heads/branch-2.0 80a7bff89 -> cc3c44b11

[SPARK-14608][ML] transformSchema needs better documentation

## What changes were proposed in this pull request?

jira: https://issues.apache.org/jira/browse/SPARK-14608

PipelineStage.transformSchema currently has minimal documentation. It should have more to explain that it can check the schema and check parameter interactions.

## How was this patch tested?

Unit test.

Author: Yuhao Yang

Closes #12384 from hhbyyh/transformSchemaDoc.

(cherry picked from commit aa6564f37f1d8de77c3b7bfa885000252efffea6)
Signed-off-by: Joseph K. Bradley

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc3c44b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc3c44b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc3c44b1

Branch: refs/heads/branch-2.0
Commit: cc3c44b1196c4186c0b55e319460524e9b9f865b
Parents: 80a7bff
Author: Yuhao Yang
Authored: Thu Jun 30 19:34:51 2016 -0700
Committer: Joseph K. Bradley
Committed: Thu Jun 30 19:35:06 2016 -0700

--
 mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala | 5 +-
 1 file changed, 4 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/cc3c44b1/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 25e56d7..a1d08b3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -44,7 +44,10 @@ abstract class PipelineStage extends Params with Logging {
   /**
    * :: DeveloperApi ::
    *
-   * Derives the output schema from the input schema.
+   * Check transform validity and derive the output schema from the input schema.
+   *
+   * Typical implementation should first conduct verification on schema change and parameter
+   * validity, including complex parameter interaction checks.
    */
   @DeveloperApi
   def transformSchema(schema: StructType): StructType
spark git commit: [SPARK-14608][ML] transformSchema needs better documentation
Repository: spark
Updated Branches: refs/heads/master 38f4d6f44 -> aa6564f37

[SPARK-14608][ML] transformSchema needs better documentation

## What changes were proposed in this pull request?

jira: https://issues.apache.org/jira/browse/SPARK-14608

PipelineStage.transformSchema currently has minimal documentation. It should have more to explain that it can check the schema and check parameter interactions.

## How was this patch tested?

Unit test.

Author: Yuhao Yang

Closes #12384 from hhbyyh/transformSchemaDoc.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa6564f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa6564f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa6564f3

Branch: refs/heads/master
Commit: aa6564f37f1d8de77c3b7bfa885000252efffea6
Parents: 38f4d6f
Author: Yuhao Yang
Authored: Thu Jun 30 19:34:51 2016 -0700
Committer: Joseph K. Bradley
Committed: Thu Jun 30 19:34:51 2016 -0700

--
 mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala | 5 +-
 1 file changed, 4 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/aa6564f3/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 25e56d7..a1d08b3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -44,7 +44,10 @@ abstract class PipelineStage extends Params with Logging {
   /**
    * :: DeveloperApi ::
    *
-   * Derives the output schema from the input schema.
+   * Check transform validity and derive the output schema from the input schema.
+   +
+   * Typical implementation should first conduct verification on schema change and parameter
+   * validity, including complex parameter interaction checks.
   */
   @DeveloperApi
   def transformSchema(schema: StructType): StructType
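As a concrete illustration of what the new doc asks for, here is a hedged Scala sketch of the usual validate-then-derive pattern for a hypothetical stage with `inputCol`/`outputCol` parameters; it is not code from this commit, and the parameter handling is simplified (real stages read these from `Params`).

```scala
import org.apache.spark.sql.types._

object TransformSchemaSketch {
  def transformSchema(schema: StructType, inputCol: String, outputCol: String): StructType = {
    // 1. check the schema: the input column must exist and have the expected type
    require(schema.fieldNames.contains(inputCol), s"Input column '$inputCol' does not exist")
    require(schema(inputCol).dataType == StringType,
      s"Input column '$inputCol' must be StringType, got ${schema(inputCol).dataType}")
    // 2. check parameter interactions, e.g. the output column must not clash with an existing one
    require(!schema.fieldNames.contains(outputCol), s"Output column '$outputCol' already exists")
    // 3. derive the output schema from the input schema
    schema.add(StructField(outputCol, DoubleType, nullable = false))
  }
}
```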
spark git commit: [SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API
Repository: spark
Updated Branches: refs/heads/branch-2.0 94d61de9c -> 80a7bff89

[SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API

## What changes were proposed in this pull request?

Add the Catalog.refreshTable API to the Python interface for Spark SQL.

## How was this patch tested?

Existing tests.

Author: WeichenXu

Closes #13558 from WeichenXu123/update_python_sql_interface_refreshTable.

(cherry picked from commit 5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80a7bff8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80a7bff8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80a7bff8

Branch: refs/heads/branch-2.0
Commit: 80a7bff897554ce77fe6bc91d62cff8857892322
Parents: 94d61de
Author: WeichenXu
Authored: Thu Jun 30 23:00:39 2016 +0800
Committer: Cheng Lian
Committed: Fri Jul 1 10:18:44 2016 +0800

--
 python/pyspark/sql/catalog.py | 5 +
 .../src/main/scala/org/apache/spark/sql/catalog/Catalog.scala | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/80a7bff8/python/pyspark/sql/catalog.py

diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 3033f14..4af930a 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -232,6 +232,11 @@ class Catalog(object):
         """Removes all cached tables from the in-memory cache."""
         self._jcatalog.clearCache()
 
+    @since(2.0)
+    def refreshTable(self, tableName):
+        """Invalidate and refresh all the cached metadata of the given table."""
+        self._jcatalog.refreshTable(tableName)
+
     def _reset(self):
         """(Internal use only) Drop all existing databases (except "default"), tables,
         partitions and functions, and set the current database to "default".

http://git-wip-us.apache.org/repos/asf/spark/blob/80a7bff8/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 083a63c..91ed9b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -214,7 +214,7 @@ abstract class Catalog {
   def clearCache(): Unit
 
   /**
-   * Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
    * Spark SQL or the external data source library it uses might cache certain metadata about a
    * table, such as the location of blocks. When those change outside of Spark SQL, users should
    * call this function to invalidate the cache.
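For context, a usage sketch of the same Catalog call on the Scala side (the new Python method delegates to this JVM API through `_jcatalog`); the table name here is purely illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("refresh-table-example").getOrCreate()

spark.sql("SELECT count(*) FROM my_table").show()  // plans against cached file metadata
// ... files backing `my_table` are rewritten by an external job ...
spark.catalog.refreshTable("my_table")             // invalidate and refresh the cached metadata
spark.sql("SELECT count(*) FROM my_table").show()  // the next query re-lists the files
```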
spark git commit: [SPARK-15954][SQL] Disable loading test tables in Python tests
Repository: spark Updated Branches: refs/heads/master 4a981dc87 -> 38f4d6f44 [SPARK-15954][SQL] Disable loading test tables in Python tests ## What changes were proposed in this pull request? This patch introduces a flag to disable loading test tables in TestHiveSparkSession and disables that in Python. This fixes an issue in which python/run-tests would fail due to failure to load test tables. Note that these test tables are not used outside of HiveCompatibilitySuite. In the long run we should probably decouple the loading of test tables from the test Hive setup. ## How was this patch tested? This is a test only change. Author: Reynold XinCloses #14005 from rxin/SPARK-15954. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38f4d6f4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38f4d6f4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38f4d6f4 Branch: refs/heads/master Commit: 38f4d6f44eaa03bdc703662e4a7be9c09ba86e16 Parents: 4a981dc Author: Reynold Xin Authored: Thu Jun 30 19:02:35 2016 -0700 Committer: Reynold Xin Committed: Thu Jun 30 19:02:35 2016 -0700 -- python/pyspark/sql/context.py | 2 +- .../apache/spark/sql/hive/test/TestHive.scala | 344 ++- 2 files changed, 185 insertions(+), 161 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38f4d6f4/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 8c984b3..4cfdf79 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -492,7 +492,7 @@ class HiveContext(SQLContext): confusing error messages. """ jsc = sparkContext._jsc.sc() -jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc) +jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False) return cls(sparkContext, jtestHive) def refreshTable(self, tableName): http://git-wip-us.apache.org/repos/asf/spark/blob/38f4d6f4/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index b45be02..7f89204 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -73,8 +73,12 @@ class TestHiveContext( @transient override val sparkSession: TestHiveSparkSession) extends SQLContext(sparkSession) { - def this(sc: SparkContext) { -this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc))) + /** + * If loadTestTables is false, no test tables are loaded. Note that this flag can only be true + * when running in the JVM, i.e. it needs to be false when calling from Python. + */ + def this(sc: SparkContext, loadTestTables: Boolean = true) { +this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables)) } override def newSession(): TestHiveContext = { @@ -103,13 +107,24 @@ class TestHiveContext( } - +/** + * A [[SparkSession]] used in [[TestHiveContext]]. + * + * @param sc SparkContext + * @param warehousePath path to the Hive warehouse directory + * @param scratchDirPath scratch directory used by Hive's metastore client + * @param metastoreTemporaryConf configuration options for Hive's metastore + * @param existingSharedState optional [[TestHiveSharedState]] + * @param loadTestTables if true, load the test tables. 
They can only be loaded when running + * in the JVM, i.e when calling from Python this flag has to be false. + */ private[hive] class TestHiveSparkSession( @transient private val sc: SparkContext, val warehousePath: File, scratchDirPath: File, metastoreTemporaryConf: Map[String, String], -@transient private val existingSharedState: Option[TestHiveSharedState]) +@transient private val existingSharedState: Option[TestHiveSharedState], +private val loadTestTables: Boolean) extends SparkSession(sc) with Logging { self => // TODO: We need to set the temp warehouse path to sc's conf. @@ -118,13 +133,14 @@ private[hive] class TestHiveSparkSession( // when we creating metadataHive. This flow is not easy to follow and can introduce // confusion when a developer is debugging an issue. We need to refactor this part // to just set the temp warehouse path in sc's conf. - def this(sc: SparkContext) { + def
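The shape of the change is a constructor flag with a JVM-friendly default; a minimal hedged sketch of the pattern (names are illustrative, not the TestHive code): Scala callers keep their old single-argument call sites, while the Python side, which goes through Py4J and cannot rely on Scala default arguments, passes the flag explicitly.

```scala
// Illustrative pattern only: a default-true flag added without breaking JVM callers.
class TestSession(name: String, val loadTestTables: Boolean) {
  def this(name: String) = this(name, true)  // existing JVM call sites stay valid
}

val fromScala  = new TestSession("local-test")         // test tables loaded
val fromPython = new TestSession("local-test", false)  // Py4J passes the flag explicitly
```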
spark git commit: [SPARK-15643][DOC][ML] Add breaking changes to ML migration guide
Repository: spark Updated Branches: refs/heads/master dab105161 -> 4a981dc87 [SPARK-15643][DOC][ML] Add breaking changes to ML migration guide This PR adds the breaking changes from [SPARK-14810](https://issues.apache.org/jira/browse/SPARK-14810) to the migration guide. ## How was this patch tested? Built docs locally. Author: Nick PentreathCloses #13924 from MLnick/SPARK-15643-migration-guide. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a981dc8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a981dc8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a981dc8 Branch: refs/heads/master Commit: 4a981dc870a31d8b90aac5f6cb22884e02f6fbc6 Parents: dab1051 Author: Nick Pentreath Authored: Thu Jun 30 17:55:14 2016 -0700 Committer: Joseph K. Bradley Committed: Thu Jun 30 17:55:14 2016 -0700 -- docs/mllib-guide.md | 104 +-- 1 file changed, 101 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4a981dc8/docs/mllib-guide.md -- diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index c28d137..17fd3e1 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -104,9 +104,105 @@ and the migration guide below will explain all changes between releases. ## From 1.6 to 2.0 -The deprecations and changes of behavior in the `spark.mllib` or `spark.ml` packages include: +### Breaking changes -Deprecations: +There were several breaking changes in Spark 2.0, which are outlined below. + +**Linear algebra classes for DataFrame-based APIs** + +Spark's linear algebra dependencies were moved to a new project, `mllib-local` +(see [SPARK-13944](https://issues.apache.org/jira/browse/SPARK-13944)). +As part of this change, the linear algebra classes were copied to a new package, `spark.ml.linalg`. +The DataFrame-based APIs in `spark.ml` now depend on the `spark.ml.linalg` classes, +leading to a few breaking changes, predominantly in various model classes +(see [SPARK-14810](https://issues.apache.org/jira/browse/SPARK-14810) for a full list). + +**Note:** the RDD-based APIs in `spark.mllib` continue to depend on the previous package `spark.mllib.linalg`. + +_Converting vectors and matrices_ + +While most pipeline components support backward compatibility for loading, +some existing `DataFrames` and pipelines in Spark versions prior to 2.0, that contain vector or matrix +columns, may need to be migrated to the new `spark.ml` vector and matrix types. +Utilities for converting `DataFrame` columns from `spark.mllib.linalg` to `spark.ml.linalg` types +(and vice versa) can be found in `spark.mllib.util.MLUtils`. + +There are also utility methods available for converting single instances of +vectors and matrices. Use the `asML` method on a `mllib.linalg.Vector` / `mllib.linalg.Matrix` +for converting to `ml.linalg` types, and +`mllib.linalg.Vectors.fromML` / `mllib.linalg.Matrices.fromML` +for converting to `mllib.linalg` types. + + + + +{% highlight scala %} +import org.apache.spark.mllib.util.MLUtils + +// convert DataFrame columns +val convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF) +val convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF) +// convert a single vector or matrix +val mlVec: org.apache.spark.ml.linalg.Vector = mllibVec.asML +val mlMat: org.apache.spark.ml.linalg.Matrix = mllibMat.asML +{% endhighlight %} + +Refer to the [`MLUtils` Scala docs](api/scala/index.html#org.apache.spark.mllib.util.MLUtils$) for further detail. 
+ + + + +{% highlight java %} +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.sql.Dataset; + +// convert DataFrame columns +Dataset convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF); +Dataset convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF); +// convert a single vector or matrix +org.apache.spark.ml.linalg.Vector mlVec = mllibVec.asML(); +org.apache.spark.ml.linalg.Matrix mlMat = mllibMat.asML(); +{% endhighlight %} + +Refer to the [`MLUtils` Java docs](api/java/org/apache/spark/mllib/util/MLUtils.html) for further detail. + + + + +{% highlight python %} +from pyspark.mllib.util import MLUtils + +# convert DataFrame columns +convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF) +convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF) +# convert a single vector or matrix +mlVec = mllibVec.asML() +mlMat = mllibMat.asML() +{% endhighlight %} + +Refer to the [`MLUtils` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.util.MLUtils) for further detail. + + + +**Deprecated methods removed** + +Several deprecated methods were removed in the `spark.mllib` and
spark git commit: [SPARK-15643][DOC][ML] Add breaking changes to ML migration guide
Repository: spark Updated Branches: refs/heads/branch-2.0 d3027c45f -> 79c96c999 [SPARK-15643][DOC][ML] Add breaking changes to ML migration guide This PR adds the breaking changes from [SPARK-14810](https://issues.apache.org/jira/browse/SPARK-14810) to the migration guide. ## How was this patch tested? Built docs locally. Author: Nick PentreathCloses #13924 from MLnick/SPARK-15643-migration-guide. (cherry picked from commit 4a981dc870a31d8b90aac5f6cb22884e02f6fbc6) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79c96c99 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79c96c99 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79c96c99 Branch: refs/heads/branch-2.0 Commit: 79c96c99977b0478c25b13583a3e88cbab541ba6 Parents: d3027c4 Author: Nick Pentreath Authored: Thu Jun 30 17:55:14 2016 -0700 Committer: Joseph K. Bradley Committed: Thu Jun 30 17:55:37 2016 -0700 -- docs/mllib-guide.md | 104 +-- 1 file changed, 101 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/79c96c99/docs/mllib-guide.md -- diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index c28d137..17fd3e1 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -104,9 +104,105 @@ and the migration guide below will explain all changes between releases. ## From 1.6 to 2.0 -The deprecations and changes of behavior in the `spark.mllib` or `spark.ml` packages include: +### Breaking changes -Deprecations: +There were several breaking changes in Spark 2.0, which are outlined below. + +**Linear algebra classes for DataFrame-based APIs** + +Spark's linear algebra dependencies were moved to a new project, `mllib-local` +(see [SPARK-13944](https://issues.apache.org/jira/browse/SPARK-13944)). +As part of this change, the linear algebra classes were copied to a new package, `spark.ml.linalg`. +The DataFrame-based APIs in `spark.ml` now depend on the `spark.ml.linalg` classes, +leading to a few breaking changes, predominantly in various model classes +(see [SPARK-14810](https://issues.apache.org/jira/browse/SPARK-14810) for a full list). + +**Note:** the RDD-based APIs in `spark.mllib` continue to depend on the previous package `spark.mllib.linalg`. + +_Converting vectors and matrices_ + +While most pipeline components support backward compatibility for loading, +some existing `DataFrames` and pipelines in Spark versions prior to 2.0, that contain vector or matrix +columns, may need to be migrated to the new `spark.ml` vector and matrix types. +Utilities for converting `DataFrame` columns from `spark.mllib.linalg` to `spark.ml.linalg` types +(and vice versa) can be found in `spark.mllib.util.MLUtils`. + +There are also utility methods available for converting single instances of +vectors and matrices. Use the `asML` method on a `mllib.linalg.Vector` / `mllib.linalg.Matrix` +for converting to `ml.linalg` types, and +`mllib.linalg.Vectors.fromML` / `mllib.linalg.Matrices.fromML` +for converting to `mllib.linalg` types. 
+ + + + +{% highlight scala %} +import org.apache.spark.mllib.util.MLUtils + +// convert DataFrame columns +val convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF) +val convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF) +// convert a single vector or matrix +val mlVec: org.apache.spark.ml.linalg.Vector = mllibVec.asML +val mlMat: org.apache.spark.ml.linalg.Matrix = mllibMat.asML +{% endhighlight %} + +Refer to the [`MLUtils` Scala docs](api/scala/index.html#org.apache.spark.mllib.util.MLUtils$) for further detail. + + + + +{% highlight java %} +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.sql.Dataset; + +// convert DataFrame columns +Dataset convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF); +Dataset convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF); +// convert a single vector or matrix +org.apache.spark.ml.linalg.Vector mlVec = mllibVec.asML(); +org.apache.spark.ml.linalg.Matrix mlMat = mllibMat.asML(); +{% endhighlight %} + +Refer to the [`MLUtils` Java docs](api/java/org/apache/spark/mllib/util/MLUtils.html) for further detail. + + + + +{% highlight python %} +from pyspark.mllib.util import MLUtils + +# convert DataFrame columns +convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF) +convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF) +# convert a single vector or matrix +mlVec = mllibVec.asML() +mlMat = mllibMat.asML() +{% endhighlight %} + +Refer to the [`MLUtils` Python
spark git commit: [SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion methods to PySpark linalg
Repository: spark Updated Branches: refs/heads/branch-2.0 17c7522c8 -> d3027c45f [SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion methods to PySpark linalg The move to `ml.linalg` created `asML`/`fromML` utility methods in Scala/Java for converting between representations. These are missing in Python, this PR adds them. ## How was this patch tested? New doctests. Author: Nick PentreathCloses #13997 from MLnick/SPARK-16328-python-linalg-convert. (cherry picked from commit dab10516138867b7c4fc6d42168497e82853b539) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3027c45 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3027c45 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3027c45 Branch: refs/heads/branch-2.0 Commit: d3027c45fbe02752d260aefff9dae707ba5c5d4c Parents: 17c7522 Author: Nick Pentreath Authored: Thu Jun 30 17:52:15 2016 -0700 Committer: Joseph K. Bradley Committed: Thu Jun 30 17:52:29 2016 -0700 -- python/pyspark/mllib/linalg/__init__.py | 99 python/pyspark/mllib/tests.py | 69 +++ 2 files changed, 168 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3027c45/python/pyspark/mllib/linalg/__init__.py -- diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 3a345b2..15dc53a 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -39,6 +39,7 @@ else: import numpy as np from pyspark import since +from pyspark.ml import linalg as newlinalg from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ IntegerType, ByteType, BooleanType @@ -247,6 +248,15 @@ class Vector(object): """ raise NotImplementedError +def asML(self): +""" +Convert this vector to the new mllib-local representation. +This does NOT copy the data; it copies references. + +:return: :py:class:`pyspark.ml.linalg.Vector` +""" +raise NotImplementedError + class DenseVector(Vector): """ @@ -408,6 +418,17 @@ class DenseVector(Vector): """ return self.array +def asML(self): +""" +Convert this vector to the new mllib-local representation. +This does NOT copy the data; it copies references. + +:return: :py:class:`pyspark.ml.linalg.DenseVector` + +.. versionadded:: 2.0.0 +""" +return newlinalg.DenseVector(self.array) + @property def values(self): """ @@ -737,6 +758,17 @@ class SparseVector(Vector): arr[self.indices] = self.values return arr +def asML(self): +""" +Convert this vector to the new mllib-local representation. +This does NOT copy the data; it copies references. + +:return: :py:class:`pyspark.ml.linalg.SparseVector` + +.. versionadded:: 2.0.0 +""" +return newlinalg.SparseVector(self.size, self.indices, self.values) + def __len__(self): return self.size @@ -846,6 +878,24 @@ class Vectors(object): return DenseVector(elements) @staticmethod +def fromML(vec): +""" +Convert a vector from the new mllib-local representation. +This does NOT copy the data; it copies references. + +:param vec: a :py:class:`pyspark.ml.linalg.Vector` +:return: a :py:class:`pyspark.mllib.linalg.Vector` + +.. 
versionadded:: 2.0.0 +""" +if isinstance(vec, newlinalg.DenseVector): +return DenseVector(vec.array) +elif isinstance(vec, newlinalg.SparseVector): +return SparseVector(vec.size, vec.indices, vec.values) +else: +raise TypeError("Unsupported vector type %s" % type(vec)) + +@staticmethod def stringify(vector): """ Converts a vector into a string, which can be recognized by @@ -945,6 +995,13 @@ class Matrix(object): """ raise NotImplementedError +def asML(self): +""" +Convert this matrix to the new mllib-local representation. +This does NOT copy the data; it copies references. +""" +raise NotImplementedError + @staticmethod def _convert_to_array(array_like, dtype): """ @@ -1044,6 +1101,17 @@ class DenseMatrix(Matrix): return SparseMatrix(self.numRows, self.numCols, colPtrs, rowIndices, values) +def asML(self): +""" +Convert this matrix to the new mllib-local
spark git commit: [SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion methods to PySpark linalg
Repository: spark Updated Branches: refs/heads/master 85f2303ec -> dab105161 [SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion methods to PySpark linalg The move to `ml.linalg` created `asML`/`fromML` utility methods in Scala/Java for converting between representations. These are missing in Python, this PR adds them. ## How was this patch tested? New doctests. Author: Nick PentreathCloses #13997 from MLnick/SPARK-16328-python-linalg-convert. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dab10516 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dab10516 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dab10516 Branch: refs/heads/master Commit: dab10516138867b7c4fc6d42168497e82853b539 Parents: 85f2303 Author: Nick Pentreath Authored: Thu Jun 30 17:52:15 2016 -0700 Committer: Joseph K. Bradley Committed: Thu Jun 30 17:52:15 2016 -0700 -- python/pyspark/mllib/linalg/__init__.py | 99 python/pyspark/mllib/tests.py | 69 +++ 2 files changed, 168 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dab10516/python/pyspark/mllib/linalg/__init__.py -- diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 3a345b2..15dc53a 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -39,6 +39,7 @@ else: import numpy as np from pyspark import since +from pyspark.ml import linalg as newlinalg from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ IntegerType, ByteType, BooleanType @@ -247,6 +248,15 @@ class Vector(object): """ raise NotImplementedError +def asML(self): +""" +Convert this vector to the new mllib-local representation. +This does NOT copy the data; it copies references. + +:return: :py:class:`pyspark.ml.linalg.Vector` +""" +raise NotImplementedError + class DenseVector(Vector): """ @@ -408,6 +418,17 @@ class DenseVector(Vector): """ return self.array +def asML(self): +""" +Convert this vector to the new mllib-local representation. +This does NOT copy the data; it copies references. + +:return: :py:class:`pyspark.ml.linalg.DenseVector` + +.. versionadded:: 2.0.0 +""" +return newlinalg.DenseVector(self.array) + @property def values(self): """ @@ -737,6 +758,17 @@ class SparseVector(Vector): arr[self.indices] = self.values return arr +def asML(self): +""" +Convert this vector to the new mllib-local representation. +This does NOT copy the data; it copies references. + +:return: :py:class:`pyspark.ml.linalg.SparseVector` + +.. versionadded:: 2.0.0 +""" +return newlinalg.SparseVector(self.size, self.indices, self.values) + def __len__(self): return self.size @@ -846,6 +878,24 @@ class Vectors(object): return DenseVector(elements) @staticmethod +def fromML(vec): +""" +Convert a vector from the new mllib-local representation. +This does NOT copy the data; it copies references. + +:param vec: a :py:class:`pyspark.ml.linalg.Vector` +:return: a :py:class:`pyspark.mllib.linalg.Vector` + +.. 
versionadded:: 2.0.0 +""" +if isinstance(vec, newlinalg.DenseVector): +return DenseVector(vec.array) +elif isinstance(vec, newlinalg.SparseVector): +return SparseVector(vec.size, vec.indices, vec.values) +else: +raise TypeError("Unsupported vector type %s" % type(vec)) + +@staticmethod def stringify(vector): """ Converts a vector into a string, which can be recognized by @@ -945,6 +995,13 @@ class Matrix(object): """ raise NotImplementedError +def asML(self): +""" +Convert this matrix to the new mllib-local representation. +This does NOT copy the data; it copies references. +""" +raise NotImplementedError + @staticmethod def _convert_to_array(array_like, dtype): """ @@ -1044,6 +1101,17 @@ class DenseMatrix(Matrix): return SparseMatrix(self.numRows, self.numCols, colPtrs, rowIndices, values) +def asML(self): +""" +Convert this matrix to the new mllib-local representation. +This does NOT copy the data; it copies references. + +:return: :py:class:`pyspark.ml.linalg.DenseMatrix` +
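For reference, the Scala counterparts that the description mentions (and that the new Python methods mirror) round-trip like this; a small hedged sketch with arbitrary values.

```scala
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// Old-style mllib vector -> new ml.linalg vector and back; references are
// copied, not the underlying data.
val oldSparse = OldVectors.sparse(4, Array(1, 3), Array(1.0, 5.5))
val newSparse = oldSparse.asML                 // an org.apache.spark.ml.linalg vector
val roundTrip = OldVectors.fromML(newSparse)   // back to org.apache.spark.mllib.linalg
assert(roundTrip == oldSparse)
```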
spark git commit: [SPARK-16276][SQL] Implement elt SQL function
Repository: spark Updated Branches: refs/heads/master 3d75a5b2a -> 85f2303ec [SPARK-16276][SQL] Implement elt SQL function ## What changes were proposed in this pull request? This patch implements the elt function, as it is implemented in Hive. ## How was this patch tested? Added expression unit test in StringExpressionsSuite and end-to-end test in StringFunctionsSuite. Author: petermaxleeCloses #13966 from petermaxlee/SPARK-16276. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85f2303e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85f2303e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85f2303e Branch: refs/heads/master Commit: 85f2303ecadd9bf6d9694a2743dda075654c5ccf Parents: 3d75a5b Author: petermaxlee Authored: Fri Jul 1 07:57:48 2016 +0800 Committer: Wenchen Fan Committed: Fri Jul 1 07:57:48 2016 +0800 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/ExpectsInputTypes.scala | 3 +- .../expressions/stringExpressions.scala | 41 .../expressions/StringExpressionsSuite.scala| 23 +++ .../apache/spark/sql/StringFunctionsSuite.scala | 14 +++ .../spark/sql/hive/HiveSessionCatalog.scala | 2 +- 6 files changed, 82 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/85f2303e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 3fbdb2a..26b0c30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -267,6 +267,7 @@ object FunctionRegistry { expression[Concat]("concat"), expression[ConcatWs]("concat_ws"), expression[Decode]("decode"), +expression[Elt]("elt"), expression[Encode]("encode"), expression[FindInSet]("find_in_set"), expression[FormatNumber]("format_number"), http://git-wip-us.apache.org/repos/asf/spark/blob/85f2303e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala index c15a2df..98f25a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala @@ -57,7 +57,8 @@ trait ExpectsInputTypes extends Expression { /** - * A mixin for the analyzer to perform implicit type casting using [[ImplicitTypeCasts]]. + * A mixin for the analyzer to perform implicit type casting using + * [[org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts]]. 
*/ trait ImplicitCastInputTypes extends ExpectsInputTypes { // No other methods http://git-wip-us.apache.org/repos/asf/spark/blob/85f2303e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 44ff7fd..b0df957 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -21,6 +21,7 @@ import java.text.{DecimalFormat, DecimalFormatSymbols} import java.util.{HashMap, Locale, Map => JMap} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.types._ @@ -162,6 +163,46 @@ case class ConcatWs(children: Seq[Expression]) } } +@ExpressionDescription( + usage = "_FUNC_(n, str1, str2, ...) - returns the n-th string, e.g. returns str2 when n is 2", + extended = "> SELECT _FUNC_(1, 'scala', 'java') FROM src LIMIT 1;\n" + "'scala'") +case class Elt(children: Seq[Expression]) + extends Expression with
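A short hedged usage sketch of the new function from Spark SQL (the session setup is illustrative); as in Hive, `elt(n, str1, str2, ...)` returns the n-th string argument, counting from 1.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("elt-example").getOrCreate()

spark.sql("SELECT elt(1, 'scala', 'java')").show()  // returns 'scala'
spark.sql("SELECT elt(2, 'scala', 'java')").show()  // returns 'java'
```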
spark git commit: [SPARK-16313][SQL] Spark should not silently drop exceptions in file listing
Repository: spark Updated Branches: refs/heads/branch-2.0 4dc7d377f -> 17c7522c8 [SPARK-16313][SQL] Spark should not silently drop exceptions in file listing ## What changes were proposed in this pull request? Spark silently drops exceptions during file listing. This is a very bad behavior because it can mask legitimate errors and the resulting plan will silently have 0 rows. This patch changes it to not silently drop the errors. ## How was this patch tested? Manually verified. Author: Reynold XinCloses #13987 from rxin/SPARK-16313. (cherry picked from commit 3d75a5b2a76eba0855d73476dc2fd579c612d521) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17c7522c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17c7522c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17c7522c Branch: refs/heads/branch-2.0 Commit: 17c7522c8cb8f400408cbdc3b8b1251bbca53eec Parents: 4dc7d37 Author: Reynold Xin Authored: Thu Jun 30 16:51:11 2016 -0700 Committer: Reynold Xin Committed: Thu Jun 30 16:51:15 2016 -0700 -- python/pyspark/sql/context.py | 2 +- python/pyspark/sql/streaming.py | 2 +- .../sql/execution/datasources/DataSource.scala | 3 ++- .../datasources/ListingFileCatalog.scala| 22 +++- .../datasources/fileSourceInterfaces.scala | 11 ++ .../sql/streaming/FileStreamSourceSuite.scala | 2 +- 6 files changed, 29 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/17c7522c/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 3503fb9..8c984b3 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -440,7 +440,7 @@ class SQLContext(object): :return: :class:`DataStreamReader` ->>> text_sdf = sqlContext.readStream.text(os.path.join(tempfile.mkdtemp(), 'data')) +>>> text_sdf = sqlContext.readStream.text(tempfile.mkdtemp()) >>> text_sdf.isStreaming True """ http://git-wip-us.apache.org/repos/asf/spark/blob/17c7522c/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 8cf7098..bffe398 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -437,7 +437,7 @@ class DataStreamReader(OptionUtils): :param paths: string, or list of strings, for input path(s). 
->>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 'data')) +>>> text_sdf = spark.readStream.text(tempfile.mkdtemp()) >>> text_sdf.isStreaming True >>> "value" in str(text_sdf.schema) http://git-wip-us.apache.org/repos/asf/spark/blob/17c7522c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 557445c..a4110d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -364,7 +364,8 @@ case class DataSource( } val fileCatalog = - new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema) + new ListingFileCatalog( +sparkSession, globbedPaths, options, partitionSchema, !checkPathExist) val dataSchema = userSpecifiedSchema.map { schema => val equality = http://git-wip-us.apache.org/repos/asf/spark/blob/17c7522c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala index 675e755..706ec6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import java.io.FileNotFoundException + import scala.collection.mutable import scala.util.Try @@ -35,12
spark git commit: [SPARK-16313][SQL] Spark should not silently drop exceptions in file listing
Repository: spark Updated Branches: refs/heads/master fb41670c9 -> 3d75a5b2a [SPARK-16313][SQL] Spark should not silently drop exceptions in file listing ## What changes were proposed in this pull request? Spark silently drops exceptions during file listing. This is a very bad behavior because it can mask legitimate errors and the resulting plan will silently have 0 rows. This patch changes it to not silently drop the errors. ## How was this patch tested? Manually verified. Author: Reynold XinCloses #13987 from rxin/SPARK-16313. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d75a5b2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d75a5b2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d75a5b2 Branch: refs/heads/master Commit: 3d75a5b2a76eba0855d73476dc2fd579c612d521 Parents: fb41670 Author: Reynold Xin Authored: Thu Jun 30 16:51:11 2016 -0700 Committer: Reynold Xin Committed: Thu Jun 30 16:51:11 2016 -0700 -- python/pyspark/sql/context.py | 2 +- python/pyspark/sql/streaming.py | 2 +- .../sql/execution/datasources/DataSource.scala | 3 ++- .../datasources/ListingFileCatalog.scala| 22 +++- .../datasources/fileSourceInterfaces.scala | 11 ++ .../sql/streaming/FileStreamSourceSuite.scala | 2 +- 6 files changed, 29 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3d75a5b2/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 3503fb9..8c984b3 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -440,7 +440,7 @@ class SQLContext(object): :return: :class:`DataStreamReader` ->>> text_sdf = sqlContext.readStream.text(os.path.join(tempfile.mkdtemp(), 'data')) +>>> text_sdf = sqlContext.readStream.text(tempfile.mkdtemp()) >>> text_sdf.isStreaming True """ http://git-wip-us.apache.org/repos/asf/spark/blob/3d75a5b2/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 8cf7098..bffe398 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -437,7 +437,7 @@ class DataStreamReader(OptionUtils): :param paths: string, or list of strings, for input path(s). 
->>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 'data')) +>>> text_sdf = spark.readStream.text(tempfile.mkdtemp()) >>> text_sdf.isStreaming True >>> "value" in str(text_sdf.schema) http://git-wip-us.apache.org/repos/asf/spark/blob/3d75a5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 557445c..a4110d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -364,7 +364,8 @@ case class DataSource( } val fileCatalog = - new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema) + new ListingFileCatalog( +sparkSession, globbedPaths, options, partitionSchema, !checkPathExist) val dataSchema = userSpecifiedSchema.map { schema => val equality = http://git-wip-us.apache.org/repos/asf/spark/blob/3d75a5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala index 675e755..706ec6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import java.io.FileNotFoundException + import scala.collection.mutable import scala.util.Try @@ -35,12 +37,16 @@ import org.apache.spark.sql.types.StructType * @param paths a list of paths to scan * @param partitionSchema an
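A hedged sketch of the behavioral change (simplified, with illustrative names rather than the actual `ListingFileCatalog` code): rather than swallowing every listing failure and silently producing an empty file list, only a missing path is tolerated, and only when the caller explicitly opts in.

```scala
import java.io.FileNotFoundException

object ListingSketch {
  // Stand-in for a FileSystem.listStatus call against a path.
  def rawListing(path: String): Seq[String] =
    if (path == "missing") throw new FileNotFoundException(path)
    else Seq(s"$path/part-00000", s"$path/part-00001")

  def listLeafFiles(path: String, ignoreFileNotFound: Boolean): Seq[String] =
    try {
      rawListing(path)
    } catch {
      // a missing path may be tolerated when explicitly requested...
      case _: FileNotFoundException if ignoreFileNotFound => Seq.empty
      // ...but every other failure (permissions, transient I/O, ...) now propagates
    }
}
```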
spark git commit: [SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException
Repository: spark Updated Branches: refs/heads/branch-2.0 03008e049 -> 4dc7d377f [SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException ## What changes were proposed in this pull request? This patch appends a message to suggest users running refresh table or reloading data frames when Spark sees a FileNotFoundException due to stale, cached metadata. ## How was this patch tested? Added a unit test for this in MetadataCacheSuite. Author: petermaxleeCloses #14003 from petermaxlee/SPARK-16336. (cherry picked from commit fb41670c9263a89ec233861cc91a19cf1bb19073) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dc7d377 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dc7d377 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dc7d377 Branch: refs/heads/branch-2.0 Commit: 4dc7d377fba39147d8820a5a2866a2fbcb73db98 Parents: 03008e0 Author: petermaxlee Authored: Thu Jun 30 16:49:59 2016 -0700 Committer: Reynold Xin Committed: Thu Jun 30 16:50:06 2016 -0700 -- .../sql/execution/datasources/FileScanRDD.scala | 15 +++- .../apache/spark/sql/MetadataCacheSuite.scala | 88 2 files changed, 102 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4dc7d377/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index f7f68b1..1314c94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -111,7 +111,20 @@ class FileScanRDD( currentFile = files.next() logInfo(s"Reading File $currentFile") InputFileNameHolder.setInputFileName(currentFile.filePath) - currentIterator = readFunction(currentFile) + + try { +currentIterator = readFunction(currentFile) + } catch { +case e: java.io.FileNotFoundException => + throw new java.io.FileNotFoundException( +e.getMessage + "\n" + + "It is possible the underlying files have been updated. " + + "You can explicitly invalidate the cache in Spark by " + + "running 'REFRESH TABLE tableName' command in SQL or " + + "by recreating the Dataset/DataFrame involved." + ) + } + hasNext } else { currentFile = null http://git-wip-us.apache.org/repos/asf/spark/blob/4dc7d377/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala new file mode 100644 index 000..d872f4b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import org.apache.spark.SparkException +import org.apache.spark.sql.test.SharedSQLContext + +/** + * Test suite to handle metadata cache related. + */ +class MetadataCacheSuite extends QueryTest with SharedSQLContext { + + /** Removes one data file in the given directory. */ + private def deleteOneFileInDirectory(dir: File): Unit = { +assert(dir.isDirectory) +val oneFile = dir.listFiles().find { file => + !file.getName.startsWith("_") && !file.getName.startsWith(".") +} +assert(oneFile.isDefined) +oneFile.foreach(_.delete()) + } + +
spark git commit: [SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException
Repository: spark Updated Branches: refs/heads/master 5d00a7bc1 -> fb41670c9 [SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException ## What changes were proposed in this pull request? This patch appends a message to suggest users running refresh table or reloading data frames when Spark sees a FileNotFoundException due to stale, cached metadata. ## How was this patch tested? Added a unit test for this in MetadataCacheSuite. Author: petermaxleeCloses #14003 from petermaxlee/SPARK-16336. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb41670c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb41670c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb41670c Branch: refs/heads/master Commit: fb41670c9263a89ec233861cc91a19cf1bb19073 Parents: 5d00a7b Author: petermaxlee Authored: Thu Jun 30 16:49:59 2016 -0700 Committer: Reynold Xin Committed: Thu Jun 30 16:49:59 2016 -0700 -- .../sql/execution/datasources/FileScanRDD.scala | 15 +++- .../apache/spark/sql/MetadataCacheSuite.scala | 88 2 files changed, 102 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb41670c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 1443057..c66da3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -117,7 +117,20 @@ class FileScanRDD( currentFile = files.next() logInfo(s"Reading File $currentFile") InputFileNameHolder.setInputFileName(currentFile.filePath) - currentIterator = readFunction(currentFile) + + try { +currentIterator = readFunction(currentFile) + } catch { +case e: java.io.FileNotFoundException => + throw new java.io.FileNotFoundException( +e.getMessage + "\n" + + "It is possible the underlying files have been updated. " + + "You can explicitly invalidate the cache in Spark by " + + "running 'REFRESH TABLE tableName' command in SQL or " + + "by recreating the Dataset/DataFrame involved." + ) + } + hasNext } else { currentFile = null http://git-wip-us.apache.org/repos/asf/spark/blob/fb41670c/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala new file mode 100644 index 000..d872f4b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import org.apache.spark.SparkException +import org.apache.spark.sql.test.SharedSQLContext + +/** + * Test suite to handle metadata cache related. + */ +class MetadataCacheSuite extends QueryTest with SharedSQLContext { + + /** Removes one data file in the given directory. */ + private def deleteOneFileInDirectory(dir: File): Unit = { +assert(dir.isDirectory) +val oneFile = dir.listFiles().find { file => + !file.getName.startsWith("_") && !file.getName.startsWith(".") +} +assert(oneFile.isDefined) +oneFile.foreach(_.delete()) + } + + test("SPARK-16336 Suggest doing table refresh when encountering FileNotFoundException") { +withTempPath { (location: File) => +
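A hedged reproduction sketch, modeled loosely on the new `MetadataCacheSuite`: materialize some Parquet files, delete one of them behind Spark's back, and the next scan fails, with the underlying `FileNotFoundException` now carrying the hint to run 'REFRESH TABLE tableName' or recreate the Dataset/DataFrame. Paths and names are illustrative.

```scala
import java.io.File
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("stale-metadata").getOrCreate()

val dir = new File("/tmp/stale-metadata-demo")  // illustrative location
spark.range(100).write.mode("overwrite").parquet(dir.getAbsolutePath)

val df = spark.read.parquet(dir.getAbsolutePath)
println(df.count())  // works: the cached file listing and the files on disk agree

// delete one data file outside of Spark, leaving the cached listing stale
dir.listFiles().find(f => f.getName.startsWith("part-")).foreach(_.delete())

// the scan now fails, and the exception message suggests refreshing the table
// or recreating the Dataset/DataFrame involved
df.count()
```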
spark git commit: [SPARK-16256][DOCS] Fix window operation diagram
Repository: spark Updated Branches: refs/heads/branch-2.0 f17ffef38 -> 03008e049 [SPARK-16256][DOCS] Fix window operation diagram Author: Tathagata DasCloses #14001 from tdas/SPARK-16256-2. (cherry picked from commit 5d00a7bc19ddeb1b5247733b55095a03ee7b1a30) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03008e04 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03008e04 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03008e04 Branch: refs/heads/branch-2.0 Commit: 03008e049a366bc7a63b3915b42ee50320ac6f34 Parents: f17ffef Author: Tathagata Das Authored: Thu Jun 30 14:01:34 2016 -0700 Committer: Tathagata Das Committed: Thu Jun 30 14:01:56 2016 -0700 -- docs/img/structured-streaming-late-data.png| Bin 138931 -> 138226 bytes docs/img/structured-streaming-window.png | Bin 128930 -> 132875 bytes docs/img/structured-streaming.pptx | Bin 1105315 -> 1105413 bytes docs/structured-streaming-programming-guide.md | 2 +- 4 files changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/03008e04/docs/img/structured-streaming-late-data.png -- diff --git a/docs/img/structured-streaming-late-data.png b/docs/img/structured-streaming-late-data.png index 5276b47..2283f67 100644 Binary files a/docs/img/structured-streaming-late-data.png and b/docs/img/structured-streaming-late-data.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/03008e04/docs/img/structured-streaming-window.png -- diff --git a/docs/img/structured-streaming-window.png b/docs/img/structured-streaming-window.png index be9d3fb..c1842b1 100644 Binary files a/docs/img/structured-streaming-window.png and b/docs/img/structured-streaming-window.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/03008e04/docs/img/structured-streaming.pptx -- diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx index c278323..6aad2ed 100644 Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ http://git-wip-us.apache.org/repos/asf/spark/blob/03008e04/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 5932566..7949396 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -620,7 +620,7 @@ df.groupBy("type").count() ### Window Operations on Event Time Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. -Imagine the quick example is modified and the stream contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. 
This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time). +Imagine our quick example is modified and the stream now contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be
spark git commit: [SPARK-16256][DOCS] Fix window operation diagram
Repository: spark Updated Branches: refs/heads/master c62263340 -> 5d00a7bc1 [SPARK-16256][DOCS] Fix window operation diagram Author: Tathagata DasCloses #14001 from tdas/SPARK-16256-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d00a7bc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d00a7bc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d00a7bc Branch: refs/heads/master Commit: 5d00a7bc19ddeb1b5247733b55095a03ee7b1a30 Parents: c622633 Author: Tathagata Das Authored: Thu Jun 30 14:01:34 2016 -0700 Committer: Tathagata Das Committed: Thu Jun 30 14:01:34 2016 -0700 -- docs/img/structured-streaming-late-data.png| Bin 138931 -> 138226 bytes docs/img/structured-streaming-window.png | Bin 128930 -> 132875 bytes docs/img/structured-streaming.pptx | Bin 1105315 -> 1105413 bytes docs/structured-streaming-programming-guide.md | 2 +- 4 files changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5d00a7bc/docs/img/structured-streaming-late-data.png -- diff --git a/docs/img/structured-streaming-late-data.png b/docs/img/structured-streaming-late-data.png index 5276b47..2283f67 100644 Binary files a/docs/img/structured-streaming-late-data.png and b/docs/img/structured-streaming-late-data.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/5d00a7bc/docs/img/structured-streaming-window.png -- diff --git a/docs/img/structured-streaming-window.png b/docs/img/structured-streaming-window.png index be9d3fb..c1842b1 100644 Binary files a/docs/img/structured-streaming-window.png and b/docs/img/structured-streaming-window.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/5d00a7bc/docs/img/structured-streaming.pptx -- diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx index c278323..6aad2ed 100644 Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ http://git-wip-us.apache.org/repos/asf/spark/blob/5d00a7bc/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 5932566..7949396 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -620,7 +620,7 @@ df.groupBy("type").count() ### Window Operations on Event Time Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. -Imagine the quick example is modified and the stream contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. 
the word) and the window (can be calculated from the event-time). +Imagine our quick example is modified and the stream now contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time). The result tables would look something like the following.
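For readers skimming the archive: a minimal sketch of the sliding-window aggregation this guide passage describes, assuming a streaming DataFrame `words` with hypothetical columns `timestamp` (event time) and `word`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.window

// `words` is assumed to be a streaming DataFrame with one row per received word,
// carrying the event time of the line that word came from.
def windowedWordCounts(words: DataFrame): DataFrame =
  words.groupBy(
    window(words("timestamp"), "10 minutes", "5 minutes"), // 10-minute windows sliding every 5 minutes
    words("word")                                          // the grouping key
  ).count()                                                // one count per (window, word) pair
```

Each count is therefore keyed by both the word and the window its event time falls into, matching the description above.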
spark git commit: [SPARK-16212][STREAMING][KAFKA] code cleanup from review feedback
Repository: spark Updated Branches: refs/heads/master 46395db80 -> c62263340 [SPARK-16212][STREAMING][KAFKA] code cleanup from review feedback ## What changes were proposed in this pull request? code cleanup in kafka-0-8 to match suggested changes for kafka-0-10 branch ## How was this patch tested? unit tests Author: cody koeningerCloses #13908 from koeninger/kafka-0-8-cleanup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6226334 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6226334 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6226334 Branch: refs/heads/master Commit: c62263340edb6976a10f274e716fde6cd2c5bf34 Parents: 46395db Author: cody koeninger Authored: Thu Jun 30 13:16:58 2016 -0700 Committer: Tathagata Das Committed: Thu Jun 30 13:16:58 2016 -0700 -- .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 12 ++-- .../org/apache/spark/streaming/kafka/KafkaRDD.scala | 9 ++--- .../streaming/kafka/JavaDirectKafkaStreamSuite.java | 5 - 3 files changed, 12 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c6226334/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala -- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index fb58ed7..c3c7993 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -34,7 +34,7 @@ import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} import org.apache.spark.streaming.scheduler.rate.RateEstimator /** - * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where + * A stream of [[KafkaRDD]] where * each given Kafka topic/partition corresponds to an RDD partition. * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number * of messages @@ -43,7 +43,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator * and this DStream is not responsible for committing offsets, * so that you can control exactly-once semantics. * For an easy interface to Kafka-managed offsets, - * see {@link org.apache.spark.streaming.kafka.KafkaCluster} + * see [[KafkaCluster]] * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> * configuration parameters. 
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), @@ -132,7 +132,7 @@ class DirectKafkaInputDStream[ if (retries <= 0) { throw new SparkException(err) } else { -log.error(err) +logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs) latestLeaderOffsets(retries - 1) } @@ -194,7 +194,7 @@ class DirectKafkaInputDStream[ data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]] } -override def update(time: Time) { +override def update(time: Time): Unit = { batchForTime.clear() generatedRDDs.foreach { kv => val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray @@ -202,9 +202,9 @@ class DirectKafkaInputDStream[ } } -override def cleanup(time: Time) { } +override def cleanup(time: Time): Unit = { } -override def restore() { +override def restore(): Unit = { // this is assuming that the topics don't change during execution, which is true currently val topics = fromOffsets.keySet val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics)) http://git-wip-us.apache.org/repos/asf/spark/blob/c6226334/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala -- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index d4881b1..2b92577 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -129,7 +129,7 @@ class KafkaRDD[ val part = thePart.asInstanceOf[KafkaRDDPartition] assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) if (part.fromOffset == part.untilOffset) { -
spark git commit: [SPARK-16289][SQL] Implement posexplode table generating function
Repository: spark Updated Branches: refs/heads/master fdf9f94f8 -> 46395db80 [SPARK-16289][SQL] Implement posexplode table generating function ## What changes were proposed in this pull request? This PR implements `posexplode` table generating function. Currently, master branch raises the following exception for `map` argument. It's different from Hive. **Before** ```scala scala> sql("select posexplode(map('a', 1, 'b', 2))").show org.apache.spark.sql.AnalysisException: No handler for Hive UDF ... posexplode() takes an array as a parameter; line 1 pos 7 ``` **After** ```scala scala> sql("select posexplode(map('a', 1, 'b', 2))").show +---+---+-+ |pos|key|value| +---+---+-+ | 0| a|1| | 1| b|2| +---+---+-+ ``` For `array` argument, `after` is the same with `before`. ``` scala> sql("select posexplode(array(1, 2, 3))").show +---+---+ |pos|col| +---+---+ | 0| 1| | 1| 2| | 2| 3| +---+---+ ``` ## How was this patch tested? Pass the Jenkins tests with newly added testcases. Author: Dongjoon HyunCloses #13971 from dongjoon-hyun/SPARK-16289. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46395db8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46395db8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46395db8 Branch: refs/heads/master Commit: 46395db80e3304e3f3a1ebdc8aadb8f2819b48b4 Parents: fdf9f94 Author: Dongjoon Hyun Authored: Thu Jun 30 12:03:54 2016 -0700 Committer: Reynold Xin Committed: Thu Jun 30 12:03:54 2016 -0700 -- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 17 R/pkg/R/generics.R | 4 + R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- python/pyspark/sql/functions.py | 21 + .../catalyst/analysis/FunctionRegistry.scala| 1 + .../sql/catalyst/expressions/generators.scala | 66 +++--- .../analysis/ExpressionTypeCheckingSuite.scala | 2 + .../expressions/GeneratorExpressionSuite.scala | 71 +++ .../scala/org/apache/spark/sql/Column.scala | 1 + .../scala/org/apache/spark/sql/functions.scala | 8 ++ .../spark/sql/ColumnExpressionSuite.scala | 60 - .../spark/sql/GeneratorFunctionSuite.scala | 92 .../spark/sql/hive/HiveSessionCatalog.scala | 2 +- 14 files changed, 276 insertions(+), 72 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/46395db8/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index e0ffde9..abc6588 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -234,6 +234,7 @@ exportMethods("%in%", "over", "percent_rank", "pmod", + "posexplode", "quarter", "rand", "randn", http://git-wip-us.apache.org/repos/asf/spark/blob/46395db8/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 09e5afa..52d46f9 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2934,3 +2934,20 @@ setMethod("sort_array", jc <- callJStatic("org.apache.spark.sql.functions", "sort_array", x@jc, asc) column(jc) }) + +#' posexplode +#' +#' Creates a new row for each element with position in the given array or map column. 
+#' +#' @rdname posexplode +#' @name posexplode +#' @family collection_funcs +#' @export +#' @examples \dontrun{posexplode(df$c)} +#' @note posexplode since 2.1.0 +setMethod("posexplode", + signature(x = "Column"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "posexplode", x@jc) +column(jc) + }) http://git-wip-us.apache.org/repos/asf/spark/blob/46395db8/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 0e4350f..d9080b6 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1050,6 +1050,10 @@ setGeneric("percent_rank", function(x) { standardGeneric("percent_rank") }) #' @export setGeneric("pmod", function(y, x) { standardGeneric("pmod") }) +#' @rdname posexplode +#' @export +setGeneric("posexplode", function(x) { standardGeneric("posexplode") }) + #' @rdname quarter #' @export setGeneric("quarter", function(x) { standardGeneric("quarter") }) http://git-wip-us.apache.org/repos/asf/spark/blob/46395db8/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git
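A small Scala counterpart to the SQL examples in the commit message; the DataFrame and column names below are made up, and only `functions.posexplode` comes from the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.posexplode

val spark = SparkSession.builder.appName("PosexplodeSketch").getOrCreate()
import spark.implicits._

// Hypothetical DataFrame with an array column.
val df = Seq((1, Seq("a", "b", "c"))).toDF("id", "letters")

// For an array argument, posexplode yields one row per element, with the position
// in a `pos` column and the element itself in a `col` column.
df.select($"id", posexplode($"letters")).show()
// Expected shape (per the commit message): columns id, pos, col with rows
// (1, 0, a), (1, 1, b), (1, 2, c).
```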
spark git commit: [SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 executors
Repository: spark Updated Branches: refs/heads/master 07f46afc7 -> fdf9f94f8 [SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 executors ## What changes were proposed in this pull request? Before this change, when you turn on blacklisting with `spark.scheduler.executorTaskBlacklistTime`, but you have fewer than `spark.task.maxFailures` executors, you can end with a job "hung" after some task failures. Whenever a taskset is unable to schedule anything on resourceOfferSingleTaskSet, we check whether the last pending task can be scheduled on *any* known executor. If not, the taskset (and any corresponding jobs) are failed. * Worst case, this is O(maxTaskFailures + numTasks). But unless many executors are bad, this should be small * This does not fail as fast as possible -- when a task becomes unschedulable, we keep scheduling other tasks. This is to avoid an O(numPendingTasks * numExecutors) operation * Also, it is conceivable this fails too quickly. You may be 1 millisecond away from unblacklisting a place for a task to run, or acquiring a new executor. ## How was this patch tested? Added unit test which failed before the change, ran new test 5k times manually, ran all scheduler tests manually, and the full suite via jenkins. Author: Imran RashidCloses #13603 from squito/progress_w_few_execs_and_blacklist. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdf9f94f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdf9f94f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdf9f94f Branch: refs/heads/master Commit: fdf9f94f8c8861a00cd8415073f842b857c397f7 Parents: 07f46af Author: Imran Rashid Authored: Thu Jun 30 13:36:06 2016 -0500 Committer: Imran Rashid Committed: Thu Jun 30 13:36:06 2016 -0500 -- .../org/apache/spark/scheduler/TaskInfo.scala | 4 + .../spark/scheduler/TaskSchedulerImpl.scala | 3 + .../apache/spark/scheduler/TaskSetManager.scala | 54 +++- .../apache/spark/executor/ExecutorSuite.scala | 2 +- .../scheduler/BlacklistIntegrationSuite.scala | 33 ++- .../org/apache/spark/scheduler/FakeTask.scala | 5 +- .../org/apache/spark/scheduler/PoolSuite.scala | 2 +- .../scheduler/SchedulerIntegrationSuite.scala | 14 +-- .../scheduler/TaskSchedulerImplSuite.scala | 92 9 files changed, 194 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 2d89232..eeb7963 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -30,6 +30,10 @@ import org.apache.spark.annotation.DeveloperApi @DeveloperApi class TaskInfo( val taskId: Long, +/** + * The index of this task within its task set. Not necessarily the same as the ID of the RDD + * partition that the task is computing. 
+ */ val index: Int, val attemptNumber: Int, val launchTime: Long, http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 821e3ee..2ce49ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -279,6 +279,9 @@ private[spark] class TaskSchedulerImpl( } } } +if (!launchedTask) { + taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys) +} return launchedTask } http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2eedd20..2fef447 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -83,7 +83,7 @@ private[spark] class TaskSetManager( val copiesRunning = new Array[Int](numTasks) val successful = new Array[Boolean](numTasks) private val
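A hedged configuration sketch naming the two properties from the description above; the timeout value is an arbitrary example, and the precise blacklist semantics are as described in the commit, not restated here.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("BlacklistSketch")
  // Timeout-based task blacklisting discussed in the commit (value is illustrative).
  .set("spark.scheduler.executorTaskBlacklistTime", "10000")
  // Number of task failures after which a task set is given up on (4 is Spark's usual default).
  .set("spark.task.maxFailures", "4")
```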
spark git commit: [SPARK-13850] Force the sorter to Spill when number of elements in th…
Repository: spark Updated Branches: refs/heads/branch-2.0 98056a1f8 -> f17ffef38 [SPARK-13850] Force the sorter to Spill when number of elements in th⦠Force the sorter to Spill when number of elements in the pointer array reach a certain size. This is to workaround the issue of timSort failing on large buffer size. Tested by running a job which was failing without this change due to TimSort bug. Author: Sital KediaCloses #13107 from sitalkedia/fix_TimSort. (cherry picked from commit 07f46afc733b1718d528a6ea5c0d774f047024fa) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f17ffef3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f17ffef3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f17ffef3 Branch: refs/heads/branch-2.0 Commit: f17ffef38b4749b6b801c198ec207434a4db0c38 Parents: 98056a1 Author: Sital Kedia Authored: Thu Jun 30 10:53:18 2016 -0700 Committer: Davies Liu Committed: Thu Jun 30 10:54:37 2016 -0700 -- .../shuffle/sort/ShuffleExternalSorter.java | 10 ++--- .../unsafe/sort/UnsafeExternalSorter.java | 23 +--- .../unsafe/sort/UnsafeExternalSorterSuite.java | 3 +++ .../sql/execution/UnsafeExternalRowSorter.java | 2 ++ .../UnsafeFixedWidthAggregationMap.java | 3 +++ .../sql/execution/UnsafeKVExternalSorter.java | 8 +-- .../apache/spark/sql/execution/WindowExec.scala | 2 ++ .../execution/datasources/WriterContainer.scala | 5 - .../execution/joins/CartesianProductExec.scala | 2 ++ .../execution/streaming/FileStreamSink.scala| 5 - .../execution/UnsafeKVExternalSorterSuite.scala | 4 +++- .../spark/sql/hive/hiveWriterContainers.scala | 5 - 12 files changed, 60 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f17ffef3/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 014aef8..696ee73 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer { private final TaskContext taskContext; private final ShuffleWriteMetrics writeMetrics; - /** Force this sorter to spill when there are this many elements in memory. For testing only */ + /** + * Force this sorter to spill when there are this many elements in memory. The default value is + * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G. 
+ */ private final long numElementsForSpillThreshold; /** The buffer size to use when writing spills using DiskBlockObjectWriter */ @@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer { // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.numElementsForSpillThreshold = - conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE); + conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024); this.writeMetrics = writeMetrics; this.inMemSorter = new ShuffleInMemorySorter( this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true)); @@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer { // for tests assert(inMemSorter != null); -if (inMemSorter.numRecords() > numElementsForSpillThreshold) { +if (inMemSorter.numRecords() >= numElementsForSpillThreshold) { + logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold); spill(); } http://git-wip-us.apache.org/repos/asf/spark/blob/f17ffef3/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index e14a23f..8a980d4 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -27,6 +27,7
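A hedged sketch of overriding the force-spill threshold touched by this patch; the chosen value is illustrative only.

```scala
import org.apache.spark.SparkConf

// After this change the default is 1024 * 1024 * 1024 elements; a smaller value forces
// the sorter to spill earlier, trading memory pressure for extra spill files.
val conf = new SparkConf()
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", (64 * 1024 * 1024).toString)
```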
spark git commit: [SPARK-13850] Force the sorter to Spill when number of elements in th…
Repository: spark Updated Branches: refs/heads/master 5344bade8 -> 07f46afc7 [SPARK-13850] Force the sorter to Spill when number of elements in th⦠## What changes were proposed in this pull request? Force the sorter to Spill when number of elements in the pointer array reach a certain size. This is to workaround the issue of timSort failing on large buffer size. ## How was this patch tested? Tested by running a job which was failing without this change due to TimSort bug. Author: Sital KediaCloses #13107 from sitalkedia/fix_TimSort. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f46afc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f46afc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f46afc Branch: refs/heads/master Commit: 07f46afc733b1718d528a6ea5c0d774f047024fa Parents: 5344bad Author: Sital Kedia Authored: Thu Jun 30 10:53:18 2016 -0700 Committer: Davies Liu Committed: Thu Jun 30 10:53:18 2016 -0700 -- .../shuffle/sort/ShuffleExternalSorter.java | 10 ++--- .../unsafe/sort/UnsafeExternalSorter.java | 23 +--- .../unsafe/sort/UnsafeExternalSorterSuite.java | 3 +++ .../sql/execution/UnsafeExternalRowSorter.java | 2 ++ .../UnsafeFixedWidthAggregationMap.java | 3 +++ .../sql/execution/UnsafeKVExternalSorter.java | 8 +-- .../apache/spark/sql/execution/WindowExec.scala | 2 ++ .../execution/datasources/WriterContainer.scala | 5 - .../execution/joins/CartesianProductExec.scala | 2 ++ .../execution/streaming/FileStreamSink.scala| 5 - .../execution/UnsafeKVExternalSorterSuite.scala | 4 +++- .../spark/sql/hive/hiveWriterContainers.scala | 5 - 12 files changed, 60 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 014aef8..696ee73 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer { private final TaskContext taskContext; private final ShuffleWriteMetrics writeMetrics; - /** Force this sorter to spill when there are this many elements in memory. For testing only */ + /** + * Force this sorter to spill when there are this many elements in memory. The default value is + * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G. 
+ */ private final long numElementsForSpillThreshold; /** The buffer size to use when writing spills using DiskBlockObjectWriter */ @@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer { // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.numElementsForSpillThreshold = - conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE); + conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024); this.writeMetrics = writeMetrics; this.inMemSorter = new ShuffleInMemorySorter( this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true)); @@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer { // for tests assert(inMemSorter != null); -if (inMemSorter.numRecords() > numElementsForSpillThreshold) { +if (inMemSorter.numRecords() >= numElementsForSpillThreshold) { + logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold); spill(); } http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index ec15f0b..d6a255e 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -27,6 +27,7 @@ import
spark git commit: [SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API
Repository: spark Updated Branches: refs/heads/master 5320adc86 -> 5344bade8 [SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API ## What changes were proposed in this pull request? Add Catalog.refreshTable API into python interface for Spark-SQL. ## How was this patch tested? Existing test. Author: WeichenXuCloses #13558 from WeichenXu123/update_python_sql_interface_refreshTable. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5344bade Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5344bade Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5344bade Branch: refs/heads/master Commit: 5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98 Parents: 5320adc Author: WeichenXu Authored: Thu Jun 30 23:00:39 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 23:00:39 2016 +0800 -- python/pyspark/sql/catalog.py | 5 + .../src/main/scala/org/apache/spark/sql/catalog/Catalog.scala | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5344bade/python/pyspark/sql/catalog.py -- diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 3033f14..4af930a 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -232,6 +232,11 @@ class Catalog(object): """Removes all cached tables from the in-memory cache.""" self._jcatalog.clearCache() +@since(2.0) +def refreshTable(self, tableName): +"""Invalidate and refresh all the cached metadata of the given table.""" +self._jcatalog.refreshTable(tableName) + def _reset(self): """(Internal use only) Drop all existing databases (except "default"), tables, partitions and functions, and set the current database to "default". http://git-wip-us.apache.org/repos/asf/spark/blob/5344bade/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 083a63c..91ed9b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -214,7 +214,7 @@ abstract class Catalog { def clearCache(): Unit /** - * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, + * Invalidate and refresh all the cached metadata of the given table. For performance reasons, * Spark SQL or the external data source library it uses might cache certain metadata about a * table, such as the location of blocks. When those change outside of Spark SQL, users should * call this function to invalidate the cache. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
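A minimal Scala-side illustration of the same call (the Python form added by the diff is `spark.catalog.refreshTable(...)`); the table name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("RefreshTableSketch").getOrCreate()

// If something outside Spark SQL rewrites the files backing "my_table", the cached
// metadata (such as block locations) can go stale; refreshTable invalidates it.
spark.catalog.refreshTable("my_table")
```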
spark git commit: [BUILD] Fix version in poms related to kafka-0-10
Repository: spark Updated Branches: refs/heads/branch-2.0 56207fc3b -> 98056a1f8 [BUILD] Fix version in poms related to kafka-0-10 self explanatory Author: Tathagata DasCloses #13994 from tdas/SPARK-12177-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98056a1f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98056a1f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98056a1f Branch: refs/heads/branch-2.0 Commit: 98056a1f8683385599f194a4b963769e3342bff3 Parents: 56207fc Author: Tathagata Das Authored: Thu Jun 30 22:10:56 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 22:10:56 2016 +0800 -- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/98056a1f/external/kafka-0-10-assembly/pom.xml -- diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml index f2468d1..59f41f1 100644 --- a/external/kafka-0-10-assembly/pom.xml +++ b/external/kafka-0-10-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.0-SNAPSHOT +2.0.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/98056a1f/external/kafka-0-10/pom.xml -- diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 50395f6..2696561 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.0-SNAPSHOT +2.0.1-SNAPSHOT ../../pom.xml - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16071][SQL] Checks size limit when doubling the array size in BufferHolder
Repository: spark Updated Branches: refs/heads/branch-2.0 6a4f4c1d7 -> 56207fc3b [SPARK-16071][SQL] Checks size limit when doubling the array size in BufferHolder ## What changes were proposed in this pull request? This PR Checks the size limit when doubling the array size in BufferHolder to avoid integer overflow. ## How was this patch tested? Manual test. Author: Sean ZhongCloses #13829 from clockfly/SPARK-16071_2. (cherry picked from commit 5320adc863ca85b489cef79f156392b9da36e53f) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56207fc3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56207fc3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56207fc3 Branch: refs/heads/branch-2.0 Commit: 56207fc3b26cdb8cb50ce460eeab32c06a81bb44 Parents: 6a4f4c1 Author: Sean Zhong Authored: Thu Jun 30 21:56:34 2016 +0800 Committer: Wenchen Fan Committed: Thu Jun 30 21:56:55 2016 +0800 -- .../expressions/codegen/BufferHolder.java | 16 +++- .../expressions/codegen/BufferHolderSuite.scala | 39 2 files changed, 53 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/56207fc3/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index af61e20..0e4264f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -45,7 +45,13 @@ public class BufferHolder { } public BufferHolder(UnsafeRow row, int initialSize) { -this.fixedSize = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()) + 8 * row.numFields(); +int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()); +if (row.numFields() > (Integer.MAX_VALUE - initialSize - bitsetWidthInBytes) / 8) { + throw new UnsupportedOperationException( +"Cannot create BufferHolder for input UnsafeRow because there are " + + "too many fields (number of fields: " + row.numFields() + ")"); +} +this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); this.buffer = new byte[fixedSize + initialSize]; this.row = row; this.row.pointTo(buffer, buffer.length); @@ -55,10 +61,16 @@ public class BufferHolder { * Grows the buffer by at least neededSize and points the row to the buffer. */ public void grow(int neededSize) { +if (neededSize > Integer.MAX_VALUE - totalSize()) { + throw new UnsupportedOperationException( +"Cannot grow BufferHolder by size " + neededSize + " because the size after growing " + + "exceeds size limitation " + Integer.MAX_VALUE); +} final int length = totalSize() + neededSize; if (buffer.length < length) { // This will not happen frequently, because the buffer is re-used. - final byte[] tmp = new byte[length * 2]; + int newLength = length < Integer.MAX_VALUE / 2 ? 
length * 2 : Integer.MAX_VALUE; + final byte[] tmp = new byte[newLength]; Platform.copyMemory( buffer, Platform.BYTE_ARRAY_OFFSET, http://git-wip-us.apache.org/repos/asf/spark/blob/56207fc3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala new file mode 100644 index 000..c7c386b --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the
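A standalone sketch, not the BufferHolder code itself, of the overflow-safe doubling rule introduced above, written as a plain function.

```scala
// Grow a buffer of `currentSize` bytes by at least `neededSize`, doubling as before but
// capping at Int.MaxValue and rejecting requests that would overflow an Int.
def grownLength(currentSize: Int, neededSize: Int): Int = {
  require(neededSize <= Int.MaxValue - currentSize,
    s"Cannot grow by $neededSize: total size would exceed ${Int.MaxValue}")
  val length = currentSize + neededSize
  if (length < Int.MaxValue / 2) length * 2 else Int.MaxValue
}
```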
spark git commit: [SPARK-16071][SQL] Checks size limit when doubling the array size in BufferHolder
Repository: spark Updated Branches: refs/heads/master de8ab313e -> 5320adc86 [SPARK-16071][SQL] Checks size limit when doubling the array size in BufferHolder ## What changes were proposed in this pull request? This PR Checks the size limit when doubling the array size in BufferHolder to avoid integer overflow. ## How was this patch tested? Manual test. Author: Sean ZhongCloses #13829 from clockfly/SPARK-16071_2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5320adc8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5320adc8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5320adc8 Branch: refs/heads/master Commit: 5320adc863ca85b489cef79f156392b9da36e53f Parents: de8ab31 Author: Sean Zhong Authored: Thu Jun 30 21:56:34 2016 +0800 Committer: Wenchen Fan Committed: Thu Jun 30 21:56:34 2016 +0800 -- .../expressions/codegen/BufferHolder.java | 16 +++- .../expressions/codegen/BufferHolderSuite.scala | 39 2 files changed, 53 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5320adc8/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index af61e20..0e4264f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -45,7 +45,13 @@ public class BufferHolder { } public BufferHolder(UnsafeRow row, int initialSize) { -this.fixedSize = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()) + 8 * row.numFields(); +int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()); +if (row.numFields() > (Integer.MAX_VALUE - initialSize - bitsetWidthInBytes) / 8) { + throw new UnsupportedOperationException( +"Cannot create BufferHolder for input UnsafeRow because there are " + + "too many fields (number of fields: " + row.numFields() + ")"); +} +this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); this.buffer = new byte[fixedSize + initialSize]; this.row = row; this.row.pointTo(buffer, buffer.length); @@ -55,10 +61,16 @@ public class BufferHolder { * Grows the buffer by at least neededSize and points the row to the buffer. */ public void grow(int neededSize) { +if (neededSize > Integer.MAX_VALUE - totalSize()) { + throw new UnsupportedOperationException( +"Cannot grow BufferHolder by size " + neededSize + " because the size after growing " + + "exceeds size limitation " + Integer.MAX_VALUE); +} final int length = totalSize() + neededSize; if (buffer.length < length) { // This will not happen frequently, because the buffer is re-used. - final byte[] tmp = new byte[length * 2]; + int newLength = length < Integer.MAX_VALUE / 2 ? 
length * 2 : Integer.MAX_VALUE; + final byte[] tmp = new byte[newLength]; Platform.copyMemory( buffer, Platform.BYTE_ARRAY_OFFSET, http://git-wip-us.apache.org/repos/asf/spark/blob/5320adc8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala new file mode 100644 index 000..c7c386b --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the
spark git commit: [SPARK-12177][TEST] Removed test to avoid compilation issue in scala 2.10
Repository: spark Updated Branches: refs/heads/branch-2.0 1d274455c -> 6a4f4c1d7 [SPARK-12177][TEST] Removed test to avoid compilation issue in scala 2.10 ## What changes were proposed in this pull request? The commented lines failed scala 2.10 build. This is because of change in behavior of case classes between 2.10 and 2.11. In scala 2.10, if companion object of a case class has explicitly defined apply(), then the implicit apply method is not generated. In scala 2.11 it is generated. Hence, the lines compile fine in 2.11 but not in 2.10. This simply comments the tests to fix broken build. Correct solution is pending. Author: Tathagata DasCloses #13992 from tdas/SPARK-12177. (cherry picked from commit de8ab313e1fe59f849a62e59349224581ff0b40a) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a4f4c1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a4f4c1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a4f4c1d Branch: refs/heads/branch-2.0 Commit: 6a4f4c1d751db9542ba49755e859b55b42be3236 Parents: 1d27445 Author: Tathagata Das Authored: Thu Jun 30 18:06:04 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 18:06:20 2016 +0800 -- .../spark/streaming/kafka010/JavaConsumerStrategySuite.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a4f4c1d/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index aba45f5..8d7c05b 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -50,8 +50,8 @@ public class JavaConsumerStrategySuite implements Serializable { JavaConverters.mapAsScalaMapConverter(offsets).asScala(); // make sure constructors can be called from java -final ConsumerStrategy sub0 = - Subscribe. apply(topics, kafkaParams, offsets); +// final ConsumerStrategy sub0 = // does not compile in Scala 2.10 +// Subscribe. apply(topics, kafkaParams, offsets); final ConsumerStrategy sub1 = Subscribe. apply(sTopics, sKafkaParams, sOffsets); final ConsumerStrategy sub2 = @@ -65,8 +65,8 @@ public class JavaConsumerStrategySuite implements Serializable { sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); -final ConsumerStrategy asn0 = - Assign. apply(parts, kafkaParams, offsets); +// final ConsumerStrategy asn0 = // does not compile in Scala 2.10 +// Assign. apply(parts, kafkaParams, offsets); final ConsumerStrategy asn1 = Assign. apply(sParts, sKafkaParams, sOffsets); final ConsumerStrategy asn2 = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
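A minimal, non-Spark illustration of the 2.10/2.11 difference the commit message describes; the class and method below are made up.

```scala
case class Box(value: Int, label: String)

object Box {
  // Explicitly defined apply in the companion object.
  def apply(value: Int): Box = new Box(value, "default")
}

object Demo {
  val a = new Box(1, "two") // fine on 2.10 and 2.11: the constructor is always available
  val b = Box(3)            // fine on both: calls the explicit apply above
  // val c = Box(1, "two")  // per the commit message, the synthetic apply is generated on
  //                        // 2.11 but suppressed on 2.10, so this fails to compile there
}
```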
spark git commit: [SPARK-12177][TEST] Removed test to avoid compilation issue in scala 2.10
Repository: spark Updated Branches: refs/heads/master b30a2dc7c -> de8ab313e [SPARK-12177][TEST] Removed test to avoid compilation issue in scala 2.10 ## What changes were proposed in this pull request? The commented lines failed scala 2.10 build. This is because of change in behavior of case classes between 2.10 and 2.11. In scala 2.10, if companion object of a case class has explicitly defined apply(), then the implicit apply method is not generated. In scala 2.11 it is generated. Hence, the lines compile fine in 2.11 but not in 2.10. This simply comments the tests to fix broken build. Correct solution is pending. Author: Tathagata DasCloses #13992 from tdas/SPARK-12177. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de8ab313 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de8ab313 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de8ab313 Branch: refs/heads/master Commit: de8ab313e1fe59f849a62e59349224581ff0b40a Parents: b30a2dc Author: Tathagata Das Authored: Thu Jun 30 18:06:04 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 18:06:04 2016 +0800 -- .../spark/streaming/kafka010/JavaConsumerStrategySuite.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de8ab313/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index aba45f5..8d7c05b 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -50,8 +50,8 @@ public class JavaConsumerStrategySuite implements Serializable { JavaConverters.mapAsScalaMapConverter(offsets).asScala(); // make sure constructors can be called from java -final ConsumerStrategy sub0 = - Subscribe. apply(topics, kafkaParams, offsets); +// final ConsumerStrategy sub0 = // does not compile in Scala 2.10 +// Subscribe. apply(topics, kafkaParams, offsets); final ConsumerStrategy sub1 = Subscribe. apply(sTopics, sKafkaParams, sOffsets); final ConsumerStrategy sub2 = @@ -65,8 +65,8 @@ public class JavaConsumerStrategySuite implements Serializable { sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); -final ConsumerStrategy asn0 = - Assign. apply(parts, kafkaParams, offsets); +// final ConsumerStrategy asn0 = // does not compile in Scala 2.10 +// Assign. apply(parts, kafkaParams, offsets); final ConsumerStrategy asn1 = Assign. apply(sParts, sKafkaParams, sOffsets); final ConsumerStrategy asn2 = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes
Repository: spark Updated Branches: refs/heads/master 2c3d96134 -> b30a2dc7c [SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes ## What changes were proposed in this pull request? model loading backward compatibility for ml NaiveBayes ## How was this patch tested? existing ut and manual test for loading models saved by Spark 1.6. Author: zlpmichelleCloses #13940 from zlpmichelle/naivebayes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b30a2dc7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b30a2dc7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b30a2dc7 Branch: refs/heads/master Commit: b30a2dc7c50bfb70bd2b57be70530a9a9fa94a7a Parents: 2c3d961 Author: zlpmichelle Authored: Thu Jun 30 00:50:14 2016 -0700 Committer: Yanbo Liang Committed: Thu Jun 30 00:50:14 2016 -0700 -- .../org/apache/spark/ml/classification/NaiveBayes.scala | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b30a2dc7/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index 7c34031..c99ae30 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -28,8 +28,9 @@ import org.apache.spark.ml.util._ import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes} import org.apache.spark.mllib.classification.{NaiveBayesModel => OldNaiveBayesModel} import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint} +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Dataset +import org.apache.spark.sql.{Dataset, Row} /** * Params for Naive Bayes Classifiers. @@ -275,9 +276,11 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val data = sparkSession.read.parquet(dataPath).select("pi", "theta").head() - val pi = data.getAs[Vector](0) - val theta = data.getAs[Matrix](1) + val data = sparkSession.read.parquet(dataPath) + val vecConverted = MLUtils.convertVectorColumnsToML(data, "pi") + val Row(pi: Vector, theta: Matrix) = MLUtils.convertMatrixColumnsToML(vecConverted, "theta") +.select("pi", "theta") +.head() val model = new NaiveBayesModel(metadata.uid, pi, theta) DefaultParamsReader.getAndSetParams(model, metadata) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
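A hedged usage sketch of reading such a model back through the ml API after this change; the path is a placeholder.

```scala
import org.apache.spark.ml.classification.NaiveBayesModel

// A model directory written by an older Spark release (with mllib-style vector/matrix
// columns in its Parquet data) should now load through the ml reader.
val model = NaiveBayesModel.load("/models/naive-bayes-1.6")
println(model.pi)    // log of class priors
println(model.theta) // log of class-conditional probabilities
```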
spark git commit: [SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes
Repository: spark Updated Branches: refs/heads/branch-2.0 c8a7c2305 -> 1d274455c [SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes ## What changes were proposed in this pull request? model loading backward compatibility for ml NaiveBayes ## How was this patch tested? existing ut and manual test for loading models saved by Spark 1.6. Author: zlpmichelleCloses #13940 from zlpmichelle/naivebayes. (cherry picked from commit b30a2dc7c50bfb70bd2b57be70530a9a9fa94a7a) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d274455 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d274455 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d274455 Branch: refs/heads/branch-2.0 Commit: 1d274455cfa45bc63aee6ecf8dbb1f170ee16af2 Parents: c8a7c23 Author: zlpmichelle Authored: Thu Jun 30 00:50:14 2016 -0700 Committer: Yanbo Liang Committed: Thu Jun 30 00:50:35 2016 -0700 -- .../org/apache/spark/ml/classification/NaiveBayes.scala | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1d274455/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index 7c34031..c99ae30 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -28,8 +28,9 @@ import org.apache.spark.ml.util._ import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes} import org.apache.spark.mllib.classification.{NaiveBayesModel => OldNaiveBayesModel} import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint} +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Dataset +import org.apache.spark.sql.{Dataset, Row} /** * Params for Naive Bayes Classifiers. @@ -275,9 +276,11 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val data = sparkSession.read.parquet(dataPath).select("pi", "theta").head() - val pi = data.getAs[Vector](0) - val theta = data.getAs[Matrix](1) + val data = sparkSession.read.parquet(dataPath) + val vecConverted = MLUtils.convertVectorColumnsToML(data, "pi") + val Row(pi: Vector, theta: Matrix) = MLUtils.convertMatrixColumnsToML(vecConverted, "theta") +.select("pi", "theta") +.head() val model = new NaiveBayesModel(metadata.uid, pi, theta) DefaultParamsReader.getAndSetParams(model, metadata) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16257][BUILD] Update spark_ec2.py to support Spark 1.6.2 and 1.6.3.
Repository: spark Updated Branches: refs/heads/branch-1.6 1ac830aca -> ccc7fa357 [SPARK-16257][BUILD] Update spark_ec2.py to support Spark 1.6.2 and 1.6.3. ## What changes were proposed in this pull request? - Adds 1.6.2 and 1.6.3 as supported Spark versions within the bundled spark-ec2 script. - Makes the default Spark version 1.6.3 to keep in sync with the upcoming release. - Does not touch the newer spark-ec2 scripts in the separate amplabs repository. ## How was this patch tested? - Manual script execution: export AWS_SECRET_ACCESS_KEY=_snip_ export AWS_ACCESS_KEY_ID=_snip_ $SPARK_HOME/ec2/spark-ec2 \ --key-pair=_snip_ \ --identity-file=_snip_ \ --region=us-east-1 \ --vpc-id=_snip_ \ --slaves=1 \ --instance-type=t1.micro \ --spark-version=1.6.2 \ --hadoop-major-version=yarn \ launch test-cluster - Result: Successful creation of a 1.6.2-based Spark cluster. This contribution is my original work and I license the work to the project under the project's open source license. Author: Brian UriCloses #13947 from briuri/branch-1.6-bug-spark-16257. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccc7fa35 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccc7fa35 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccc7fa35 Branch: refs/heads/branch-1.6 Commit: ccc7fa357099e0f621cfc02448ba20d3f6fabc14 Parents: 1ac830a Author: Brian Uri Authored: Thu Jun 30 07:52:28 2016 +0100 Committer: Sean Owen Committed: Thu Jun 30 07:52:28 2016 +0100 -- ec2/spark_ec2.py | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ccc7fa35/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 76c09f0..b28b4c5 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -51,7 +51,7 @@ else: raw_input = input xrange = range -SPARK_EC2_VERSION = "1.6.1" +SPARK_EC2_VERSION = "1.6.3" SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__)) VALID_SPARK_VERSIONS = set([ @@ -77,6 +77,8 @@ VALID_SPARK_VERSIONS = set([ "1.5.2", "1.6.0", "1.6.1", +"1.6.2", +"1.6.3", ]) SPARK_TACHYON_MAP = { @@ -96,6 +98,8 @@ SPARK_TACHYON_MAP = { "1.5.2": "0.7.1", "1.6.0": "0.8.2", "1.6.1": "0.8.2", +"1.6.2": "0.8.2", +"1.6.3": "0.8.2", } DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION @@ -103,7 +107,7 @@ DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark; # Default location to get the spark-ec2 scripts (and ami-list) from DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/amplab/spark-ec2; -DEFAULT_SPARK_EC2_BRANCH = "branch-1.5" +DEFAULT_SPARK_EC2_BRANCH = "branch-1.6" def setup_external_libs(libs): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide
Repository: spark Updated Branches: refs/heads/branch-2.0 3134f116a -> c8a7c2305 [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide Author: Tathagata DasCloses #13978 from tdas/SPARK-16256-1. (cherry picked from commit 2c3d96134dcc0428983eea087db7e91072215aea) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8a7c230 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8a7c230 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8a7c230 Branch: refs/heads/branch-2.0 Commit: c8a7c23054209db5474d96de2a7e2d8a6f8cc0da Parents: 3134f11 Author: Tathagata Das Authored: Wed Jun 29 23:38:19 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 23:38:35 2016 -0700 -- docs/structured-streaming-programming-guide.md | 44 +++-- 1 file changed, 23 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c8a7c230/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 9ed06be..5932566 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -459,7 +459,7 @@ val csvDF = spark .readStream .option("sep", ";") .schema(userSchema) // Specify schema of the parquet files -.csv("/path/to/directory")// Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory")// Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -486,7 +486,7 @@ Dataset[Row] csvDF = spark .readStream() .option("sep", ";") .schema(userSchema) // Specify schema of the parquet files -.csv("/path/to/directory");// Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory");// Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -513,7 +513,7 @@ csvDF = spark \ .readStream() \ .option("sep", ";") \ .schema(userSchema) \ -.csv("/path/to/directory")# Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory")# Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -522,10 +522,10 @@ csvDF = spark \ These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document. ## Operations on streaming DataFrames/Datasets -You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Letâs take a look at a few example operations that you can use. +You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming guide](sql-programming-guide.html) for more details. Letâs take a look at a few example operations that you can use. 
### Basic Operations - Selection, Projection, Aggregation -Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section. +Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are [discussed later](#unsupported-operations) in this section. @@ -618,7 +618,7 @@ df.groupBy("type").count() ### Window Operations on Event Time -Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of, window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this
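The `csv(...)` call corrected in the diff above fits into the file-source pattern as in this minimal Scala sketch, which also shows the untyped-to-typed conversion the guide text mentions. The schema, the `/path/to/directory` location, and the `User` case class are illustrative placeholders, not values taken from the commit:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object CsvStreamSketch {
  // Assumed record layout for the semicolon-separated files.
  case class User(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CsvStreamSketch").getOrCreate()
    import spark.implicits._

    // File sources need the schema up front; it is not inferred for streaming reads.
    val userSchema = new StructType()
      .add("name", StringType)
      .add("age", IntegerType)

    val csvDF = spark.readStream
      .option("sep", ";")
      .schema(userSchema)
      .csv("/path/to/directory")   // equivalent to format("csv").load("/path/to/directory")

    // Untyped, SQL-like operation on the streaming DataFrame...
    val countsByName = csvDF.groupBy("name").count()

    // ...and the same data viewed as a typed streaming Dataset for map/filter-style
    // operations (defined here but not started as a query).
    val adultNames: Dataset[String] = csvDF.as[User].filter(_.age >= 18).map(_.name)

    // A sink is needed to actually start a streaming query; console output keeps the sketch runnable.
    val query = countsByName.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```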
spark git commit: [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide
Repository: spark Updated Branches: refs/heads/master dedbceec1 -> 2c3d96134 [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide Author: Tathagata Das Closes #13978 from tdas/SPARK-16256-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c3d9613 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c3d9613 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c3d9613 Branch: refs/heads/master Commit: 2c3d96134dcc0428983eea087db7e91072215aea Parents: dedbcee Author: Tathagata Das Authored: Wed Jun 29 23:38:19 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 23:38:19 2016 -0700 -- docs/structured-streaming-programming-guide.md | 44 +++-- 1 file changed, 23 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c3d9613/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 9ed06be..5932566 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -459,7 +459,7 @@ val csvDF = spark .readStream .option("sep", ";") .schema(userSchema) // Specify schema of the parquet files -.csv("/path/to/directory")// Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory")// Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -486,7 +486,7 @@ Dataset[Row] csvDF = spark .readStream() .option("sep", ";") .schema(userSchema) // Specify schema of the parquet files -.csv("/path/to/directory");// Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory");// Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -513,7 +513,7 @@ csvDF = spark \ .readStream() \ .option("sep", ";") \ .schema(userSchema) \ -.csv("/path/to/directory")# Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory")# Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -522,10 +522,10 @@ csvDF = spark \ These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document. ## Operations on streaming DataFrames/Datasets -You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let's take a look at a few example operations that you can use. +You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming guide](sql-programming-guide.html) for more details. Let's take a look at a few example operations that you can use. ### Basic Operations - Selection, Projection, Aggregation -Most of the common operations on DataFrame/Dataset are supported for streaming.
The few operations that are not supported are discussed later in this section. +Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are [discussed later](#unsupported-operations) in this section. @@ -618,7 +618,7 @@ df.groupBy("type").count() ### Window Operations on Event Time -Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of, window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. +Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to
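The window-based aggregation described in the guide text above can be sketched in a few lines of Scala; the column names `timestamp` and `word` are assumptions for illustration, not part of the patch:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

// 'events' stands for a streaming DataFrame with an event-time column `timestamp`
// and a payload column `word`.
def windowedCounts(events: DataFrame): DataFrame =
  events.groupBy(
    // 10-minute windows sliding every 5 minutes; counts are kept per (window, word) pair,
    // so a row contributes to every window its event time falls into.
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("word")
  ).count()
```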
[1/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
Repository: spark Updated Branches: refs/heads/branch-2.0 a54852350 -> 3134f116a http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java new file mode 100644 index 000..aba45f5 --- /dev/null +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010; + +import java.io.Serializable; +import java.util.*; + +import scala.collection.JavaConverters; + +import org.apache.kafka.common.TopicPartition; + +import org.junit.Assert; +import org.junit.Test; + +public class JavaConsumerStrategySuite implements Serializable { + + @Test + public void testConsumerStrategyConstructors() { +final String topic1 = "topic1"; +final Collection topics = Arrays.asList(topic1); +final scala.collection.Iterable sTopics = + JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); +final TopicPartition tp1 = new TopicPartition(topic1, 0); +final TopicPartition tp2 = new TopicPartition(topic1, 1); +final Collection parts = Arrays.asList(tp1, tp2); +final scala.collection.Iterable sParts = + JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); +final MapkafkaParams = new HashMap (); +kafkaParams.put("bootstrap.servers", "not used"); +final scala.collection.Map sKafkaParams = + JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); +final Map offsets = new HashMap<>(); +offsets.put(tp1, 23L); +final scala.collection.Map sOffsets = + JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + +// make sure constructors can be called from java +final ConsumerStrategy sub0 = + Subscribe. apply(topics, kafkaParams, offsets); +final ConsumerStrategy sub1 = + Subscribe. apply(sTopics, sKafkaParams, sOffsets); +final ConsumerStrategy sub2 = + Subscribe. apply(sTopics, sKafkaParams); +final ConsumerStrategy sub3 = + Subscribe. create(topics, kafkaParams, offsets); +final ConsumerStrategy sub4 = + Subscribe. create(topics, kafkaParams); + +Assert.assertEquals( + sub1.executorKafkaParams().get("bootstrap.servers"), + sub3.executorKafkaParams().get("bootstrap.servers")); + +final ConsumerStrategy asn0 = + Assign. apply(parts, kafkaParams, offsets); +final ConsumerStrategy asn1 = + Assign. apply(sParts, sKafkaParams, sOffsets); +final ConsumerStrategy asn2 = + Assign. apply(sParts, sKafkaParams); +final ConsumerStrategy asn3 = + Assign. 
create(parts, kafkaParams, offsets); +final ConsumerStrategy asn4 = + Assign. create(parts, kafkaParams); + +Assert.assertEquals( + asn1.executorKafkaParams().get("bootstrap.servers"), + asn3.executorKafkaParams().get("bootstrap.servers")); + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java new file mode 100644 index 000..e57ede7 --- /dev/null +++
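For comparison with the Java suite above, the same consumer strategies can be constructed from Scala roughly as follows. The broker address, group id, and topic are placeholders, and only the overloads exercised by the test are assumed here:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{Assign, Subscribe}

// Placeholder Kafka settings; only bootstrap.servers is checked by the test above.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group"
)

// Subscribe to topics by name (overloads with explicit starting offsets also exist, per the test)...
val byTopic = Subscribe[String, String](Seq("topic1"), kafkaParams)

// ...or pin the stream to explicit topic-partitions instead of subscribing by topic.
val tp0 = new TopicPartition("topic1", 0)
val tp1 = new TopicPartition("topic1", 1)
val byPartition = Assign[String, String](Seq(tp0, tp1), kafkaParams)
```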
[2/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API ## What changes were proposed in this pull request? New Kafka consumer api for the released 0.10 version of Kafka ## How was this patch tested? Unit tests, manual tests Author: cody koeningerCloses #11863 from koeninger/kafka-0.9. (cherry picked from commit dedbceec1ef33ccd88101016de969a1ef3e3e142) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3134f116 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3134f116 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3134f116 Branch: refs/heads/branch-2.0 Commit: 3134f116a3565c3a299fa2e7094acd7304d64280 Parents: a548523 Author: cody koeninger Authored: Wed Jun 29 23:21:03 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 23:21:15 2016 -0700 -- external/kafka-0-10-assembly/pom.xml| 176 ++ external/kafka-0-10/pom.xml | 98 +++ .../kafka010/CachedKafkaConsumer.scala | 189 ++ .../streaming/kafka010/ConsumerStrategy.scala | 314 ++ .../kafka010/DirectKafkaInputDStream.scala | 318 ++ .../spark/streaming/kafka010/KafkaRDD.scala | 232 +++ .../streaming/kafka010/KafkaRDDPartition.scala | 45 ++ .../streaming/kafka010/KafkaTestUtils.scala | 277 + .../spark/streaming/kafka010/KafkaUtils.scala | 175 ++ .../streaming/kafka010/LocationStrategy.scala | 77 +++ .../spark/streaming/kafka010/OffsetRange.scala | 153 + .../spark/streaming/kafka010/package-info.java | 21 + .../spark/streaming/kafka010/package.scala | 23 + .../kafka010/JavaConsumerStrategySuite.java | 84 +++ .../kafka010/JavaDirectKafkaStreamSuite.java| 180 ++ .../streaming/kafka010/JavaKafkaRDDSuite.java | 122 .../kafka010/JavaLocationStrategySuite.java | 58 ++ .../src/test/resources/log4j.properties | 28 + .../kafka010/DirectKafkaStreamSuite.scala | 612 +++ .../streaming/kafka010/KafkaRDDSuite.scala | 169 + pom.xml | 2 + project/SparkBuild.scala| 12 +- 22 files changed, 3359 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10-assembly/pom.xml -- diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml new file mode 100644 index 000..f2468d1 --- /dev/null +++ b/external/kafka-0-10-assembly/pom.xml @@ -0,0 +1,176 @@ + + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.spark +spark-parent_2.11 +2.0.0-SNAPSHOT +../../pom.xml + + + org.apache.spark + spark-streaming-kafka-0-10-assembly_2.11 + jar + Spark Integration for Kafka 0.10 Assembly + http://spark.apache.org/ + + +streaming-kafka-0-10-assembly + + + + + org.apache.spark + spark-streaming-kafka-0-10_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + + commons-codec + commons-codec + provided + + + commons-lang + commons-lang + provided + + + com.google.protobuf + protobuf-java + provided + + + net.jpountz.lz4 + lz4 + provided + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.avro + avro-mapred + ${avro.mapred.classifier} + provided + + + org.apache.curator + curator-recipes + provided + + + org.apache.zookeeper + zookeeper + provided + + + log4j + log4j + provided + + + net.java.dev.jets3t + jets3t + provided + + + org.scala-lang + scala-library + provided + + + org.slf4j + slf4j-api + 
provided + + + org.slf4j + slf4j-log4j12 + provided + + + org.xerial.snappy + snappy-java + provided + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-shade-plugin + +false + + +*:* + + + + +*:* + +
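Putting the new classes added by this commit (`KafkaUtils`, `LocationStrategy`, `ConsumerStrategy`, `DirectKafkaInputDStream`) together, an end-to-end usage sketch might look like the following; the broker address, topic name, group id, and batch interval are placeholders rather than values from the patch:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{KafkaUtils, PreferConsistent, Subscribe}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(5))   // 5-second batches, chosen arbitrarily

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest"
    )

    // PreferConsistent spreads Kafka partitions evenly over the available executors.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("topic1"), kafkaParams))

    // Each record is a Kafka ConsumerRecord; print key/value pairs per batch.
    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```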
[1/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
Repository: spark Updated Branches: refs/heads/master bde1d6a61 -> dedbceec1 http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java new file mode 100644 index 000..aba45f5 --- /dev/null +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010; + +import java.io.Serializable; +import java.util.*; + +import scala.collection.JavaConverters; + +import org.apache.kafka.common.TopicPartition; + +import org.junit.Assert; +import org.junit.Test; + +public class JavaConsumerStrategySuite implements Serializable { + + @Test + public void testConsumerStrategyConstructors() { +final String topic1 = "topic1"; +final Collection topics = Arrays.asList(topic1); +final scala.collection.Iterable sTopics = + JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); +final TopicPartition tp1 = new TopicPartition(topic1, 0); +final TopicPartition tp2 = new TopicPartition(topic1, 1); +final Collection parts = Arrays.asList(tp1, tp2); +final scala.collection.Iterable sParts = + JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); +final MapkafkaParams = new HashMap (); +kafkaParams.put("bootstrap.servers", "not used"); +final scala.collection.Map sKafkaParams = + JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); +final Map offsets = new HashMap<>(); +offsets.put(tp1, 23L); +final scala.collection.Map sOffsets = + JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + +// make sure constructors can be called from java +final ConsumerStrategy sub0 = + Subscribe. apply(topics, kafkaParams, offsets); +final ConsumerStrategy sub1 = + Subscribe. apply(sTopics, sKafkaParams, sOffsets); +final ConsumerStrategy sub2 = + Subscribe. apply(sTopics, sKafkaParams); +final ConsumerStrategy sub3 = + Subscribe. create(topics, kafkaParams, offsets); +final ConsumerStrategy sub4 = + Subscribe. create(topics, kafkaParams); + +Assert.assertEquals( + sub1.executorKafkaParams().get("bootstrap.servers"), + sub3.executorKafkaParams().get("bootstrap.servers")); + +final ConsumerStrategy asn0 = + Assign. apply(parts, kafkaParams, offsets); +final ConsumerStrategy asn1 = + Assign. apply(sParts, sKafkaParams, sOffsets); +final ConsumerStrategy asn2 = + Assign. apply(sParts, sKafkaParams); +final ConsumerStrategy asn3 = + Assign. 
create(parts, kafkaParams, offsets); +final ConsumerStrategy asn4 = + Assign. create(parts, kafkaParams); + +Assert.assertEquals( + asn1.executorKafkaParams().get("bootstrap.servers"), + asn3.executorKafkaParams().get("bootstrap.servers")); + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java new file mode 100644 index 000..e57ede7 --- /dev/null +++
[2/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API ## What changes were proposed in this pull request? New Kafka consumer api for the released 0.10 version of Kafka ## How was this patch tested? Unit tests, manual tests Author: cody koeningerCloses #11863 from koeninger/kafka-0.9. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dedbceec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dedbceec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dedbceec Branch: refs/heads/master Commit: dedbceec1ef33ccd88101016de969a1ef3e3e142 Parents: bde1d6a Author: cody koeninger Authored: Wed Jun 29 23:21:03 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 23:21:03 2016 -0700 -- external/kafka-0-10-assembly/pom.xml| 176 ++ external/kafka-0-10/pom.xml | 98 +++ .../kafka010/CachedKafkaConsumer.scala | 189 ++ .../streaming/kafka010/ConsumerStrategy.scala | 314 ++ .../kafka010/DirectKafkaInputDStream.scala | 318 ++ .../spark/streaming/kafka010/KafkaRDD.scala | 232 +++ .../streaming/kafka010/KafkaRDDPartition.scala | 45 ++ .../streaming/kafka010/KafkaTestUtils.scala | 277 + .../spark/streaming/kafka010/KafkaUtils.scala | 175 ++ .../streaming/kafka010/LocationStrategy.scala | 77 +++ .../spark/streaming/kafka010/OffsetRange.scala | 153 + .../spark/streaming/kafka010/package-info.java | 21 + .../spark/streaming/kafka010/package.scala | 23 + .../kafka010/JavaConsumerStrategySuite.java | 84 +++ .../kafka010/JavaDirectKafkaStreamSuite.java| 180 ++ .../streaming/kafka010/JavaKafkaRDDSuite.java | 122 .../kafka010/JavaLocationStrategySuite.java | 58 ++ .../src/test/resources/log4j.properties | 28 + .../kafka010/DirectKafkaStreamSuite.scala | 612 +++ .../streaming/kafka010/KafkaRDDSuite.scala | 169 + pom.xml | 2 + project/SparkBuild.scala| 12 +- 22 files changed, 3359 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10-assembly/pom.xml -- diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml new file mode 100644 index 000..f2468d1 --- /dev/null +++ b/external/kafka-0-10-assembly/pom.xml @@ -0,0 +1,176 @@ + + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.spark +spark-parent_2.11 +2.0.0-SNAPSHOT +../../pom.xml + + + org.apache.spark + spark-streaming-kafka-0-10-assembly_2.11 + jar + Spark Integration for Kafka 0.10 Assembly + http://spark.apache.org/ + + +streaming-kafka-0-10-assembly + + + + + org.apache.spark + spark-streaming-kafka-0-10_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + + commons-codec + commons-codec + provided + + + commons-lang + commons-lang + provided + + + com.google.protobuf + protobuf-java + provided + + + net.jpountz.lz4 + lz4 + provided + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.avro + avro-mapred + ${avro.mapred.classifier} + provided + + + org.apache.curator + curator-recipes + provided + + + org.apache.zookeeper + zookeeper + provided + + + log4j + log4j + provided + + + net.java.dev.jets3t + jets3t + provided + + + org.scala-lang + scala-library + provided + + + org.slf4j + slf4j-api + provided + + + org.slf4j + slf4j-log4j12 + provided + + + org.xerial.snappy + snappy-java + provided + 
+ + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-shade-plugin + +false + + +*:* + + + + +*:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + +
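Since the `OffsetRange.scala` added here mirrors the kafka-0-8 integration, per-batch offset bookkeeping presumably follows the same `HasOffsetRanges` pattern. A small fragment, continuing the `createDirectStream` sketch shown earlier and assuming the same `stream` value:

```scala
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// 'stream' is the DStream from the createDirectStream sketch above.
stream.foreachRDD { rdd =>
  // Each batch RDD carries the Kafka offset ranges it covers, which allows the
  // application to record progress itself (e.g. for exactly-once-style bookkeeping).
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} partition ${r.partition}: offsets ${r.fromOffset} to ${r.untilOffset}")
  }
}
```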