spark git commit: [SPARK-15885][WEB UI] Provide links to executor logs from stage details page in UI
Repository: spark Updated Branches: refs/heads/master 4b5a72c7d -> ce3ea9698 [SPARK-15885][WEB UI] Provide links to executor logs from stage details page in UI ## What changes were proposed in this pull request? This moves over old PR https://github.com/apache/spark/pull/13664 to target master rather than branch-1.6. Added links to logs (or an indication that there are no logs) for entries which list an executor in the stage details page of the UI. This helps streamline the workflow where a user views a stage details page and determines that they would like to see the associated executor log for further examination. Previously, a user would have to cross reference the executor id listed on the stage details page with the corresponding entry on the executors tab. Link to the JIRA: https://issues.apache.org/jira/browse/SPARK-15885 ## How was this patch tested? Ran existing unit tests. Ran test queries on a platform which did not record executor logs and again on a platform which did record executor logs and verified that the new table column was empty and links to the logs (which were verified as linking to the appropriate files), respectively. Attached is a screenshot of the UI page with no links, with the new columns highlighted. Additional screenshot of these columns with the populated links. Without links: ![updated without logs](https://cloud.githubusercontent.com/assets/1450821/16059721/2b69dbaa-3239-11e6-9eed-e539764ca159.png) With links: ![updated with logs](https://cloud.githubusercontent.com/assets/1450821/16059725/32c6e316-3239-11e6-90bd-2553f43f7779.png) This contribution is my original work and I license the work to the project under the Apache Spark project's open source license. Author: Tom Magrino Closes #13861 from tmagrino/uilogstweak. 
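For orientation before the diff below: the new column is rendered per executor row by looking up that executor's logs in the executors listener. A minimal illustrative sketch (not the patch itself) of that lookup and link rendering, assuming the `Map[executorId -> Map[logName -> logUrl]]` shape that `executorToLogUrls` has in the diff:

```scala
import scala.xml.Node

// Illustration only: build the cell contents for one executor's "Logs" column.
// `executorToLogUrls` maps executor ID -> (log name -> log URL), as in the patch below;
// an executor with no recorded logs yields an empty cell.
def logLinksCell(
    executorId: String,
    executorToLogUrls: Map[String, Map[String, String]]): Seq[Node] = {
  val logs = executorToLogUrls.getOrElse(executorId, Map.empty[String, String])
  logs.toSeq.map { case (logName, logUrl) => <a href={logUrl}>{logName}</a> }
}
```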
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce3ea969 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce3ea969 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce3ea969 Branch: refs/heads/master Commit: ce3ea96980e4b31ee0e26d3054c9be94be6f2003 Parents: 4b5a72c Author: Tom Magrino Authored: Thu Jul 7 00:02:39 2016 -0700 Committer: Reynold Xin Committed: Thu Jul 7 00:02:39 2016 -0700 -- .../apache/spark/ui/jobs/ExecutorTable.scala| 12 ++- .../org/apache/spark/ui/jobs/StagePage.scala| 33 +++- .../org/apache/spark/ui/jobs/StagesTab.scala| 1 + .../org/apache/spark/ui/StagePageSuite.scala| 4 +++ 4 files changed, 42 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ce3ea969/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 293f143..133c3b1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -114,7 +114,17 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage case Some(stageData: StageUIData) => stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) => -{k} + + {k} + + { +val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty) +logs.map { + case (logName, logUrl) => {logName} +} + } + + {executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} {UIUtils.formatDuration(v.taskTime)} {v.failedTasks + v.succeededTasks + v.killedTasks} http://git-wip-us.apache.org/repos/asf/spark/blob/ce3ea969/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a5e2a20..ea7acc4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -30,6 +30,7 @@ import org.apache.spark.SparkConf import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} import org.apache.spark.ui._ +import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Distribution, Utils} @@ -39,6 +40,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val progressListener = parent.progressListe
spark git commit: [SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatched Column Num
Repository: spark Updated Branches: refs/heads/master ce3ea9698 -> ab05db0b4 [SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatched Column Num What changes were proposed in this pull request? When creating a view, a common user error is the number of columns produced by the `SELECT` clause does not match the number of column names specified by `CREATE VIEW`. For example, given Table `t1` only has 3 columns ```SQL create view v1(col2, col4, col3, col5) as select * from t1 ``` Currently, Spark SQL reports the following error: ``` requirement failed java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:212) at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:90) ``` This error message is very confusing. This PR is to detect the error and issue a meaningful error message. How was this patch tested? Added test cases Author: gatorsmile Closes #14047 from gatorsmile/viewMismatchedColumns. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab05db0b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab05db0b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab05db0b Branch: refs/heads/master Commit: ab05db0b48f395543cd7d91e2ad9dd760516868b Parents: ce3ea96 Author: gatorsmile Authored: Thu Jul 7 00:07:25 2016 -0700 Committer: Reynold Xin Committed: Thu Jul 7 00:07:25 2016 -0700 -- .../spark/sql/execution/command/views.scala | 6 - .../spark/sql/execution/command/DDLSuite.scala | 23 .../spark/sql/hive/execution/HiveDDLSuite.scala | 23 3 files changed, 51 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ab05db0b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 007fa46..16b333a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -88,7 +88,11 @@ case class CreateViewCommand( qe.assertAnalyzed() val analyzedPlan = qe.analyzed -require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length) +if (tableDesc.schema != Nil && tableDesc.schema.length != analyzedPlan.output.length) { + throw new AnalysisException(s"The number of columns produced by the SELECT clause " + +s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " + +s"specified by CREATE VIEW (num: `${tableDesc.schema.length}`).") +} val sessionState = sparkSession.sessionState if (isTemporary) { http://git-wip-us.apache.org/repos/asf/spark/blob/ab05db0b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 0ee8d17..7d1f1d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1314,6 +1314,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("create temporary view with mismatched schema") { +withTable("tab1") { + spark.range(10).write.saveAsTable("tab1") + withView("view1") { +val e = intercept[AnalysisException] { + 
sql("CREATE TEMPORARY VIEW view1 (col1, col3) AS SELECT * FROM tab1") +}.getMessage +assert(e.contains("the SELECT clause (num: `1`) does not match") + && e.contains("CREATE VIEW (num: `2`)")) + } +} + } + + test("create temporary view with specified schema") { +withView("view1") { + sql("CREATE TEMPORARY VIEW view1 (col1, col2) AS SELECT 1, 2") + checkAnswer( +sql("SELECT * FROM view1"), +Row(1, 2) :: Nil + ) +} + } + test("truncate table - external table, temporary table, view (not allowed)") { import testImplicits._ val path = Utils.createTempDir().getAbsolutePath http://git-wip-us.apache.org/repos/asf/spark/blob/ab05db0b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scal
spark git commit: [SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatched Column Num
Repository: spark Updated Branches: refs/heads/branch-2.0 920162a1e -> d63428af6 [SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatched Column Num What changes were proposed in this pull request? When creating a view, a common user error is the number of columns produced by the `SELECT` clause does not match the number of column names specified by `CREATE VIEW`. For example, given Table `t1` only has 3 columns ```SQL create view v1(col2, col4, col3, col5) as select * from t1 ``` Currently, Spark SQL reports the following error: ``` requirement failed java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:212) at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:90) ``` This error message is very confusing. This PR is to detect the error and issue a meaningful error message. How was this patch tested? Added test cases Author: gatorsmile Closes #14047 from gatorsmile/viewMismatchedColumns. (cherry picked from commit ab05db0b48f395543cd7d91e2ad9dd760516868b) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d63428af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d63428af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d63428af Branch: refs/heads/branch-2.0 Commit: d63428af6d1c7c0a0533567a0a7ccb5817a65de3 Parents: 920162a Author: gatorsmile Authored: Thu Jul 7 00:07:25 2016 -0700 Committer: Reynold Xin Committed: Thu Jul 7 00:07:31 2016 -0700 -- .../spark/sql/execution/command/views.scala | 6 - .../spark/sql/execution/command/DDLSuite.scala | 23 .../spark/sql/hive/execution/HiveDDLSuite.scala | 23 3 files changed, 51 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d63428af/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 088f684..6533d79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -88,7 +88,11 @@ case class CreateViewCommand( qe.assertAnalyzed() val analyzedPlan = qe.analyzed -require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length) +if (tableDesc.schema != Nil && tableDesc.schema.length != analyzedPlan.output.length) { + throw new AnalysisException(s"The number of columns produced by the SELECT clause " + +s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " + +s"specified by CREATE VIEW (num: `${tableDesc.schema.length}`).") +} val sessionState = sparkSession.sessionState if (isTemporary) { http://git-wip-us.apache.org/repos/asf/spark/blob/d63428af/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 0ee8d17..7d1f1d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1314,6 +1314,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("create temporary view with mismatched schema") { +withTable("tab1") { + 
spark.range(10).write.saveAsTable("tab1") + withView("view1") { +val e = intercept[AnalysisException] { + sql("CREATE TEMPORARY VIEW view1 (col1, col3) AS SELECT * FROM tab1") +}.getMessage +assert(e.contains("the SELECT clause (num: `1`) does not match") + && e.contains("CREATE VIEW (num: `2`)")) + } +} + } + + test("create temporary view with specified schema") { +withView("view1") { + sql("CREATE TEMPORARY VIEW view1 (col1, col2) AS SELECT 1, 2") + checkAnswer( +sql("SELECT * FROM view1"), +Row(1, 2) :: Nil + ) +} + } + test("truncate table - external table, temporary table, view (not allowed)") { import testImplicits._ val path = Utils.createTempDir().getAbsolutePath http://git-wip-us.apache.org/repos/asf/spark/blob/d63428af/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
spark git commit: [SPARK-16400][SQL] Remove InSet filter pushdown from Parquet
Repository: spark Updated Branches: refs/heads/master ab05db0b4 -> 986b25140 [SPARK-16400][SQL] Remove InSet filter pushdown from Parquet ## What changes were proposed in this pull request? This patch removes InSet filter pushdown from Parquet data source, since row-based pushdown is not beneficial to Spark and brings extra complexity to the code base. ## How was this patch tested? N/A Author: Reynold Xin Closes #14076 from rxin/SPARK-16400. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/986b2514 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/986b2514 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/986b2514 Branch: refs/heads/master Commit: 986b2514013ed9ebab526f2cf3dc714cc9e480bf Parents: ab05db0 Author: Reynold Xin Authored: Thu Jul 7 18:09:18 2016 +0800 Committer: Cheng Lian Committed: Thu Jul 7 18:09:18 2016 +0800 -- .../org/apache/spark/sql/types/StructType.scala | 7 ++- .../datasources/parquet/ParquetFilters.scala| 57 +--- .../parquet/ParquetFilterSuite.scala| 30 --- 3 files changed, 18 insertions(+), 76 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/986b2514/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 0284ecc..0c2ebb0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -22,7 +22,7 @@ import scala.util.Try import org.json4s.JsonDSL._ -import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.SparkException import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering} import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser} @@ -389,6 +389,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru object StructType extends AbstractDataType { + /** + * A key used in field metadata to indicate that the field comes from the result of merging + * two different StructTypes that do not always contain the field. That is to say, the field + * might be missing (optional) from one of the StructTypes. 
+ */ private[sql] val metadataKeyForOptionalField = "_OPTIONAL_" override private[sql] def defaultConcreteType: DataType = new StructType http://git-wip-us.apache.org/repos/asf/spark/blob/986b2514/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index e0a113a..426263f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.io.Serializable - import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.io.api.Binary @@ -26,18 +24,10 @@ import org.apache.parquet.io.api.Binary import org.apache.spark.sql.sources import org.apache.spark.sql.types._ +/** + * Some utility function to convert Spark data source filters to Parquet filters. + */ private[sql] object ParquetFilters { - case class SetInFilter[T <: Comparable[T]]( -valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable { - -override def keep(value: T): Boolean = { - value != null && valueSet.contains(value) -} - -override def canDrop(statistics: Statistics[T]): Boolean = false - -override def inverseCanDrop(statistics: Statistics[T]): Boolean = false - } private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { case BooleanType => @@ -154,36 +144,16 @@ private[sql] object ParquetFilters { FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) } - private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = { -case IntegerType => - (n: String, v: Set[Any]) => -FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]])) -case LongType => - (n

spark git commit: [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix
Repository: spark Updated Branches: refs/heads/branch-1.6 2588776ad -> 45dda9221 [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix ## What changes were proposed in this pull request? The following Java code because of type erasing: ```Java JavaRDD rows = jsc.parallelize(...); RowMatrix mat = new RowMatrix(rows.rdd()); QRDecomposition result = mat.tallSkinnyQR(true); ``` We should use retag to restore the type to prevent the following exception: ```Java java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.mllib.linalg.Vector; ``` ## How was this patch tested? Java unit test Author: Xusen Yin Closes #14051 from yinxusen/SPARK-16372. (cherry picked from commit 4c6f00d09c016dfc1d2de6e694dff219c9027fa0) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45dda922 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45dda922 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45dda922 Branch: refs/heads/branch-1.6 Commit: 45dda92214191310a56333a2085e2343eba170cd Parents: 2588776 Author: Xusen Yin Authored: Thu Jul 7 11:28:04 2016 +0100 Committer: Sean Owen Committed: Thu Jul 7 11:28:29 2016 +0100 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 2 +- .../mllib/linalg/distributed/RowMatrix.scala| 2 +- .../linalg/distributed/JavaRowMatrixSuite.java | 44 3 files changed, 46 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/45dda922/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 1714983..a059e38 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1110,7 +1110,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Wrapper around RowMatrix constructor. 
*/ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { -new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols) +new RowMatrix(rows.rdd, numRows, numCols) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/45dda922/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 52c0f19..b941d1f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -526,7 +526,7 @@ class RowMatrix @Since("1.0.0") ( def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, Matrix] = { val col = numCols().toInt // split rows horizontally into smaller matrices, and compute QR for each of them -val blockQRs = rows.glom().map { partRows => +val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows => val bdm = BDM.zeros[Double](partRows.length, col) var i = 0 partRows.foreach { row => http://git-wip-us.apache.org/repos/asf/spark/blob/45dda922/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java new file mode 100644 index 000..c01af40 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing
spark git commit: [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix
Repository: spark Updated Branches: refs/heads/master 986b25140 -> 4c6f00d09 [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix ## What changes were proposed in this pull request? The following Java code because of type erasing: ```Java JavaRDD rows = jsc.parallelize(...); RowMatrix mat = new RowMatrix(rows.rdd()); QRDecomposition result = mat.tallSkinnyQR(true); ``` We should use retag to restore the type to prevent the following exception: ```Java java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.mllib.linalg.Vector; ``` ## How was this patch tested? Java unit test Author: Xusen Yin Closes #14051 from yinxusen/SPARK-16372. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c6f00d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c6f00d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c6f00d0 Branch: refs/heads/master Commit: 4c6f00d09c016dfc1d2de6e694dff219c9027fa0 Parents: 986b251 Author: Xusen Yin Authored: Thu Jul 7 11:28:04 2016 +0100 Committer: Sean Owen Committed: Thu Jul 7 11:28:04 2016 +0100 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 2 +- .../mllib/linalg/distributed/RowMatrix.scala| 2 +- .../linalg/distributed/JavaRowMatrixSuite.java | 44 3 files changed, 46 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4c6f00d0/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f4819f7..a80cca7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1127,7 +1127,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Wrapper around RowMatrix constructor. 
*/ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { -new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols) +new RowMatrix(rows.rdd, numRows, numCols) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/4c6f00d0/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index cd5209d..1c94479 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -537,7 +537,7 @@ class RowMatrix @Since("1.0.0") ( def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, Matrix] = { val col = numCols().toInt // split rows horizontally into smaller matrices, and compute QR for each of them -val blockQRs = rows.glom().map { partRows => +val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows => val bdm = BDM.zeros[Double](partRows.length, col) var i = 0 partRows.foreach { row => http://git-wip-us.apache.org/repos/asf/spark/blob/4c6f00d0/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java new file mode 100644 index 000..c01af40 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distri
spark git commit: [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix
Repository: spark Updated Branches: refs/heads/branch-2.0 d63428af6 -> 24933355c [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix ## What changes were proposed in this pull request? The following Java code because of type erasing: ```Java JavaRDD rows = jsc.parallelize(...); RowMatrix mat = new RowMatrix(rows.rdd()); QRDecomposition result = mat.tallSkinnyQR(true); ``` We should use retag to restore the type to prevent the following exception: ```Java java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.mllib.linalg.Vector; ``` ## How was this patch tested? Java unit test Author: Xusen Yin Closes #14051 from yinxusen/SPARK-16372. (cherry picked from commit 4c6f00d09c016dfc1d2de6e694dff219c9027fa0) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24933355 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24933355 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24933355 Branch: refs/heads/branch-2.0 Commit: 24933355c7211bbf6bc5bebfad91ed783d6b6a51 Parents: d63428a Author: Xusen Yin Authored: Thu Jul 7 11:28:04 2016 +0100 Committer: Sean Owen Committed: Thu Jul 7 11:28:13 2016 +0100 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 2 +- .../mllib/linalg/distributed/RowMatrix.scala| 2 +- .../linalg/distributed/JavaRowMatrixSuite.java | 44 3 files changed, 46 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/24933355/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f4819f7..a80cca7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1127,7 +1127,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Wrapper around RowMatrix constructor. 
*/ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { -new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols) +new RowMatrix(rows.rdd, numRows, numCols) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/24933355/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index cd5209d..1c94479 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -537,7 +537,7 @@ class RowMatrix @Since("1.0.0") ( def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, Matrix] = { val col = numCols().toInt // split rows horizontally into smaller matrices, and compute QR for each of them -val blockQRs = rows.glom().map { partRows => +val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows => val bdm = BDM.zeros[Double](partRows.length, col) var i = 0 partRows.foreach { row => http://git-wip-us.apache.org/repos/asf/spark/blob/24933355/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java new file mode 100644 index 000..c01af40 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing
spark git commit: [SPARK-16399][PYSPARK] Force PYSPARK_PYTHON to python
Repository: spark Updated Branches: refs/heads/master 4c6f00d09 -> 6343f6655 [SPARK-16399][PYSPARK] Force PYSPARK_PYTHON to python ## What changes were proposed in this pull request? I would like to change ```bash if hash python2.7 2>/dev/null; then # Attempt to use Python 2.7, if installed: DEFAULT_PYTHON="python2.7" else DEFAULT_PYTHON="python" fi ``` to just ```DEFAULT_PYTHON="python"``` I'm not sure if it is a great assumption that python2.7 is used by default, when python points to something else. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: MechCoder Closes #14016 from MechCoder/followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6343f665 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6343f665 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6343f665 Branch: refs/heads/master Commit: 6343f66557434ce889a25a7889d76d0d24188ced Parents: 4c6f00d Author: MechCoder Authored: Thu Jul 7 11:31:10 2016 +0100 Committer: Sean Owen Committed: Thu Jul 7 11:31:10 2016 +0100 -- bin/pyspark | 14 +++--- 1 file changed, 3 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6343f665/bin/pyspark -- diff --git a/bin/pyspark b/bin/pyspark index ac8aa04..a0d7e22 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -30,14 +30,6 @@ export _SPARK_CMD_USAGE="Usage: ./bin/pyspark [options]" # (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook'). This supports full customization of the IPython # and executor Python executables. -# Determine the Python executable to use if PYSPARK_PYTHON or PYSPARK_DRIVER_PYTHON isn't set: -if hash python2.7 2>/dev/null; then - # Attempt to use Python 2.7, if installed: - DEFAULT_PYTHON="python2.7" -else - DEFAULT_PYTHON="python" -fi - # Fail noisily if removed options are set if [[ -n "$IPYTHON" || -n "$IPYTHON_OPTS" ]]; then echo "Error in pyspark startup:" @@ -47,10 +39,10 @@ fi # Default to standard python interpreter unless told otherwise if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then - PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}" + PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}" fi -WORKS_WITH_IPYTHON=$($DEFAULT_PYTHON -c 'import sys; print(sys.version_info >= (2, 7, 0))') +WORKS_WITH_IPYTHON=$(python -c 'import sys; print(sys.version_info >= (2, 7, 0))') # Determine the Python executable to use for the executors: if [[ -z "$PYSPARK_PYTHON" ]]; then @@ -58,7 +50,7 @@ if [[ -z "$PYSPARK_PYTHON" ]]; then echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2 exit 1 else -PYSPARK_PYTHON="$DEFAULT_PYTHON" +PYSPARK_PYTHON=python fi fi export PYSPARK_PYTHON - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16174][SQL] Improve `OptimizeIn` optimizer to remove literal repetitions
Repository: spark Updated Branches: refs/heads/master 6343f6655 -> a04cab8f1 [SPARK-16174][SQL] Improve `OptimizeIn` optimizer to remove literal repetitions ## What changes were proposed in this pull request? This PR improves `OptimizeIn` optimizer to remove the literal repetitions from SQL `IN` predicates. This optimizer prevents user mistakes and also can optimize some queries like [TPCDS-36](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q36.sql#L19). **Before** ```scala scala> sql("select state from (select explode(array('CA','TN')) state) where state in ('TN','TN','TN','TN','TN','TN','TN')").explain == Physical Plan == *Filter state#6 IN (TN,TN,TN,TN,TN,TN,TN) +- Generate explode([CA,TN]), false, false, [state#6] +- Scan OneRowRelation[] ``` **After** ```scala scala> sql("select state from (select explode(array('CA','TN')) state) where state in ('TN','TN','TN','TN','TN','TN','TN')").explain == Physical Plan == *Filter state#6 IN (TN) +- Generate explode([CA,TN]), false, false, [state#6] +- Scan OneRowRelation[] ``` ## How was this patch tested? Pass the Jenkins tests (including a new testcase). Author: Dongjoon Hyun Closes #13876 from dongjoon-hyun/SPARK-16174. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a04cab8f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a04cab8f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a04cab8f Branch: refs/heads/master Commit: a04cab8f17fcac05f86d2c472558ab98923f91e3 Parents: 6343f66 Author: Dongjoon Hyun Authored: Thu Jul 7 19:45:43 2016 +0800 Committer: Wenchen Fan Committed: Thu Jul 7 19:45:43 2016 +0800 -- .../sql/catalyst/expressions/predicates.scala | 1 + .../sql/catalyst/optimizer/Optimizer.scala | 20 +++- .../catalyst/optimizer/OptimizeInSuite.scala| 24 3 files changed, 39 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a04cab8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index a3b098a..734bacf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -132,6 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate } override def children: Seq[Expression] = value +: list + lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal]) override def nullable: Boolean = children.exists(_.nullable) override def foldable: Boolean = children.forall(_.foldable) http://git-wip-us.apache.org/repos/asf/spark/blob/a04cab8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9ee1735..03d15ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -820,16 +820,24 @@ object ConstantFolding extends Rule[LogicalPlan] { } /** - * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]] - * which is much faster + * Optimize IN 
predicates: + * 1. Removes literal repetitions. + * 2. Replaces [[In (value, seq[Literal])]] with optimized version + *[[InSet (value, HashSet[Literal])]] which is much faster. */ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && - list.size > conf.optimizerInSetConversionThreshold => -val hSet = list.map(e => e.eval(EmptyRow)) -InSet(v, HashSet() ++ hSet) + case expr @ In(v, list) if expr.inSetConvertible => +val newList = ExpressionSet(list).toSeq +if (newList.size > conf.optimizerInSetConversionThreshold) { + val hSet = newList.map(e => e.eval(EmptyRow)) + InSet(v, HashSet() ++ hSet) +} else if (newList.size < list.size) { + expr.copy(list = newList) +} else { // newList.length == list.length + expr +} } } } htt
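As a reading aid for the rule change above, here is a simplified, self-contained model of the decision it makes (an illustration, not the Catalyst code, which operates on `Expression`s via `ExpressionSet`): deduplicate the IN-list first, then switch to a hash-set lookup only if the deduplicated list still exceeds the conversion threshold.

```scala
// Left(list)  models keeping In(value, dedupedList);
// Right(set)  models converting to InSet(value, hashSet).
def optimizeInList[T](values: Seq[T], conversionThreshold: Int): Either[Seq[T], Set[T]] = {
  val deduped = values.distinct
  if (deduped.size > conversionThreshold) Right(deduped.toSet)
  else Left(deduped)
}

// The predicate from the example above collapses to a single literal:
// optimizeInList(Seq("TN", "TN", "TN", "TN", "TN", "TN", "TN"), conversionThreshold = 10)
//   == Left(Seq("TN"))
```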
spark git commit: Revert "[SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix"
Repository: spark Updated Branches: refs/heads/branch-1.6 45dda9221 -> bb92788f9 Revert "[SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix" This reverts commit 45dda92214191310a56333a2085e2343eba170cd. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb92788f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb92788f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb92788f Branch: refs/heads/branch-1.6 Commit: bb92788f96426e57555ba5771e256c6425e0e75e Parents: 45dda92 Author: Shixiong Zhu Authored: Thu Jul 7 10:34:50 2016 -0700 Committer: Shixiong Zhu Committed: Thu Jul 7 10:34:50 2016 -0700 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 2 +- .../mllib/linalg/distributed/RowMatrix.scala| 2 +- .../linalg/distributed/JavaRowMatrixSuite.java | 44 3 files changed, 2 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bb92788f/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index a059e38..1714983 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1110,7 +1110,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Wrapper around RowMatrix constructor. */ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { -new RowMatrix(rows.rdd, numRows, numCols) +new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/bb92788f/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index b941d1f..52c0f19 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -526,7 +526,7 @@ class RowMatrix @Since("1.0.0") ( def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, Matrix] = { val col = numCols().toInt // split rows horizontally into smaller matrices, and compute QR for each of them -val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows => +val blockQRs = rows.glom().map { partRows => val bdm = BDM.zeros[Double](partRows.length, col) var i = 0 partRows.foreach { row => http://git-wip-us.apache.org/repos/asf/spark/blob/bb92788f/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java deleted file mode 100644 index c01af40..000 --- a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg.distributed; - -import java.util.Arrays; - -import org.junit.Test; - -import org.apache.spark.SharedSparkSession; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.mllib.linalg.Matrix; -import org.apache.spark.mllib.linalg.QRDecomposition; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; - -public class JavaRowMatrixSuite extends SharedSparkSession { - - @Test - public void rowMatrixQRDecomposition() {
spark git commit: [SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()
Repository: spark Updated Branches: refs/heads/master a04cab8f1 -> 0f7175def [SPARK-16350][SQL] Fix support for incremental planning in wirteStream.foreach() ## What changes were proposed in this pull request? There are cases where `complete` output mode does not output updated aggregated value; for details please refer to [SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350). The cause is that, as we do `data.as[T].foreachPartition { iter => ... }` in `ForeachSink.addBatch()`, `foreachPartition()` does not support incremental planning for now. This patches makes `foreachPartition()` support incremental planning in `ForeachSink`, by making a special version of `Dataset` with its `rdd()` method supporting incremental planning. ## How was this patch tested? Added a unit test which failed before the change Author: Liwei Lin Closes #14030 from lw-lin/fix-foreach-complete. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f7175de Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f7175de Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f7175de Branch: refs/heads/master Commit: 0f7175def985a7f1e37198680f893e749612ab76 Parents: a04cab8 Author: Liwei Lin Authored: Thu Jul 7 10:40:42 2016 -0700 Committer: Shixiong Zhu Committed: Thu Jul 7 10:40:42 2016 -0700 -- .../sql/execution/streaming/ForeachSink.scala | 40 - .../streaming/IncrementalExecution.scala| 4 +- .../execution/streaming/ForeachSinkSuite.scala | 86 ++-- 3 files changed, 117 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0f7175de/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala index 14b9b1c..082664a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter} +import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde /** * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by @@ -30,7 +32,41 @@ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { override def addBatch(batchId: Long, data: DataFrame): Unit = { -data.as[T].foreachPartition { iter => +// TODO: Refine this method when SPARK-16264 is resolved; see comments below. + +// This logic should've been as simple as: +// ``` +// data.as[T].foreachPartition { iter => ... } +// ``` +// +// Unfortunately, doing that would just break the incremental planing. The reason is, +// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` just +// does not support `IncrementalExecution`. +// +// So as a provisional fix, below we've made a special version of `Dataset` with its `rdd()` +// method supporting incremental planning. But in the long run, we should generally make newly +// created Datasets use `IncrementalExecution` where necessary (which is SPARK-16264 tries to +// resolve). 
+ +val datasetWithIncrementalExecution = + new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) { +override lazy val rdd: RDD[T] = { + val objectType = exprEnc.deserializer.dataType + val deserialized = CatalystSerde.deserialize[T](logicalPlan) + + // was originally: sparkSession.sessionState.executePlan(deserialized) ... + val incrementalExecution = new IncrementalExecution( +this.sparkSession, +deserialized, +data.queryExecution.asInstanceOf[IncrementalExecution].outputMode, + data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation, + data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId) + incrementalExecution.toRdd.mapPartitions { rows => +rows.map(_.get(0, objectType)) + }.asInstanceOf[RDD[T]] +} + } +datasetWithIncrementalExecution.foreachPartition { iter => if (writer.open(TaskContext.getPartitionId(), batchId)) { var isFailed = fals
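For context, a hedged sketch of the user-facing pattern this commit fixes: a streaming aggregation written through `writeStream.foreach(...)` in `complete` output mode. `events` is assumed to be an existing streaming DataFrame with a string column named `value`; this illustrates the scenario from SPARK-16350, not the patch itself.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

val query = events
  .groupBy("value").count()          // a streaming aggregation
  .writeStream
  .outputMode("complete")            // before this fix, foreach could observe stale aggregates
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = println(s"${row.getString(0)} -> ${row.getLong(1)}")
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()
```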
spark git commit: [SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()
Repository: spark Updated Branches: refs/heads/branch-2.0 24933355c -> cbfd94eac [SPARK-16350][SQL] Fix support for incremental planning in wirteStream.foreach() ## What changes were proposed in this pull request? There are cases where `complete` output mode does not output updated aggregated value; for details please refer to [SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350). The cause is that, as we do `data.as[T].foreachPartition { iter => ... }` in `ForeachSink.addBatch()`, `foreachPartition()` does not support incremental planning for now. This patches makes `foreachPartition()` support incremental planning in `ForeachSink`, by making a special version of `Dataset` with its `rdd()` method supporting incremental planning. ## How was this patch tested? Added a unit test which failed before the change Author: Liwei Lin Closes #14030 from lw-lin/fix-foreach-complete. (cherry picked from commit 0f7175def985a7f1e37198680f893e749612ab76) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbfd94ea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbfd94ea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbfd94ea Branch: refs/heads/branch-2.0 Commit: cbfd94eacf46b61011f1bd8d30f0c134cab37b09 Parents: 2493335 Author: Liwei Lin Authored: Thu Jul 7 10:40:42 2016 -0700 Committer: Shixiong Zhu Committed: Thu Jul 7 10:40:52 2016 -0700 -- .../sql/execution/streaming/ForeachSink.scala | 40 - .../streaming/IncrementalExecution.scala| 4 +- .../execution/streaming/ForeachSinkSuite.scala | 86 ++-- 3 files changed, 117 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cbfd94ea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala index 14b9b1c..082664a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter} +import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde /** * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by @@ -30,7 +32,41 @@ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { override def addBatch(batchId: Long, data: DataFrame): Unit = { -data.as[T].foreachPartition { iter => +// TODO: Refine this method when SPARK-16264 is resolved; see comments below. + +// This logic should've been as simple as: +// ``` +// data.as[T].foreachPartition { iter => ... } +// ``` +// +// Unfortunately, doing that would just break the incremental planing. The reason is, +// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` just +// does not support `IncrementalExecution`. +// +// So as a provisional fix, below we've made a special version of `Dataset` with its `rdd()` +// method supporting incremental planning. 
But in the long run, we should generally make newly +// created Datasets use `IncrementalExecution` where necessary (which is SPARK-16264 tries to +// resolve). + +val datasetWithIncrementalExecution = + new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) { +override lazy val rdd: RDD[T] = { + val objectType = exprEnc.deserializer.dataType + val deserialized = CatalystSerde.deserialize[T](logicalPlan) + + // was originally: sparkSession.sessionState.executePlan(deserialized) ... + val incrementalExecution = new IncrementalExecution( +this.sparkSession, +deserialized, +data.queryExecution.asInstanceOf[IncrementalExecution].outputMode, + data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation, + data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId) + incrementalExecution.toRdd.mapPartitions { rows => +rows.map(_.get(0, objectType)) + }.asInstanceOf[RDD[T]] +} + } +datasetWithIncrementalExecution.foreachPartit
spark git commit: [SPARK-16415][SQL] fix catalog string error
Repository: spark Updated Branches: refs/heads/branch-2.0 cbfd94eac -> 30cb3f1d3 [SPARK-16415][SQL] fix catalog string error ## What changes were proposed in this pull request? In #13537 we truncate `simpleString` if it is a long `StructType`. But sometimes we need `catalogString` to reconstruct `TypeInfo`, for example in description of [SPARK-16415 ](https://issues.apache.org/jira/browse/SPARK-16415). So we need to keep the implementation of `catalogString` not affected by our truncate. ## How was this patch tested? added a test case. Author: Daoyuan Wang Closes #14089 from adrian-wang/catalogstring. (cherry picked from commit 28710b42b0d18a55bd64d597558649537259b127) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30cb3f1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30cb3f1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30cb3f1d Branch: refs/heads/branch-2.0 Commit: 30cb3f1d3a1d413568d586e6b8df56f74f05d80e Parents: cbfd94e Author: Daoyuan Wang Authored: Thu Jul 7 11:08:06 2016 -0700 Committer: Reynold Xin Committed: Thu Jul 7 11:08:12 2016 -0700 -- .../scala/org/apache/spark/sql/types/StructType.scala | 6 ++ .../spark/sql/hive/HiveMetastoreCatalogSuite.scala| 14 +++--- 2 files changed, 17 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/30cb3f1d/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index effef54..55fdfbe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -298,6 +298,12 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru Utils.truncatedString(fieldTypes, "struct<", ",", ">") } + override def catalogString: String = { +// in catalogString, we should not truncate +val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.catalogString}") +s"struct<${fieldTypes.mkString(",")}>" + } + override def sql: String = { val fieldTypes = fields.map(f => s"${quoteIdentifier(f.name)}: ${f.dataType.sql}") s"STRUCT<${fieldTypes.mkString(", ")}>" http://git-wip-us.apache.org/repos/asf/spark/blob/30cb3f1d/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index b420781..754aabb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -26,15 +26,15 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} -import org.apache.spark.sql.types.{DecimalType, StringType, StructType} +import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType} class HiveMetastoreCatalogSuite extends TestHiveSingleton { import spark.implicits._ test("struct field should accept underscore in sub-column name") { val hiveTypeStr = "struct" -val dateType = CatalystSqlParser.parseDataType(hiveTypeStr) 
-assert(dateType.isInstanceOf[StructType]) +val dataType = CatalystSqlParser.parseDataType(hiveTypeStr) +assert(dataType.isInstanceOf[StructType]) } test("udt to metastore type conversion") { @@ -49,6 +49,14 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton { logInfo(df.queryExecution.toString) df.as('a).join(df.as('b), $"a.key" === $"b.key") } + + test("should not truncate struct type catalog string") { +def field(n: Int): StructField = { + StructField("col" + n, StringType) +} +val dataType = StructType((1 to 100).map(field)) +assert(CatalystSqlParser.parseDataType(dataType.catalogString) == dataType) + } } class DataSourceWithHiveMetastoreCatalogSuite - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16415][SQL] fix catalog string error
Repository: spark Updated Branches: refs/heads/master 0f7175def -> 28710b42b [SPARK-16415][SQL] fix catalog string error ## What changes were proposed in this pull request? In #13537 we truncate `simpleString` if it is a long `StructType`. But sometimes we need `catalogString` to reconstruct `TypeInfo`, for example in description of [SPARK-16415 ](https://issues.apache.org/jira/browse/SPARK-16415). So we need to keep the implementation of `catalogString` not affected by our truncate. ## How was this patch tested? added a test case. Author: Daoyuan Wang Closes #14089 from adrian-wang/catalogstring. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28710b42 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28710b42 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28710b42 Branch: refs/heads/master Commit: 28710b42b0d18a55bd64d597558649537259b127 Parents: 0f7175d Author: Daoyuan Wang Authored: Thu Jul 7 11:08:06 2016 -0700 Committer: Reynold Xin Committed: Thu Jul 7 11:08:06 2016 -0700 -- .../scala/org/apache/spark/sql/types/StructType.scala | 6 ++ .../spark/sql/hive/HiveMetastoreCatalogSuite.scala| 14 +++--- 2 files changed, 17 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/28710b42/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 0c2ebb0..dd4c88c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -333,6 +333,12 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru Utils.truncatedString(fieldTypes, "struct<", ",", ">") } + override def catalogString: String = { +// in catalogString, we should not truncate +val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.catalogString}") +s"struct<${fieldTypes.mkString(",")}>" + } + override def sql: String = { val fieldTypes = fields.map(f => s"${quoteIdentifier(f.name)}: ${f.dataType.sql}") s"STRUCT<${fieldTypes.mkString(", ")}>" http://git-wip-us.apache.org/repos/asf/spark/blob/28710b42/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index b420781..754aabb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -26,15 +26,15 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} -import org.apache.spark.sql.types.{DecimalType, StringType, StructType} +import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType} class HiveMetastoreCatalogSuite extends TestHiveSingleton { import spark.implicits._ test("struct field should accept underscore in sub-column name") { val hiveTypeStr = "struct" -val dateType = CatalystSqlParser.parseDataType(hiveTypeStr) -assert(dateType.isInstanceOf[StructType]) +val dataType = CatalystSqlParser.parseDataType(hiveTypeStr) 
+assert(dataType.isInstanceOf[StructType]) } test("udt to metastore type conversion") { @@ -49,6 +49,14 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton { logInfo(df.queryExecution.toString) df.as('a).join(df.as('b), $"a.key" === $"b.key") } + + test("should not truncate struct type catalog string") { +def field(n: Int): StructField = { + StructField("col" + n, StringType) +} +val dataType = StructType((1 to 100).map(field)) +assert(CatalystSqlParser.parseDataType(dataType.catalogString) == dataType) + } } class DataSourceWithHiveMetastoreCatalogSuite - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
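For reference, a minimal spark-shell sketch of the behavior this change targets (not part of the commit itself; assumes a Spark 2.x build that includes the patch):

```scala
// Illustrative spark-shell sketch (Spark 2.x with this patch).
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val wide = StructType((1 to 100).map(i => StructField(s"col$i", StringType)))

// simpleString may abbreviate very wide structs (it goes through Utils.truncatedString),
// which is fine for display but loses field information.
println(wide.simpleString)

// catalogString spells out every field, so the metastore / TypeInfo machinery
// can reconstruct the exact struct type from it, as the new test asserts.
println(wide.catalogString)
```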
spark git commit: [SPARK-16310][SPARKR] R na.string-like default for csv source
Repository: spark Updated Branches: refs/heads/branch-2.0 30cb3f1d3 -> 5828da41c [SPARK-16310][SPARKR] R na.string-like default for csv source ## What changes were proposed in this pull request? Apply default "NA" as null string for R, like R read.csv na.string parameter. https://stat.ethz.ch/R-manual/R-devel/library/utils/html/read.table.html na.strings = "NA" An user passing a csv file with NA value should get the same behavior with SparkR read.df(... source = "csv") (couldn't open JIRA, will do that later) ## How was this patch tested? unit tests shivaram Author: Felix Cheung Closes #13984 from felixcheung/rcsvnastring. (cherry picked from commit f4767bcc7a9d1bdd301f054776aa45e7c9f344a7) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5828da41 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5828da41 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5828da41 Branch: refs/heads/branch-2.0 Commit: 5828da41cb2d815708191bd9a5cf3bd82795aa41 Parents: 30cb3f1 Author: Felix Cheung Authored: Thu Jul 7 15:21:57 2016 -0700 Committer: Shivaram Venkataraman Committed: Thu Jul 7 15:22:06 2016 -0700 -- R/pkg/R/SQLContext.R | 10 ++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 32 +- 2 files changed, 34 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5828da41/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 8df73db..bc0daa2 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -714,11 +714,14 @@ dropTempView <- function(viewName) { #' #' The data source is specified by the `source` and a set of options(...). #' If `source` is not specified, the default data source configured by -#' "spark.sql.sources.default" will be used. +#' "spark.sql.sources.default" will be used. \cr +#' Similar to R read.csv, when `source` is "csv", by default, a value of "NA" will be interpreted +#' as NA. #' #' @param path The path of files to load #' @param source The name of external data source #' @param schema The data schema defined in structType +#' @param na.strings Default string value for NA when source is "csv" #' @return SparkDataFrame #' @rdname read.df #' @name read.df @@ -735,7 +738,7 @@ dropTempView <- function(viewName) { #' @name read.df #' @method read.df default #' @note read.df since 1.4.0 -read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) { +read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) { sparkSession <- getSparkSession() options <- varargsToEnv(...) if (!is.null(path)) { @@ -744,6 +747,9 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) 
{ if (is.null(source)) { source <- getDefaultSqlSource() } + if (source == "csv" && is.null(options[["nullValue"]])) { +options[["nullValue"]] <- na.strings + } if (!is.null(schema)) { stopifnot(class(schema) == "structType") sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession, source, http://git-wip-us.apache.org/repos/asf/spark/blob/5828da41/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index d22baf6..003fcce 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -213,15 +213,35 @@ test_that("read csv as DataFrame", { mockLinesCsv <- c("year,make,model,comment,blank", "\"2012\",\"Tesla\",\"S\",\"No comment\",", "1997,Ford,E350,\"Go get one now they are going fast\",", - "2015,Chevy,Volt") + "2015,Chevy,Volt", + "NA,Dummy,Placeholder") writeLines(mockLinesCsv, csvPath) - # default "header" is false - df <- read.df(csvPath, "csv", header = "true") - expect_equal(count(df), 3) + # default "header" is false, inferSchema to handle "year" as "int" + df <- read.df(csvPath, "csv", header = "true", inferSchema = "true") + expect_equal(count(df), 4) expect_equal(columns(df), c("year", "make", "model", "comment", "blank")) - expect_equal(sort(unlist(collect(where(df, df$year == "2015", - sort(unlist(list(year = "2015", make = "Chevy", model = "Volt" + expect_equal(sort(unlist(collect(where(df, df$year == 2015, + sort(unlist(list(year = 2015, make = "Chevy", model = "Volt" + + # since "year" is "int", let's skip the NA values + withoutna <- na.omit(df, how = "any", co
spark git commit: [SPARK-16310][SPARKR] R na.string-like default for csv source
Repository: spark Updated Branches: refs/heads/master 28710b42b -> f4767bcc7 [SPARK-16310][SPARKR] R na.string-like default for csv source ## What changes were proposed in this pull request? Apply default "NA" as null string for R, like R read.csv na.string parameter. https://stat.ethz.ch/R-manual/R-devel/library/utils/html/read.table.html na.strings = "NA" An user passing a csv file with NA value should get the same behavior with SparkR read.df(... source = "csv") (couldn't open JIRA, will do that later) ## How was this patch tested? unit tests shivaram Author: Felix Cheung Closes #13984 from felixcheung/rcsvnastring. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4767bcc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4767bcc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4767bcc Branch: refs/heads/master Commit: f4767bcc7a9d1bdd301f054776aa45e7c9f344a7 Parents: 28710b4 Author: Felix Cheung Authored: Thu Jul 7 15:21:57 2016 -0700 Committer: Shivaram Venkataraman Committed: Thu Jul 7 15:21:57 2016 -0700 -- R/pkg/R/SQLContext.R | 10 ++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 32 +- 2 files changed, 34 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f4767bcc/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 8df73db..bc0daa2 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -714,11 +714,14 @@ dropTempView <- function(viewName) { #' #' The data source is specified by the `source` and a set of options(...). #' If `source` is not specified, the default data source configured by -#' "spark.sql.sources.default" will be used. +#' "spark.sql.sources.default" will be used. \cr +#' Similar to R read.csv, when `source` is "csv", by default, a value of "NA" will be interpreted +#' as NA. #' #' @param path The path of files to load #' @param source The name of external data source #' @param schema The data schema defined in structType +#' @param na.strings Default string value for NA when source is "csv" #' @return SparkDataFrame #' @rdname read.df #' @name read.df @@ -735,7 +738,7 @@ dropTempView <- function(viewName) { #' @name read.df #' @method read.df default #' @note read.df since 1.4.0 -read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) { +read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) { sparkSession <- getSparkSession() options <- varargsToEnv(...) if (!is.null(path)) { @@ -744,6 +747,9 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) 
{ if (is.null(source)) { source <- getDefaultSqlSource() } + if (source == "csv" && is.null(options[["nullValue"]])) { +options[["nullValue"]] <- na.strings + } if (!is.null(schema)) { stopifnot(class(schema) == "structType") sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession, source, http://git-wip-us.apache.org/repos/asf/spark/blob/f4767bcc/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index a3aa26d..a0ab719 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -213,15 +213,35 @@ test_that("read csv as DataFrame", { mockLinesCsv <- c("year,make,model,comment,blank", "\"2012\",\"Tesla\",\"S\",\"No comment\",", "1997,Ford,E350,\"Go get one now they are going fast\",", - "2015,Chevy,Volt") + "2015,Chevy,Volt", + "NA,Dummy,Placeholder") writeLines(mockLinesCsv, csvPath) - # default "header" is false - df <- read.df(csvPath, "csv", header = "true") - expect_equal(count(df), 3) + # default "header" is false, inferSchema to handle "year" as "int" + df <- read.df(csvPath, "csv", header = "true", inferSchema = "true") + expect_equal(count(df), 4) expect_equal(columns(df), c("year", "make", "model", "comment", "blank")) - expect_equal(sort(unlist(collect(where(df, df$year == "2015", - sort(unlist(list(year = "2015", make = "Chevy", model = "Volt" + expect_equal(sort(unlist(collect(where(df, df$year == 2015, + sort(unlist(list(year = 2015, make = "Chevy", model = "Volt" + + # since "year" is "int", let's skip the NA values + withoutna <- na.omit(df, how = "any", cols = "year") + expect_equal(count(withoutna), 3) + + unlink(csvPath) + csvPath <- tempfile(pattern = "sparkr-test
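The SparkR default rides on the csv data source's `nullValue` option; a rough Scala equivalent of what `read.df(..., source = "csv")` now does by default is sketched below (file path is hypothetical, shown only for illustration):

```scala
// Illustrative spark-shell sketch; "cars.csv" is a hypothetical file.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NA")   // the default SparkR's read.df now applies for csv
  .csv("cars.csv")

// Rows whose field was the literal string "NA" come back as SQL NULLs.
df.filter(df("year").isNull).show()
```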
spark git commit: [SPARK-16425][R] `describe()` should not fail with non-numeric columns
Repository: spark Updated Branches: refs/heads/master f4767bcc7 -> 6aa7d09f4 [SPARK-16425][R] `describe()` should not fail with non-numeric columns ## What changes were proposed in this pull request? This PR prevents ERRORs when `summary(df)` is called for `SparkDataFrame` with not-numeric columns. This failure happens only in `SparkR`. **Before** ```r > df <- createDataFrame(faithful) > df <- withColumn(df, "boolean", df$waiting==79) > summary(df) 16/07/07 14:15:16 ERROR RBackendHandler: describe on 34 failed Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: cannot resolve 'avg(`boolean`)' due to data type mismatch: function average requires numeric types, not BooleanType; ``` **After** ```r > df <- createDataFrame(faithful) > df <- withColumn(df, "boolean", df$waiting==79) > summary(df) SparkDataFrame[summary:string, eruptions:string, waiting:string] ``` ## How was this patch tested? Pass the Jenkins with a updated testcase. Author: Dongjoon Hyun Closes #14096 from dongjoon-hyun/SPARK-16425. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6aa7d09f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6aa7d09f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6aa7d09f Branch: refs/heads/master Commit: 6aa7d09f4e126f42e41085dec169c813379ed354 Parents: f4767bc Author: Dongjoon Hyun Authored: Thu Jul 7 17:47:29 2016 -0700 Committer: Shivaram Venkataraman Committed: Thu Jul 7 17:47:29 2016 -0700 -- R/pkg/R/DataFrame.R | 3 +-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6aa7d09f/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 5944bbc..a18eee3 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2622,8 +2622,7 @@ setMethod("describe", setMethod("describe", signature(x = "SparkDataFrame"), function(x) { -colList <- as.list(c(columns(x))) -sdf <- callJMethod(x@sdf, "describe", colList) +sdf <- callJMethod(x@sdf, "describe", list()) dataFrame(sdf) }) http://git-wip-us.apache.org/repos/asf/spark/blob/6aa7d09f/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index a0ab719..e2a1da0 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1824,13 +1824,17 @@ test_that("describe() and summarize() on a DataFrame", { expect_equal(collect(stats)[2, "age"], "24.5") expect_equal(collect(stats)[3, "age"], "7.7781745930520225") stats <- describe(df) - expect_equal(collect(stats)[4, "name"], "Andy") + expect_equal(collect(stats)[4, "name"], NULL) expect_equal(collect(stats)[5, "age"], "30") stats2 <- summary(df) - expect_equal(collect(stats2)[4, "name"], "Andy") + expect_equal(collect(stats2)[4, "name"], NULL) expect_equal(collect(stats2)[5, "age"], "30") + # SPARK-16425: SparkR summary() fails on column of type logical + df <- withColumn(df, "boolean", df$age == 30) + summary(df) + # Test base::summary is working expect_equal(length(summary(attenu, digits = 4)), 35) }) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16425][R] `describe()` should not fail with non-numeric columns
Repository: spark Updated Branches: refs/heads/branch-2.0 5828da41c -> 73c764a04 [SPARK-16425][R] `describe()` should not fail with non-numeric columns ## What changes were proposed in this pull request? This PR prevents ERRORs when `summary(df)` is called for `SparkDataFrame` with not-numeric columns. This failure happens only in `SparkR`. **Before** ```r > df <- createDataFrame(faithful) > df <- withColumn(df, "boolean", df$waiting==79) > summary(df) 16/07/07 14:15:16 ERROR RBackendHandler: describe on 34 failed Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: cannot resolve 'avg(`boolean`)' due to data type mismatch: function average requires numeric types, not BooleanType; ``` **After** ```r > df <- createDataFrame(faithful) > df <- withColumn(df, "boolean", df$waiting==79) > summary(df) SparkDataFrame[summary:string, eruptions:string, waiting:string] ``` ## How was this patch tested? Pass the Jenkins with a updated testcase. Author: Dongjoon Hyun Closes #14096 from dongjoon-hyun/SPARK-16425. (cherry picked from commit 6aa7d09f4e126f42e41085dec169c813379ed354) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73c764a0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73c764a0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73c764a0 Branch: refs/heads/branch-2.0 Commit: 73c764a047f795c85909c7a7ea4324f286d2aafa Parents: 5828da4 Author: Dongjoon Hyun Authored: Thu Jul 7 17:47:29 2016 -0700 Committer: Shivaram Venkataraman Committed: Thu Jul 7 17:47:38 2016 -0700 -- R/pkg/R/DataFrame.R | 3 +-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/73c764a0/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 17474d4..ec09aab 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2617,8 +2617,7 @@ setMethod("describe", setMethod("describe", signature(x = "SparkDataFrame"), function(x) { -colList <- as.list(c(columns(x))) -sdf <- callJMethod(x@sdf, "describe", colList) +sdf <- callJMethod(x@sdf, "describe", list()) dataFrame(sdf) }) http://git-wip-us.apache.org/repos/asf/spark/blob/73c764a0/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 003fcce..755aded 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1816,13 +1816,17 @@ test_that("describe() and summarize() on a DataFrame", { expect_equal(collect(stats)[2, "age"], "24.5") expect_equal(collect(stats)[3, "age"], "7.7781745930520225") stats <- describe(df) - expect_equal(collect(stats)[4, "name"], "Andy") + expect_equal(collect(stats)[4, "name"], NULL) expect_equal(collect(stats)[5, "age"], "30") stats2 <- summary(df) - expect_equal(collect(stats2)[4, "name"], "Andy") + expect_equal(collect(stats2)[4, "name"], NULL) expect_equal(collect(stats2)[5, "age"], "30") + # SPARK-16425: SparkR summary() fails on column of type logical + df <- withColumn(df, "boolean", df$age == 30) + summary(df) + # Test base::summary is working expect_equal(length(summary(attenu, digits = 4)), 35) }) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
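The fix delegates to the JVM-side `describe` with no column list, which already restricts itself to numeric columns; a sketch of the repro from the commit message translated to spark-shell (illustrative only):

```scala
// Illustrative spark-shell sketch of the scenario described above.
import spark.implicits._

val df = Seq((3.6, 79), (1.8, 54), (3.3, 74))
  .toDF("eruptions", "waiting")
  .withColumn("boolean", $"waiting" === 79)

// With no explicit column list, describe() computes statistics only for numeric
// columns, so the boolean column no longer causes an AnalysisException.
df.describe().show()
```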
spark git commit: [SPARK-16276][SQL] Implement elt SQL function
Repository: spark Updated Branches: refs/heads/branch-2.0 73c764a04 -> 88603bd4f [SPARK-16276][SQL] Implement elt SQL function This patch implements the elt function, as it is implemented in Hive. Added expression unit test in StringExpressionsSuite and end-to-end test in StringFunctionsSuite. Author: petermaxlee Closes #13966 from petermaxlee/SPARK-16276. (cherry picked from commit 85f2303ecadd9bf6d9694a2743dda075654c5ccf) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88603bd4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88603bd4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88603bd4 Branch: refs/heads/branch-2.0 Commit: 88603bd4f9a665ad02df40ed8a0dd78b65c9d152 Parents: 73c764a Author: petermaxlee Authored: Fri Jul 1 07:57:48 2016 +0800 Committer: Reynold Xin Committed: Thu Jul 7 21:00:53 2016 -0700 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/ExpectsInputTypes.scala | 3 +- .../expressions/stringExpressions.scala | 41 .../expressions/StringExpressionsSuite.scala| 23 +++ .../apache/spark/sql/StringFunctionsSuite.scala | 14 +++ .../spark/sql/hive/HiveSessionCatalog.scala | 2 +- 6 files changed, 82 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/88603bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 0bde48c..95be0d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -265,6 +265,7 @@ object FunctionRegistry { expression[Concat]("concat"), expression[ConcatWs]("concat_ws"), expression[Decode]("decode"), +expression[Elt]("elt"), expression[Encode]("encode"), expression[FindInSet]("find_in_set"), expression[FormatNumber]("format_number"), http://git-wip-us.apache.org/repos/asf/spark/blob/88603bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala index c15a2df..98f25a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala @@ -57,7 +57,8 @@ trait ExpectsInputTypes extends Expression { /** - * A mixin for the analyzer to perform implicit type casting using [[ImplicitTypeCasts]]. + * A mixin for the analyzer to perform implicit type casting using + * [[org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts]]. 
*/ trait ImplicitCastInputTypes extends ExpectsInputTypes { // No other methods http://git-wip-us.apache.org/repos/asf/spark/blob/88603bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 44ff7fd..b0df957 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -21,6 +21,7 @@ import java.text.{DecimalFormat, DecimalFormatSymbols} import java.util.{HashMap, Locale, Map => JMap} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.types._ @@ -162,6 +163,46 @@ case class ConcatWs(children: Seq[Expression]) } } +@ExpressionDescription( + usage = "_FUNC_(n, str1, str2, ...) - returns the n-th string, e.g. returns str2 when n is 2", + extended = "> SELECT _FUNC_(1, 'scala', 'java') FROM src LIMIT 1;\n" + "'scala'") +case class Elt(children: Seq[Expression]) + extends Expression with ImplicitCastInputTypes with CodegenFallback { + + private
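A quick spark-shell check of the new function (illustrative; the output alias is arbitrary):

```scala
// elt(n, str1, str2, ...) returns the n-th string argument, following Hive semantics.
spark.sql("SELECT elt(2, 'scala', 'java') AS lang").show()
// +----+
// |lang|
// +----+
// |java|
// +----+
```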
spark git commit: [SPARK-16278][SPARK-16279][SQL] Implement map_keys/map_values SQL functions
Repository: spark Updated Branches: refs/heads/branch-2.0 88603bd4f -> 7ef1d1c61 [SPARK-16278][SPARK-16279][SQL] Implement map_keys/map_values SQL functions This PR adds `map_keys` and `map_values` SQL functions in order to remove Hive fallback. Pass the Jenkins tests including new testcases. Author: Dongjoon Hyun Closes #13967 from dongjoon-hyun/SPARK-16278. (cherry picked from commit 54b27c1797fcd32b3f3e9d44e1a149ae396a61e6) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ef1d1c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ef1d1c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ef1d1c6 Branch: refs/heads/branch-2.0 Commit: 7ef1d1c618100313dbbdb6f615d9f87ff67e895d Parents: 88603bd Author: Dongjoon Hyun Authored: Sun Jul 3 16:59:40 2016 +0800 Committer: Reynold Xin Committed: Thu Jul 7 21:02:50 2016 -0700 -- .../catalyst/analysis/FunctionRegistry.scala| 2 + .../expressions/collectionOperations.scala | 48 .../expressions/CollectionFunctionsSuite.scala | 13 ++ .../spark/sql/DataFrameFunctionsSuite.scala | 16 +++ .../spark/sql/hive/HiveSessionCatalog.scala | 1 - 5 files changed, 79 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ef1d1c6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 95be0d6..27c3a09 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -170,6 +170,8 @@ object FunctionRegistry { expression[IsNotNull]("isnotnull"), expression[Least]("least"), expression[CreateMap]("map"), +expression[MapKeys]("map_keys"), +expression[MapValues]("map_values"), expression[CreateNamedStruct]("named_struct"), expression[NaNvl]("nanvl"), expression[NullIf]("nullif"), http://git-wip-us.apache.org/repos/asf/spark/blob/7ef1d1c6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index c71cb73..2e8ea11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -44,6 +44,54 @@ case class Size(child: Expression) extends UnaryExpression with ExpectsInputType } /** + * Returns an unordered array containing the keys of the map. 
+ */ +@ExpressionDescription( + usage = "_FUNC_(map) - Returns an unordered array containing the keys of the map.", + extended = " > SELECT _FUNC_(map(1, 'a', 2, 'b'));\n [1,2]") +case class MapKeys(child: Expression) + extends UnaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType) + + override def dataType: DataType = ArrayType(child.dataType.asInstanceOf[MapType].keyType) + + override def nullSafeEval(map: Any): Any = { +map.asInstanceOf[MapData].keyArray() + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).keyArray();") + } + + override def prettyName: String = "map_keys" +} + +/** + * Returns an unordered array containing the values of the map. + */ +@ExpressionDescription( + usage = "_FUNC_(map) - Returns an unordered array containing the values of the map.", + extended = " > SELECT _FUNC_(map(1, 'a', 2, 'b'));\n [\"a\",\"b\"]") +case class MapValues(child: Expression) + extends UnaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType) + + override def dataType: DataType = ArrayType(child.dataType.asInstanceOf[MapType].valueType) + + override def nullSafeEval(map: Any): Any = { +map.asInstanceOf[MapData].valueArray() + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).valueArray();") + } + + override def prettyName: String = "map_values" +} + +/** * Sorts the input array in ascending / descending order according to the natural ordering of * the array eleme
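A short spark-shell sketch of the two new functions (illustrative only):

```scala
// map_keys / map_values return unordered arrays of the map's keys and values.
spark.sql(
  "SELECT map_keys(map(1, 'a', 2, 'b')) AS ks, map_values(map(1, 'a', 2, 'b')) AS vs"
).show()
// ks: [1, 2]   vs: [a, b]   (entry order is not guaranteed)
```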
spark git commit: [SPARK-16289][SQL] Implement posexplode table generating function
Repository: spark Updated Branches: refs/heads/branch-2.0 7ef1d1c61 -> a04975457 [SPARK-16289][SQL] Implement posexplode table generating function This PR implements `posexplode` table generating function. Currently, master branch raises the following exception for `map` argument. It's different from Hive. **Before** ```scala scala> sql("select posexplode(map('a', 1, 'b', 2))").show org.apache.spark.sql.AnalysisException: No handler for Hive UDF ... posexplode() takes an array as a parameter; line 1 pos 7 ``` **After** ```scala scala> sql("select posexplode(map('a', 1, 'b', 2))").show +---+---+-+ |pos|key|value| +---+---+-+ | 0| a|1| | 1| b|2| +---+---+-+ ``` For `array` argument, `after` is the same with `before`. ``` scala> sql("select posexplode(array(1, 2, 3))").show +---+---+ |pos|col| +---+---+ | 0| 1| | 1| 2| | 2| 3| +---+---+ ``` Pass the Jenkins tests with newly added testcases. Author: Dongjoon Hyun Closes #13971 from dongjoon-hyun/SPARK-16289. (cherry picked from commit 46395db80e3304e3f3a1ebdc8aadb8f2819b48b4) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0497545 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0497545 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0497545 Branch: refs/heads/branch-2.0 Commit: a049754577aa78a5a26b38821233861a4dfd8e8a Parents: 7ef1d1c Author: Dongjoon Hyun Authored: Thu Jun 30 12:03:54 2016 -0700 Committer: Reynold Xin Committed: Thu Jul 7 21:05:31 2016 -0700 -- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 17 R/pkg/R/generics.R | 4 + R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- python/pyspark/sql/functions.py | 21 + .../catalyst/analysis/FunctionRegistry.scala| 1 + .../sql/catalyst/expressions/generators.scala | 66 +++--- .../analysis/ExpressionTypeCheckingSuite.scala | 2 + .../expressions/GeneratorExpressionSuite.scala | 71 +++ .../scala/org/apache/spark/sql/Column.scala | 1 + .../scala/org/apache/spark/sql/functions.scala | 8 ++ .../spark/sql/ColumnExpressionSuite.scala | 60 - .../spark/sql/GeneratorFunctionSuite.scala | 92 .../spark/sql/hive/HiveSessionCatalog.scala | 2 +- 14 files changed, 276 insertions(+), 72 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a0497545/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 9fd2568..bc3aceb 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -235,6 +235,7 @@ exportMethods("%in%", "over", "percent_rank", "pmod", + "posexplode", "quarter", "rand", "randn", http://git-wip-us.apache.org/repos/asf/spark/blob/a0497545/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 09e5afa..52d46f9 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2934,3 +2934,20 @@ setMethod("sort_array", jc <- callJStatic("org.apache.spark.sql.functions", "sort_array", x@jc, asc) column(jc) }) + +#' posexplode +#' +#' Creates a new row for each element with position in the given array or map column. 
+#' +#' @rdname posexplode +#' @name posexplode +#' @family collection_funcs +#' @export +#' @examples \dontrun{posexplode(df$c)} +#' @note posexplode since 2.1.0 +setMethod("posexplode", + signature(x = "Column"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "posexplode", x@jc) +column(jc) + }) http://git-wip-us.apache.org/repos/asf/spark/blob/a0497545/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index b0f25de..e4ec508 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1054,6 +1054,10 @@ setGeneric("percent_rank", function(x) { standardGeneric("percent_rank") }) #' @export setGeneric("pmod", function(y, x) { standardGeneric("pmod") }) +#' @rdname posexplode +#' @export +setGeneric("posexplode", function(x) { standardGeneric("posexplode") }) + #' @rdname quarter #' @export setGeneric("quarter", function(x) { standardGeneric("quarter") }) http://git-wip-us.apache.org/repos/asf/spark/blob/a0497545/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/test
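Besides the SQL form shown in the commit message, the patch also exposes the function through the DataFrame API; a small spark-shell sketch (illustrative only):

```scala
// posexplode yields one row per array element together with its position.
import org.apache.spark.sql.functions.posexplode
import spark.implicits._

val df = Seq(Seq("a", "b", "c")).toDF("letters")
df.select(posexplode($"letters")).show()
// +---+---+
// |pos|col|
// +---+---+
// |  0|  a|
// |  1|  b|
// |  2|  c|
// +---+---+
```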
spark git commit: [SPARK-16271][SQL] Implement Hive's UDFXPathUtil
Repository: spark Updated Branches: refs/heads/branch-2.0 a04975457 -> 144aa84ce [SPARK-16271][SQL] Implement Hive's UDFXPathUtil This patch ports Hive's UDFXPathUtil over to Spark, which can be used to implement xpath functionality in Spark in the near future. Added two new test suites UDFXPathUtilSuite and ReusableStringReaderSuite. They have been ported over from Hive (but rewritten in Scala in order to leverage ScalaTest). Author: petermaxlee Closes #13961 from petermaxlee/xpath. (cherry picked from commit 153c2f9ac12846367a09684fd875c496d350a603) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144aa84c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144aa84c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144aa84c Branch: refs/heads/branch-2.0 Commit: 144aa84ce0f3463d95c06c78df6e9996ad42240a Parents: a049754 Author: petermaxlee Authored: Tue Jun 28 21:07:52 2016 -0700 Committer: Reynold Xin Committed: Thu Jul 7 21:07:03 2016 -0700 -- .../catalyst/expressions/xml/UDFXPathUtil.java | 192 +++ .../xml/ReusableStringReaderSuite.scala | 103 ++ .../expressions/xml/UDFXPathUtilSuite.scala | 99 ++ 3 files changed, 394 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/144aa84c/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java new file mode 100644 index 000..01a11f9 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.xml; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; + +import javax.xml.namespace.QName; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpression; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; + +/** + * Utility class for all XPath UDFs. Each UDF instance should keep an instance of this class. + * + * This is based on Hive's UDFXPathUtil implementation. 
+ */ +public class UDFXPathUtil { + private XPath xpath = XPathFactory.newInstance().newXPath(); + private ReusableStringReader reader = new ReusableStringReader(); + private InputSource inputSource = new InputSource(reader); + private XPathExpression expression = null; + private String oldPath = null; + + public Object eval(String xml, String path, QName qname) { +if (xml == null || path == null || qname == null) { + return null; +} + +if (xml.length() == 0 || path.length() == 0) { + return null; +} + +if (!path.equals(oldPath)) { + try { +expression = xpath.compile(path); + } catch (XPathExpressionException e) { +expression = null; + } + oldPath = path; +} + +if (expression == null) { + return null; +} + +reader.set(xml); + +try { + return expression.evaluate(inputSource, qname); +} catch (XPathExpressionException e) { + throw new RuntimeException ("Invalid expression '" + oldPath + "'", e); +} + } + + public Boolean evalBoolean(String xml, String path) { +return (Boolean) eval(xml, path, XPathConstants.BOOLEAN); + } + + public String evalString(String xml, String path) { +return (String) eval(xml, path, XPathConstants.STRING); + } + + public Double evalNumber(String xml, String path) { +return (Double) eval(xml, path, XPathConstants.NUMBER); + } + + public Node evalNode(String xml, String path) { +return (Node) eval(x
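UDFXPathUtil is an internal catalyst helper rather than a user-facing API, but a direct sketch of its contract may help readers see what the upcoming xpath functions will build on (illustrative only):

```scala
// Illustrative only: direct use of the internal helper ported from Hive.
import org.apache.spark.sql.catalyst.expressions.xml.UDFXPathUtil

val util = new UDFXPathUtil
util.evalBoolean("<a><b>1</b></a>", "a/b")   // true  - the path matches a node
util.evalString("<a><b>bb</b></a>", "a/b")   // "bb"  - text of the first matching node
util.evalNumber("<a><b>3</b></a>", "a/b")    // 3.0   - numeric value of the match
```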
spark git commit: [SPARK-16274][SQL] Implement xpath_boolean
Repository: spark Updated Branches: refs/heads/branch-2.0 144aa84ce -> bb4b0419b [SPARK-16274][SQL] Implement xpath_boolean This patch implements xpath_boolean expression for Spark SQL, a xpath function that returns true or false. The implementation is modelled after Hive's xpath_boolean, except that how the expression handles null inputs. Hive throws a NullPointerException at runtime if either of the input is null. This implementation returns null if either of the input is null. Created two new test suites. One for unit tests covering the expression, and the other for end-to-end test in SQL. Author: petermaxlee Closes #13964 from petermaxlee/SPARK-16274. (cherry picked from commit d3af6731fa270842818ed91d6b4d14708ddae2db) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb4b0419 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb4b0419 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb4b0419 Branch: refs/heads/branch-2.0 Commit: bb4b0419b1dcd2b1926a829488a5a1d1b43756e0 Parents: 144aa84 Author: petermaxlee Authored: Thu Jun 30 09:27:48 2016 +0800 Committer: Reynold Xin Committed: Thu Jul 7 21:07:33 2016 -0700 -- .../catalyst/analysis/FunctionRegistry.scala| 2 + .../catalyst/expressions/xml/XPathBoolean.scala | 58 +++ .../expressions/xml/XPathExpressionSuite.scala | 61 .../apache/spark/sql/XmlFunctionsSuite.scala| 32 ++ .../spark/sql/hive/HiveSessionCatalog.scala | 2 +- 5 files changed, 154 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bb4b0419/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 346cdd8..e7f335f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.xml._ import org.apache.spark.sql.catalyst.util.StringKeyHashMap @@ -305,6 +306,7 @@ object FunctionRegistry { expression[UnBase64]("unbase64"), expression[Unhex]("unhex"), expression[Upper]("upper"), +expression[XPathBoolean]("xpath_boolean"), // datetime functions expression[AddMonths]("add_months"), http://git-wip-us.apache.org/repos/asf/spark/blob/bb4b0419/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala new file mode 100644 index 000..2a5256c --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.xml + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String + + +@ExpressionDescription( + usage = "_FUNC_(xml, xpath) - Evaluates a boolean xpath expression.", + extended = "> SELECT _FUNC_('1','a/b');\ntrue") +case class XPathBoolean(xml:
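A quick spark-shell sketch of the new expression (illustrative only):

```scala
// xpath_boolean(xml, xpath) evaluates the xpath expression and returns true/false;
// unlike Hive, a NULL input yields NULL rather than a NullPointerException.
spark.sql("SELECT xpath_boolean('<a><b>1</b></a>', 'a/b') AS has_b").show()        // true
spark.sql("SELECT xpath_boolean('<a><b>1</b></a>', 'a/b = 2') AS b_is_two").show() // false
```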
spark git commit: [SPARK-16288][SQL] Implement inline table generating function
Repository: spark Updated Branches: refs/heads/branch-2.0 bb4b0419b -> e32c29d86 [SPARK-16288][SQL] Implement inline table generating function This PR implements `inline` table generating function. Pass the Jenkins tests with new testcase. Author: Dongjoon Hyun Closes #13976 from dongjoon-hyun/SPARK-16288. (cherry picked from commit 88134e736829f5f93a82879c08cb191f175ff8af) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e32c29d8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e32c29d8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e32c29d8 Branch: refs/heads/branch-2.0 Commit: e32c29d86d4cc7ebe8e485c4221b5a10366b3d7d Parents: bb4b041 Author: Dongjoon Hyun Authored: Mon Jul 4 01:57:45 2016 +0800 Committer: Reynold Xin Committed: Thu Jul 7 21:08:45 2016 -0700 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../sql/catalyst/expressions/generators.scala | 35 .../expressions/GeneratorExpressionSuite.scala | 59 +-- .../spark/sql/GeneratorFunctionSuite.scala | 60 .../spark/sql/hive/HiveSessionCatalog.scala | 5 +- 5 files changed, 124 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index e7f335f..021bec7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -165,6 +165,7 @@ object FunctionRegistry { expression[Explode]("explode"), expression[Greatest]("greatest"), expression[If]("if"), +expression[Inline]("inline"), expression[IsNaN]("isnan"), expression[IfNull]("ifnull"), expression[IsNull]("isnull"), http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 4e91cc5..99b97c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -195,3 +195,38 @@ case class Explode(child: Expression) extends ExplodeBase(child, position = fals extended = "> SELECT _FUNC_(array(10,20));\n 0\t10\n 1\t20") // scalastyle:on line.size.limit case class PosExplode(child: Expression) extends ExplodeBase(child, position = true) + +/** + * Explodes an array of structs into a table. 
+ */ +@ExpressionDescription( + usage = "_FUNC_(a) - Explodes an array of structs into a table.", + extended = "> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));\n [1,a]\n [2,b]") +case class Inline(child: Expression) extends UnaryExpression with Generator with CodegenFallback { + + override def children: Seq[Expression] = child :: Nil + + override def checkInputDataTypes(): TypeCheckResult = child.dataType match { +case ArrayType(et, _) if et.isInstanceOf[StructType] => + TypeCheckResult.TypeCheckSuccess +case _ => + TypeCheckResult.TypeCheckFailure( +s"input to function $prettyName should be array of struct type, not ${child.dataType}") + } + + override def elementSchema: StructType = child.dataType match { +case ArrayType(et : StructType, _) => et + } + + private lazy val numFields = elementSchema.fields.length + + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { +val inputArray = child.eval(input).asInstanceOf[ArrayData] +if (inputArray == null) { + Nil +} else { + for (i <- 0 until inputArray.numElements()) +yield inputArray.getStruct(i, numFields) +} + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala index 2aba841..e79f89b 100644 -
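A short spark-shell sketch of the new generator (illustrative only):

```scala
// inline explodes an array of structs into one row per struct, one column per field.
spark.sql("SELECT inline(array(struct(1, 'a'), struct(2, 'b')))").show()
// +----+----+
// |col1|col2|
// +----+----+
// |   1|   a|
// |   2|   b|
// +----+----+
```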
spark git commit: [SPARK-16286][SQL] Implement stack table generating function
Repository: spark Updated Branches: refs/heads/branch-2.0 e32c29d86 -> 565e18cf7 [SPARK-16286][SQL] Implement stack table generating function This PR implements `stack` table generating function. Pass the Jenkins tests including new testcases. Author: Dongjoon Hyun Closes #14033 from dongjoon-hyun/SPARK-16286. (cherry picked from commit d0d28507cacfca5919dbfb4269892d58b62e8662) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/565e18cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/565e18cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/565e18cf Branch: refs/heads/branch-2.0 Commit: 565e18cf7670231b1fa9db84f907654da79e6cef Parents: e32c29d Author: Dongjoon Hyun Authored: Wed Jul 6 10:54:43 2016 +0800 Committer: Reynold Xin Committed: Thu Jul 7 21:09:09 2016 -0700 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../sql/catalyst/expressions/generators.scala | 53 .../expressions/GeneratorExpressionSuite.scala | 18 +++ .../spark/sql/GeneratorFunctionSuite.scala | 53 .../spark/sql/hive/HiveSessionCatalog.scala | 2 +- 5 files changed, 126 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/565e18cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 021bec7..f6ebcae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -182,6 +182,7 @@ object FunctionRegistry { expression[PosExplode]("posexplode"), expression[Rand]("rand"), expression[Randn]("randn"), +expression[Stack]("stack"), expression[CreateStruct]("struct"), expression[CaseWhen]("when"), http://git-wip-us.apache.org/repos/asf/spark/blob/565e18cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 99b97c8..9d5c856 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -94,6 +94,59 @@ case class UserDefinedGenerator( } /** + * Separate v1, ..., vk into n rows. Each row will have k/n columns. n must be constant. 
+ * {{{ + * SELECT stack(2, 1, 2, 3) -> + * 1 2 + * 3 NULL + * }}} + */ +@ExpressionDescription( + usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.", + extended = "> SELECT _FUNC_(2, 1, 2, 3);\n [1,2]\n [3,null]") +case class Stack(children: Seq[Expression]) +extends Expression with Generator with CodegenFallback { + + private lazy val numRows = children.head.eval().asInstanceOf[Int] + private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length <= 1) { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least 2 arguments.") +} else if (children.head.dataType != IntegerType || !children.head.foldable || numRows < 1) { + TypeCheckResult.TypeCheckFailure("The number of rows must be a positive constant integer.") +} else { + for (i <- 1 until children.length) { +val j = (i - 1) % numFields +if (children(i).dataType != elementSchema.fields(j).dataType) { + return TypeCheckResult.TypeCheckFailure( +s"Argument ${j + 1} (${elementSchema.fields(j).dataType}) != " + + s"Argument $i (${children(i).dataType})") +} + } + TypeCheckResult.TypeCheckSuccess +} + } + + override def elementSchema: StructType = +StructType(children.tail.take(numFields).zipWithIndex.map { + case (e, index) => StructField(s"col$index", e.dataType) +}) + + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { +val values = children.tail.map(_.eval(input)).toArray +for (row <- 0 until numRows) yield { + val fields = new Array[Any](numFields) + for (col <- 0 until numFields) { +val index = row * numFields + col +fields.update(col, if (index < values.length) values(index) else
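A quick spark-shell check of the new generator (illustrative only):

```scala
// stack(n, v1, ..., vk) lays the k values out over n rows, padding with NULLs.
spark.sql("SELECT stack(2, 1, 2, 3)").show()
// +----+----+
// |col0|col1|
// +----+----+
// |   1|   2|
// |   3|null|
// +----+----+
```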
spark git commit: [SPARK-16430][SQL][STREAMING] Add option maxFilesPerTrigger
Repository: spark Updated Branches: refs/heads/master 6aa7d09f4 -> 5bce45809 [SPARK-16430][SQL][STREAMING] Add option maxFilesPerTrigger ## What changes were proposed in this pull request? An option that limits the file stream source to read 1 file at a time enables rate limiting. It has the additional convenience that a static set of files can be used like a stream for testing as this will allows those files to be considered one at a time. This PR adds option `maxFilesPerTrigger`. ## How was this patch tested? New unit test Author: Tathagata Das Closes #14094 from tdas/SPARK-16430. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bce4580 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bce4580 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bce4580 Branch: refs/heads/master Commit: 5bce4580939c27876f11cd75f0dc2190fb9fa908 Parents: 6aa7d09 Author: Tathagata Das Authored: Thu Jul 7 23:19:41 2016 -0700 Committer: Tathagata Das Committed: Thu Jul 7 23:19:41 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 40 +++ .../spark/sql/streaming/DataStreamReader.scala | 10 +++ .../sql/streaming/FileStreamSourceSuite.scala | 76 3 files changed, 112 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5bce4580/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 11bf3c0..72b335a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming -import scala.collection.mutable.ArrayBuffer +import scala.util.Try import org.apache.hadoop.fs.Path @@ -46,6 +46,9 @@ class FileStreamSource( private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) + /** Maximum number of new files to be considered in each batch */ + private val maxFilesPerBatch = getMaxFilesPerBatch() + private val seenFiles = new OpenHashSet[String] metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) => files.foreach(seenFiles.add) @@ -58,19 +61,17 @@ class FileStreamSource( * there is no race here, so the cost of `synchronized` should be rare. 
*/ private def fetchMaxOffset(): LongOffset = synchronized { -val filesPresent = fetchAllFiles() -val newFiles = new ArrayBuffer[String]() -filesPresent.foreach { file => - if (!seenFiles.contains(file)) { -logDebug(s"new file: $file") -newFiles.append(file) -seenFiles.add(file) - } else { -logDebug(s"old file: $file") - } +val newFiles = fetchAllFiles().filter(!seenFiles.contains(_)) +val batchFiles = + if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles +batchFiles.foreach { file => + seenFiles.add(file) + logDebug(s"New file: $file") } - -if (newFiles.nonEmpty) { +logTrace(s"Number of new files = ${newFiles.size})") +logTrace(s"Number of files selected for batch = ${batchFiles.size}") +logTrace(s"Number of seen files = ${seenFiles.size}") +if (batchFiles.nonEmpty) { maxBatchId += 1 metadataLog.add(maxBatchId, newFiles) logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files") @@ -118,7 +119,7 @@ class FileStreamSource( val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) -val files = catalog.allFiles().map(_.getPath.toUri.toString) +val files = catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString) val endTime = System.nanoTime val listingTimeMs = (endTime.toDouble - startTime) / 100 if (listingTimeMs > 2000) { @@ -131,6 +132,17 @@ class FileStreamSource( files } + private def getMaxFilesPerBatch(): Option[Int] = { +new CaseInsensitiveMap(options) + .get("maxFilesPerTrigger") + .map { str => +Try(str.toInt).toOption.filter(_ > 0).getOrElse { + throw new IllegalArgumentException( +s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer") +} + } +
spark git commit: [SPARK-16430][SQL][STREAMING] Add option maxFilesPerTrigger
Repository: spark Updated Branches: refs/heads/branch-2.0 565e18cf7 -> 18ace015e [SPARK-16430][SQL][STREAMING] Add option maxFilesPerTrigger ## What changes were proposed in this pull request? An option that limits the file stream source to reading at most a fixed number of files per trigger (for example, one file at a time) enables rate limiting. It has the additional convenience that a static set of files can be used like a stream for testing, as this allows those files to be considered one at a time. This PR adds the option `maxFilesPerTrigger`; a standalone sketch of the selection and validation logic follows the diff below. ## How was this patch tested? New unit test. Author: Tathagata Das Closes #14094 from tdas/SPARK-16430. (cherry picked from commit 5bce4580939c27876f11cd75f0dc2190fb9fa908) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18ace015 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18ace015 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18ace015 Branch: refs/heads/branch-2.0 Commit: 18ace015e967910ce363937ac2fa67011e0a3bba Parents: 565e18c Author: Tathagata Das Authored: Thu Jul 7 23:19:41 2016 -0700 Committer: Tathagata Das Committed: Thu Jul 7 23:19:59 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 40 +++ .../spark/sql/streaming/DataStreamReader.scala | 10 +++ .../sql/streaming/FileStreamSourceSuite.scala | 76 3 files changed, 112 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/18ace015/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 11bf3c0..72b335a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming -import scala.collection.mutable.ArrayBuffer +import scala.util.Try import org.apache.hadoop.fs.Path @@ -46,6 +46,9 @@ class FileStreamSource( private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) + /** Maximum number of new files to be considered in each batch */ + private val maxFilesPerBatch = getMaxFilesPerBatch() + private val seenFiles = new OpenHashSet[String] metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) => files.foreach(seenFiles.add) @@ -58,19 +61,17 @@ class FileStreamSource( * there is no race here, so the cost of `synchronized` should be rare.
*/ private def fetchMaxOffset(): LongOffset = synchronized { -val filesPresent = fetchAllFiles() -val newFiles = new ArrayBuffer[String]() -filesPresent.foreach { file => - if (!seenFiles.contains(file)) { -logDebug(s"new file: $file") -newFiles.append(file) -seenFiles.add(file) - } else { -logDebug(s"old file: $file") - } +val newFiles = fetchAllFiles().filter(!seenFiles.contains(_)) +val batchFiles = + if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles +batchFiles.foreach { file => + seenFiles.add(file) + logDebug(s"New file: $file") } - -if (newFiles.nonEmpty) { +logTrace(s"Number of new files = ${newFiles.size})") +logTrace(s"Number of files selected for batch = ${batchFiles.size}") +logTrace(s"Number of seen files = ${seenFiles.size}") +if (batchFiles.nonEmpty) { maxBatchId += 1 metadataLog.add(maxBatchId, newFiles) logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files") @@ -118,7 +119,7 @@ class FileStreamSource( val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) -val files = catalog.allFiles().map(_.getPath.toUri.toString) +val files = catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString) val endTime = System.nanoTime val listingTimeMs = (endTime.toDouble - startTime) / 100 if (listingTimeMs > 2000) { @@ -131,6 +132,17 @@ class FileStreamSource( files } + private def getMaxFilesPerBatch(): Option[Int] = { +new CaseInsensitiveMap(options) + .get("maxFilesPerTrigger") + .map { str => +Try(str.toInt).toOption.filter(_ > 0).getOrElse { + throw new IllegalArgumentException( +
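To make the core idea easier to follow outside the diff, here is a minimal standalone sketch of the file-selection and option-validation logic as read from the patch. The helper names (`selectBatch`, `parseMaxFilesPerTrigger`) are hypothetical and not part of Spark; the real code also uses a case-insensitive option map and a persistent set of seen files.

```scala
import scala.util.Try

// Hypothetical helper: keep only unseen files, then cap the batch size
// when maxFilesPerTrigger was configured.
def selectBatch(allFiles: Seq[String],
                seenFiles: Set[String],
                maxFilesPerTrigger: Option[Int]): Seq[String] = {
  val newFiles = allFiles.filterNot(seenFiles.contains)
  maxFilesPerTrigger match {
    case Some(max) => newFiles.take(max)
    case None      => newFiles
  }
}

// Hypothetical helper mirroring the option parsing in the patch:
// non-numeric or non-positive values are rejected with an error.
def parseMaxFilesPerTrigger(options: Map[String, String]): Option[Int] =
  options.get("maxFilesPerTrigger").map { str =>
    Try(str.toInt).toOption.filter(_ > 0).getOrElse {
      throw new IllegalArgumentException(
        s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
    }
  }
```

Under these assumptions, `parseMaxFilesPerTrigger(Map("maxFilesPerTrigger" -> "5"))` yields `Some(5)`, while values such as "0" or "abc" throw `IllegalArgumentException`, matching the positive-integer check in the commit.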