spark git commit: [SPARK-19618][SQL] Inconsistency wrt max. buckets allowed from Dataframe API vs SQL
Repository: spark Updated Branches: refs/heads/master 8487902a9 -> f041e55ee [SPARK-19618][SQL] Inconsistency wrt max. buckets allowed from Dataframe API vs SQL ## What changes were proposed in this pull request? Jira: https://issues.apache.org/jira/browse/SPARK-19618 Moved the check for validating number of buckets from `DataFrameWriter` to `BucketSpec` creation ## How was this patch tested? - Added more unit tests Author: Tejas Patil Closes #16948 from tejasapatil/SPARK-19618_max_buckets. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f041e55e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f041e55e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f041e55e Branch: refs/heads/master Commit: f041e55eefe1d8a995fed321c66bccbd8b8e5255 Parents: 8487902 Author: Tejas Patil Authored: Wed Feb 15 22:45:58 2017 -0800 Committer: Wenchen Fan Committed: Wed Feb 15 22:45:58 2017 -0800 -- .../spark/sql/catalyst/catalog/interface.scala | 5 ++-- .../org/apache/spark/sql/DataFrameWriter.scala | 1 - .../sql/sources/CreateTableAsSelectSuite.scala | 28 +++- .../spark/sql/sources/BucketedWriteSuite.scala | 10 --- 4 files changed, 25 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f041e55e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 353e595..2b3b575 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -135,8 +135,9 @@ case class BucketSpec( numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String]) { - if (numBuckets <= 0) { -throw new AnalysisException(s"Expected positive number of buckets, 
but got `$numBuckets`.") + if (numBuckets <= 0 || numBuckets >= 100000) {
+throw new AnalysisException(
 + s"Number of buckets should be greater than 0 but less than 100000. Got `$numBuckets`")
 } override def toString: String = { http://git-wip-us.apache.org/repos/asf/spark/blob/f041e55e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1d834b1..cdae8ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -275,7 +275,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } numBuckets.map { n => - require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.") BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/f041e55e/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 99da196..4a42f8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -206,7 +206,7 @@ class CreateTableAsSelectSuite } } - test("create table using as select - with non-zero buckets") { + test("create table using as select - with valid number of buckets") { val catalog = spark.sessionState.catalog withTable("t") { sql( @@ -222,19 +222,21 @@ class CreateTableAsSelectSuite } } - test("create table using as select - with zero buckets") { + test("create table using as select - with invalid number of buckets") { withTable("t") { - val e = intercept[AnalysisException] { -sql( - s""" - 
|CREATE TABLE t USING PARQUET - |OPTIONS (PATH '${path.toURI}') - |CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS - |AS SELECT 1 AS a, 2 AS b - """.stripMargin -) - }.getMessage - assert(e.contains("Expected positive number of buckets, but got `0`")) + Seq(0, 100000).foreach(numBuckets => { +val e = intercept[AnalysisException] { + sql( +
spark git commit: [SPARK-19399][SPARKR][BACKPORT-2.1] fix tests broken by merge
Repository: spark Updated Branches: refs/heads/branch-2.1 db7adb61b -> 252dd05f0 [SPARK-19399][SPARKR][BACKPORT-2.1] fix tests broken by merge ## What changes were proposed in this pull request? fix test broken by git merge for #16739 ## How was this patch tested? manual Author: Felix Cheung Closes #16950 from felixcheung/fixrtest. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/252dd05f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/252dd05f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/252dd05f Branch: refs/heads/branch-2.1 Commit: 252dd05f0d883bc7d4419308fe71bd817e6c814d Parents: db7adb6 Author: Felix Cheung Authored: Wed Feb 15 21:31:36 2017 -0800 Committer: Felix Cheung Committed: Wed Feb 15 21:31:36 2017 -0800 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 1 - 1 file changed, 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/252dd05f/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 0447d24..d9dd0f3 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1229,7 +1229,6 @@ test_that("column functions", { c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1") c18 <- covar_pop(c, c1) + covar_pop("c", "c1") c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3) - c20 <- to_timestamp(c) + to_timestamp(c, "") + to_date(c, "") # Test if base::is.nan() is exposed expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 4th batch
Repository: spark Updated Branches: refs/heads/master fc02ef95c -> 8487902a9 [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 4th batch ## What changes were proposed in this pull request? This is 4th batch of test case for IN/NOT IN subquery. In this PR, it has these test files: `in-set-operations.sql` `in-with-cte.sql` `not-in-joins.sql` Here are the queries and results from running on DB2. [in-set-operations DB2 version](https://github.com/apache/spark/files/772846/in-set-operations.sql.db2.txt) [Output of in-set-operations](https://github.com/apache/spark/files/772848/in-set-operations.sql.db2.out.txt) [in-with-cte DB2 version](https://github.com/apache/spark/files/772849/in-with-cte.sql.db2.txt) [Output of in-with-cte](https://github.com/apache/spark/files/772856/in-with-cte.sql.db2.out.txt) [not-in-joins DB2 version](https://github.com/apache/spark/files/772851/not-in-joins.sql.db2.txt) [Output of not-in-joins](https://github.com/apache/spark/files/772852/not-in-joins.sql.db2.out.txt) ## How was this patch tested? This pr is adding new test cases. We compare the result from spark with the result from another RDBMS(We used DB2 LUW). If the results are the same, we assume the result is correct. Author: Kevin Yu Closes #16915 from kevinyu98/spark-18871-44. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8487902a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8487902a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8487902a Branch: refs/heads/master Commit: 8487902a98caf727ba3f9820452b01276d20ede3 Parents: fc02ef9 Author: Kevin Yu Authored: Wed Feb 15 21:29:28 2017 -0800 Committer: Xiao Li Committed: Wed Feb 15 21:29:28 2017 -0800 -- .../subquery/in-subquery/in-set-operations.sql | 472 +++ .../inputs/subquery/in-subquery/in-with-cte.sql | 287 + .../subquery/in-subquery/not-in-joins.sql | 167 ++ .../in-subquery/in-set-operations.sql.out | 595 +++ .../subquery/in-subquery/in-with-cte.sql.out| 364 .../subquery/in-subquery/not-in-joins.sql.out | 229 +++ 6 files changed, 2114 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8487902a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql new file mode 100644 index 000..6b9e8bf --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql @@ -0,0 +1,472 @@ +-- A test suite for set-operations in parent side, subquery, and both predicate subquery +-- It includes correlated cases. 
+ +create temporary view t1 as select * from values + ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:00:00.000', date '2014-04-04'), + ("val1b", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'), + ("val1a", 16S, 12, 21L, float(15.0), 20D, 20E2, timestamp '2014-06-04 01:02:00.001', date '2014-06-04'), + ("val1a", 16S, 12, 10L, float(15.0), 20D, 20E2, timestamp '2014-07-04 01:01:00.000', date '2014-07-04'), + ("val1c", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:02:00.001', date '2014-05-05'), + ("val1d", null, 16, 22L, float(17.0), 25D, 26E2, timestamp '2014-06-04 01:01:00.000', null), + ("val1d", null, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-07-04 01:02:00.001', null), + ("val1e", 10S, null, 25L, float(17.0), 25D, 26E2, timestamp '2014-08-04 01:01:00.000', date '2014-08-04'), + ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-09-04 01:02:00.001', date '2014-09-04'), + ("val1d", 10S, null, 12L, float(17.0), 25D, 26E2, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'), + ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:02:00.001', date '2014-04-04'), + ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04') + as t1(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i); + +create temporary view t2 as select * from values + ("val2a", 6S, 12, 14L, float(15), 20D, 20E2, timestamp '2014-04-04 01:01:00.000', date '2014-04-04'), + ("val1b", 10S, 12, 19L, float(17), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'), + ("val1b", 8S, 16, 119L, float(17), 25D, 26E2, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'), + ("val1c", 12S, 16, 219L, float(17), 25D, 26E2, timestamp '2016-05-04 01:01:00.000', date '2016-05-04'), + ("val1b", null, 16, 319L, float(17), 25
spark git commit: [SPARK-19603][SS] Fix StreamingQuery explain command
Repository: spark Updated Branches: refs/heads/master 08c1972a0 -> fc02ef95c [SPARK-19603][SS] Fix StreamingQuery explain command ## What changes were proposed in this pull request? `StreamingQuery.explain` doesn't show the correct streaming physical plan right now because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false. This PR adds `streaming` parameter to `ExplainCommand` to allow `StreamExecution` to specify that it's a streaming plan. Examples of the explain outputs: - streaming DataFrame.explain() ``` == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)]) +- StateStoreSave [value#518], OperatorStateId(,0,0), Append, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- StateStoreRestore [value#518], OperatorStateId(,0,0) +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- *MapElements , obj#517: java.lang.String +- *DeserializeToObject value#513.toString, obj#516: java.lang.String +- StreamingRelation MemoryStream[value#513], [value#513] ``` - StreamingQuery.explain(extended = false) ``` == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)]) +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- StateStoreRestore [value#518], OperatorStateId(...,0,0) +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- 
*MapElements , obj#517: java.lang.String +- *DeserializeToObject value#543.toString, obj#516: java.lang.String +- LocalTableScan [value#543] ``` - StreamingQuery.explain(extended = true) ``` == Parsed Logical Plan == Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String +- LocalRelation [value#543] == Analyzed Logical Plan == value: string, count(1): bigint Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String +- LocalRelation [value#543] == Optimized Logical Plan == Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject value#543.toString, obj#516: java.lang.String +- LocalRelation [value#543] == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L]) +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L]) +- StateStoreRestore [value#518], OperatorStateId(...,0,0) +- *HashAggregate(keys=[value#518], 
functions=[merge_count(1)], output=[value#518, count#530L]) +- Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- *MapElements , obj#517: java.lang.String +- *DeserializeToObject value#543.toString, obj#516: java.lang.String +- LocalTableScan [value#543] ``` ## H
spark git commit: [SPARK-19603][SS] Fix StreamingQuery explain command
Repository: spark Updated Branches: refs/heads/branch-2.1 b9ab4c0e9 -> db7adb61b [SPARK-19603][SS] Fix StreamingQuery explain command ## What changes were proposed in this pull request? `StreamingQuery.explain` doesn't show the correct streaming physical plan right now because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false. This PR adds `streaming` parameter to `ExplainCommand` to allow `StreamExecution` to specify that it's a streaming plan. Examples of the explain outputs: - streaming DataFrame.explain() ``` == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)]) +- StateStoreSave [value#518], OperatorStateId(,0,0), Append, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- StateStoreRestore [value#518], OperatorStateId(,0,0) +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- *MapElements , obj#517: java.lang.String +- *DeserializeToObject value#513.toString, obj#516: java.lang.String +- StreamingRelation MemoryStream[value#513], [value#513] ``` - StreamingQuery.explain(extended = false) ``` == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)]) +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- StateStoreRestore [value#518], OperatorStateId(...,0,0) +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- 
*MapElements , obj#517: java.lang.String +- *DeserializeToObject value#543.toString, obj#516: java.lang.String +- LocalTableScan [value#543] ``` - StreamingQuery.explain(extended = true) ``` == Parsed Logical Plan == Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String +- LocalRelation [value#543] == Analyzed Logical Plan == value: string, count(1): bigint Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String +- LocalRelation [value#543] == Optimized Logical Plan == Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject value#543.toString, obj#516: java.lang.String +- LocalRelation [value#543] == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L]) +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L]) +- StateStoreRestore [value#518], OperatorStateId(...,0,0) +- *HashAggregate(keys=[value#518], 
functions=[merge_count(1)], output=[value#518, count#530L]) +- Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- *MapElements , obj#517: java.lang.String +- *DeserializeToObject value#543.toString, obj#516: java.lang.String +- LocalTableScan [value#543] ```
spark git commit: [SPARK-19604][TESTS] Log the start of every Python test
Repository: spark Updated Branches: refs/heads/branch-2.1 88c43f4fb -> b9ab4c0e9 [SPARK-19604][TESTS] Log the start of every Python test ## What changes were proposed in this pull request? Right now, we only have info level log after we finish the tests of a Python test file. We should also log the start of a test. So, if a test is hanging, we can tell which test file is running. ## How was this patch tested? This is a change for python tests. Author: Yin Huai Closes #16935 from yhuai/SPARK-19604. (cherry picked from commit f6c3bba22501ee7753d85c6e51ffe851d43869c1) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9ab4c0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9ab4c0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9ab4c0e Branch: refs/heads/branch-2.1 Commit: b9ab4c0e983df463232f1adbe6e5982b0d7d497d Parents: 88c43f4 Author: Yin Huai Authored: Wed Feb 15 14:41:15 2017 -0800 Committer: Yin Huai Committed: Wed Feb 15 18:43:57 2017 -0800 -- python/run-tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab4c0e/python/run-tests.py -- diff --git a/python/run-tests.py b/python/run-tests.py index 38b3bb8..53a0aef 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -72,7 +72,7 @@ def run_individual_python_test(test_name, pyspark_python): 'PYSPARK_PYTHON': which(pyspark_python), 'PYSPARK_DRIVER_PYTHON': which(pyspark_python) }) -LOGGER.debug("Starting test(%s): %s", pyspark_python, test_name) +LOGGER.info("Starting test(%s): %s", pyspark_python, test_name) start_time = time.time() try: per_test_output = tempfile.TemporaryFile() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing
Repository: spark Updated Branches: refs/heads/master 21b4ba2d6 -> 08c1972a0 [SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing ## What changes were proposed in this pull request? This pull request includes python API and examples for LSH. The API changes was based on yanboliang 's PR #15768 and resolved conflicts and API changes on the Scala API. The examples are consistent with Scala examples of MinHashLSH and BucketedRandomProjectionLSH. ## How was this patch tested? API and examples are tested using spark-submit: `bin/spark-submit examples/src/main/python/ml/min_hash_lsh.py` `bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh.py` User guide changes are generated and manually inspected: `SKIP_API=1 jekyll build` Author: Yun Ni Author: Yanbo Liang Author: Yunni Closes #16715 from Yunni/spark-18080. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08c1972a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08c1972a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08c1972a Branch: refs/heads/master Commit: 08c1972a0661d42f300520cc6e5fb31023de093b Parents: 21b4ba2 Author: Yun Ni Authored: Wed Feb 15 16:26:05 2017 -0800 Committer: Yanbo Liang Committed: Wed Feb 15 16:26:05 2017 -0800 -- docs/ml-features.md | 17 ++ .../JavaBucketedRandomProjectionLSHExample.java | 38 ++- .../examples/ml/JavaMinHashLSHExample.java | 57 +++- .../bucketed_random_projection_lsh_example.py | 81 ++ .../src/main/python/ml/min_hash_lsh_example.py | 81 ++ .../ml/BucketedRandomProjectionLSHExample.scala | 39 ++- .../spark/examples/ml/MinHashLSHExample.scala | 43 ++- .../scala/org/apache/spark/ml/feature/LSH.scala | 7 +- python/pyspark/ml/feature.py| 291 +++ 9 files changed, 601 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index 
13d97a2..57605ba 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -1558,6 +1558,15 @@ for more details on the API. {% include_example java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java %} + + + +Refer to the [BucketedRandomProjectionLSH Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.BucketedRandomProjectionLSH) +for more details on the API. + +{% include_example python/ml/bucketed_random_projection_lsh_example.py %} + + ### MinHash for Jaccard Distance @@ -1590,4 +1599,12 @@ for more details on the API. {% include_example java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java %} + + + +Refer to the [MinHashLSH Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.MinHashLSH) +for more details on the API. + +{% include_example python/ml/min_hash_lsh_example.py %} + http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java index ca3ee5a..4594e34 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java @@ -35,8 +35,15 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.functions.col; // $example off$ +/** + * An example demonstrating BucketedRandomProjectionLSH. 
+ * Run with: + * bin/run-example org.apache.spark.examples.ml.JavaBucketedRandomProjectionLSHExample + */ public class JavaBucketedRandomProjectionLSHExample { public static void main(String[] args) { SparkSession spark = SparkSession @@ -61,7 +68,7 @@ public class JavaBucketedRandomProjectionLSHExample { StructType schema = new StructType(new StructField[]{ new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), - new StructField("keys", new VectorUDT(), false, Metadata.empty()) + new StructField("features", new VectorUDT(), false, Metadata.empty()) }); Dataset dfA = spark.createDataFrame(dataA, schema); Dataset dfB = spark.createDataFrame(dataB, schema); @@ -71,26 +78,31 @@ public class JavaBucketedRandomProjectionLSHExample {
spark git commit: [SPARK-19599][SS] Clean up HDFSMetadataLog
Repository: spark Updated Branches: refs/heads/branch-2.1 6c3539906 -> 88c43f4fb [SPARK-19599][SS] Clean up HDFSMetadataLog ## What changes were proposed in this pull request? SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog. This PR includes the following changes: - ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is another issue [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) that prevents us from removing the workaround codes. - Remove unnecessary `writer: (T, OutputStream) => Unit` and just call `serialize` directly. - Remove catching FileNotFoundException. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16932 from zsxwing/metadata-cleanup. (cherry picked from commit 21b4ba2d6f21a9759af879471715c123073bd67a) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88c43f4f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88c43f4f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88c43f4f Branch: refs/heads/branch-2.1 Commit: 88c43f4fb5ea042a119819c11a5cdbe225095c54 Parents: 6c35399 Author: Shixiong Zhu Authored: Wed Feb 15 16:21:43 2017 -0800 Committer: Shixiong Zhu Committed: Wed Feb 15 16:21:49 2017 -0800 -- .../execution/streaming/HDFSMetadataLog.scala | 39 +--- .../execution/streaming/StreamExecution.scala | 4 +- 2 files changed, 19 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/88c43f4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 1b41352..e6a48a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: case ut: UninterruptibleThread => // When using a local file system, "writeBatch" must be called on a // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled -// while writing the batch file. This is because there is a potential dead-lock in -// Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running -// "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case, -// `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set -// the file permission if using the local file system, and can get deadlocked if the -// stream execution thread is stopped by interrupt. Hence, we make sure that -// "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable -// interrupts here. Also see SPARK-14131. -ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) } +// while writing the batch file. +// +// This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084). +// If the user tries to stop a query, and the thread running "Shell.runCommand" is +// interrupted, then InterruptException will be dropped and the query will be still +// running. (Note: `writeBatch` creates a file using HDFS APIs and will call +// "Shell.runCommand" to set the file permission if using the local file system) +// +// Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which +// allows us to disable interrupts here, in order to propagate the interrupt state +// correctly. Also see SPARK-19599. 
+ut.runUninterruptibly { writeBatch(batchId, metadata) } case _ => throw new IllegalStateException( "HDFSMetadataLog.add() on a local file system must be executed on " + @@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: // For a distributed file system, such as HDFS or S3, if the network is broken, write // operations may just hang until timeout. We should enable interrupts to allow stopping // the query fast. -writeBatch(batchId, metadata, serialize) +writeBatch(batchId, metadata) } true } } - def writeTempBatch(metadata: T, writer: (T, Outp
spark git commit: [SPARK-19599][SS] Clean up HDFSMetadataLog
Repository: spark Updated Branches: refs/heads/master f6c3bba22 -> 21b4ba2d6 [SPARK-19599][SS] Clean up HDFSMetadataLog ## What changes were proposed in this pull request? SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog. This PR includes the following changes: - ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is another issue [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) that prevents us from removing the workaround codes. - Remove unnecessary `writer: (T, OutputStream) => Unit` and just call `serialize` directly. - Remove catching FileNotFoundException. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16932 from zsxwing/metadata-cleanup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21b4ba2d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21b4ba2d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21b4ba2d Branch: refs/heads/master Commit: 21b4ba2d6f21a9759af879471715c123073bd67a Parents: f6c3bba Author: Shixiong Zhu Authored: Wed Feb 15 16:21:43 2017 -0800 Committer: Shixiong Zhu Committed: Wed Feb 15 16:21:43 2017 -0800 -- .../execution/streaming/HDFSMetadataLog.scala | 39 +--- .../execution/streaming/StreamExecution.scala | 4 +- 2 files changed, 19 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/21b4ba2d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index bfdc2cb..3155ce0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -114,15 +114,18 @@ class 
HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: case ut: UninterruptibleThread => // When using a local file system, "writeBatch" must be called on a // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled -// while writing the batch file. This is because there is a potential dead-lock in -// Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running -// "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case, -// `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set -// the file permission if using the local file system, and can get deadlocked if the -// stream execution thread is stopped by interrupt. Hence, we make sure that -// "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable -// interrupts here. Also see SPARK-14131. -ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) } +// while writing the batch file. +// +// This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084). +// If the user tries to stop a query, and the thread running "Shell.runCommand" is +// interrupted, then InterruptException will be dropped and the query will be still +// running. (Note: `writeBatch` creates a file using HDFS APIs and will call +// "Shell.runCommand" to set the file permission if using the local file system) +// +// Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which +// allows us to disable interrupts here, in order to propagate the interrupt state +// correctly. Also see SPARK-19599. 
+ut.runUninterruptibly { writeBatch(batchId, metadata) } case _ => throw new IllegalStateException( "HDFSMetadataLog.add() on a local file system must be executed on " + @@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: // For a distributed file system, such as HDFS or S3, if the network is broken, write // operations may just hang until timeout. We should enable interrupts to allow stopping // the query fast. -writeBatch(batchId, metadata, serialize) +writeBatch(batchId, metadata) } true } } - def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = { -var nextId = 0 + def writeTempBatch(metadata: T): O
spark git commit: [SPARK-18937][SQL] Timezone support in CSV/JSON parsing
Repository: spark Updated Branches: refs/heads/master 6a9a85b84 -> 865b2fd84 [SPARK-18937][SQL] Timezone support in CSV/JSON parsing ## What changes were proposed in this pull request? This is a follow-up pr of #16308. This pr enables timezone support in CSV/JSON parsing. We should introduce `timeZone` option for CSV/JSON datasources (the default value of the option is session local timezone). The datasources should use the `timeZone` option to format/parse to write/read timestamp values. Notice that while reading, if the timestampFormat has the timezone info, the timezone will not be used because we should respect the timezone in the values. For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because session local timezone is `"GMT"` here, are: ```scala scala> spark.conf.set("spark.sql.session.timeZone", "GMT") scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts") df: org.apache.spark.sql.DataFrame = [ts: timestamp] scala> df.show() +---+ |ts | +---+ |2016-01-01 00:00:00| +---+ scala> df.write.json("/path/to/gmtjson") ``` ```sh $ cat /path/to/gmtjson/part-* {"ts":"2016-01-01T00:00:00.000Z"} ``` whereas setting the option to `"PST"`, they are: ```scala scala> df.write.option("timeZone", "PST").json("/path/to/pstjson") ``` ```sh $ cat /path/to/pstjson/part-* {"ts":"2015-12-31T16:00:00.000-08:00"} ``` We can properly read these files even if the timezone option is wrong because the timestamp values have timezone info: ```scala scala> val schema = new StructType().add("ts", TimestampType) schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true)) scala> spark.read.schema(schema).json("/path/to/gmtjson").show() +---+ |ts | +---+ |2016-01-01 00:00:00| +---+ scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show() +---+ |ts | +---+ |2016-01-01 00:00:00| +---+ ``` And even if `timestampFormat` doesn't
contain timezone info, we can properly read the values with setting correct timezone option: ```scala scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson") ``` ```sh $ cat /path/to/jstjson/part-* {"ts":"2016-01-01T09:00:00"} ``` ```scala // wrong result scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show() +---+ |ts | +---+ |2016-01-01 09:00:00| +---+ // correct result scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show() +---+ |ts | +---+ |2016-01-01 00:00:00| +---+ ``` This pr also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression` to be able to evaluate values with timezone option. ## How was this patch tested? Existing tests and added some tests. Author: Takuya UESHIN Closes #16750 from ueshin/issues/SPARK-18937. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/865b2fd8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/865b2fd8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/865b2fd8 Branch: refs/heads/master Commit: 865b2fd84c6f82de147540c8f17bbe0f0d9fb69c Parents: 6a9a85b Author: Takuya UESHIN Authored: Wed Feb 15 13:26:34 2017 -0800 Committer: Wenchen Fan Committed: Wed Feb 15 13:26:34 2017 -0800 -- python/pyspark/sql/readwriter.py| 43 --- python/pyspark/sql/streaming.py | 20 ++-- .../catalyst/expressions/jsonExpressions.scala | 30 - .../spark/sql/catalyst/json/JSONOptions.scala | 11 +- .../sql/catalyst/json/JacksonGenerator.scala| 2 +- .../expressions/JsonExpressionsSuite.scala | 113 --- .../org/apache/spark/sql/DataFrameReader.scala | 8 +- .../org/apache/spark/sql/DataFrameWriter.scala | 4 + .../scala/org/apache/spark/sql/Dataset.scala| 6 +- .../datasources/csv/CSVFileFormat.scala | 8 +- .../execution/datasources/csv/CSVOptions.scala | 21 ++-- 
.../datasources/csv/UnivocityGenerator.scala| 2 +- .../datasources/csv/UnivocityParser.scala | 2 +- .../datasources/json/JsonFileFormat.scala | 9 +- .../spark/sql/streaming/DataStreamReader.scala | 4 + .../datasources/csv/CSVInferSchemaSuite.scala | 22 ++-- .../execution/datasources/csv/CSVSuite.scala| 44 +++- .../datasources/csv/UnivocityParserSuite.scala | 73 +++- .../execution/datasources/json/J
spark git commit: [SPARK-19329][SQL] Reading from or writing to a datasource table with a non pre-existing location should succeed
Repository: spark Updated Branches: refs/heads/master 59dc26e37 -> 6a9a85b84 [SPARK-19329][SQL] Reading from or writing to a datasource table with a non pre-existing location should succeed ## What changes were proposed in this pull request? when we insert data into a datasource table use `sqlText`, and the table has an not exists location, this will throw an Exception. example: ``` spark.sql("create table t(a string, b int) using parquet") spark.sql("alter table t set location '/xx'") spark.sql("insert into table t select 'c', 1") ``` Exception: ``` com.google.common.util.concurrent.UncheckedExecutionException: org.apache.spark.sql.AnalysisException: Path does not exist: /xx; at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4814) at com.google.common.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4830) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:122) at org.apache.spark.sql.hive.HiveSessionCatalog.lookupRelation(HiveSessionCatalog.scala:69) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:456) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:465) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:463) at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:453) ``` As discussed following comments, we should unify the action when we reading from or writing to a datasource table with a non pre-existing locaiton: 1. reading from a datasource table: return 0 rows 2. writing to a datasource table: write data successfully ## How was this patch tested? unit test added Author: windpiger Closes #16672 from windpiger/insertNotExistLocation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a9a85b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a9a85b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a9a85b8 Branch: refs/heads/master Commit: 6a9a85b84decc2cbe1a0d8791118a0f91a62aa3f Parents: 59dc26e Author: windpiger Authored: Wed Feb 15 13:21:48 2017 -0800 Committer: Xiao Li Committed: Wed Feb 15 13:21:48 2017 -0800 -- .../datasources/DataSourceStrategy.scala| 3 +- .../spark/sql/execution/command/DDLSuite.scala | 119 +++ 2 files changed, 121 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a9a85b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index d8a5158..f429232 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -233,7 +233,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] // TODO: improve `InMemoryCatalog` and remove this limitation. 
catalogTable = if (withHiveSupport) Some(table) else None) -LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table)) +LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), + catalogTable = Some(table)) } }) } http://git-wip-us.apache.org/repos/asf/spark/blob/6a9a85b8/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 278d247..e1a3b24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1832,4 +1
spark git commit: [SPARK-19607][HOTFIX] Finding QueryExecution that matches provided executionId
Repository: spark Updated Branches: refs/heads/master 3755da76c -> 59dc26e37 [SPARK-19607][HOTFIX] Finding QueryExecution that matches provided executionId ## What changes were proposed in this pull request? #16940 adds a test case which does not stop the spark job. It causes many failures of other test cases. - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/2403/consoleFull - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/2600/consoleFull ``` [info] org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: ``` ## How was this patch tested? Pass the Jenkins test. Author: Dongjoon Hyun Closes #16943 from dongjoon-hyun/SPARK-19607-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59dc26e3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59dc26e3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59dc26e3 Branch: refs/heads/master Commit: 59dc26e378c5960a955ad238fdf1c9745c732c8a Parents: 3755da7 Author: Dongjoon Hyun Authored: Wed Feb 15 21:57:49 2017 +0100 Committer: Reynold Xin Committed: Wed Feb 15 21:57:49 2017 +0100 -- .../scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/59dc26e3/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala index b059706..fe78a76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala @@ -129,6 +129,8 @@ class SQLExecutionSuite extends SparkFunSuite { df.collect() assert(df.queryExecution === queryExecution) + +spark.stop() } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
Repository: spark Updated Branches: refs/heads/branch-2.1 8ee4ec812 -> 6c3539906 [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column manual, unit tests Author: Felix Cheung Closes #16739 from felixcheung/rcoalesce. (cherry picked from commit 671bc08ed502815bfa2254c30d64149402acb0c7) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c353990 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c353990 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c353990 Branch: refs/heads/branch-2.1 Commit: 6c35399068f1035fec6d5f909a83a5b1683702e0 Parents: 8ee4ec8 Author: Felix Cheung Authored: Wed Feb 15 10:45:37 2017 -0800 Committer: Felix Cheung Committed: Wed Feb 15 10:57:08 2017 -0800 -- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 46 ++-- R/pkg/R/RDD.R | 4 +- R/pkg/R/functions.R | 26 ++- R/pkg/R/generics.R | 9 +++- R/pkg/inst/tests/testthat/test_rdd.R| 2 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 33 +++--- .../main/scala/org/apache/spark/rdd/RDD.scala | 3 +- python/pyspark/sql/dataframe.py | 10 - .../scala/org/apache/spark/sql/Dataset.scala| 10 - .../sql/execution/basicPhysicalOperators.scala | 10 - 11 files changed, 136 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6c353990/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 62a20e6..6f96b96 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -80,6 +80,7 @@ exportMethods("arrange", "as.data.frame", "attach", "cache", + "coalesce", "collect", "colnames", "colnames<-", http://git-wip-us.apache.org/repos/asf/spark/blob/6c353990/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index c4147c5..986f1f1 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -680,14 +680,53 @@ setMethod("storageLevel", 
storageLevelToString(callJMethod(x@sdf, "storageLevel")) }) +#' Coalesce +#' +#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions. +#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 +#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of +#' the current partitions. If a larger number of partitions is requested, it will stay at the +#' current number of partitions. +#' +#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1, +#' this may result in your computation taking place on fewer nodes than +#' you like (e.g. one node in the case of numPartitions = 1). To avoid this, +#' call \code{repartition}. This will add a shuffle step, but means the +#' current upstream partitions will be executed in parallel (per whatever +#' the current partitioning is). +#' +#' @param numPartitions the number of partitions to use. +#' +#' @family SparkDataFrame functions +#' @rdname coalesce +#' @name coalesce +#' @aliases coalesce,SparkDataFrame-method +#' @seealso \link{repartition} +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' path <- "path/to/file.json" +#' df <- read.json(path) +#' newDF <- coalesce(df, 1L) +#'} +#' @note coalesce(SparkDataFrame) since 2.1.1 +setMethod("coalesce", + signature(x = "SparkDataFrame"), + function(x, numPartitions) { +stopifnot(is.numeric(numPartitions)) +sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions)) +dataFrame(sdf) + }) + #' Repartition #' #' The following options for repartition are possible: #' \itemize{ -#' \item{1.} {Return a new SparkDataFrame partitioned by +#' \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.} +#' \item{2.} {Return a new SparkDataFrame hash partitioned by #' the given columns into \code{numPartitions}.} -#' \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.} -#' \item{3.} {Return a new 
SparkDataFrame partitioned by the given column(s), +#' \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s), #' using \code{spark.sql.shuffle.partitions} as number of partitions.} #'} #' @param x
spark git commit: [SPARK-19331][SQL][TESTS] Improve the test coverage of SQLViewSuite
Repository: spark Updated Branches: refs/heads/master 671bc08ed -> 3755da76c [SPARK-19331][SQL][TESTS] Improve the test coverage of SQLViewSuite Move `SQLViewSuite` from `sql/hive` to `sql/core`, so we can test the view supports without hive metastore. Also moved the test cases that specified to hive to `HiveSQLViewSuite`. Improve the test coverage of SQLViewSuite, cover the following cases: 1. view resolution(possibly a referenced table/view have changed after the view creation); 2. handle a view with user specified column names; 3. improve the test cases for a nested view. Also added a test case for cyclic view reference, which is a known issue that is not fixed yet. N/A Author: jiangxingbo Closes #16674 from jiangxb1987/view-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3755da76 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3755da76 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3755da76 Branch: refs/heads/master Commit: 3755da76c3821e8e6a4f359c43243a51a06559ca Parents: 671bc08 Author: jiangxingbo Authored: Wed Feb 15 10:46:54 2017 -0800 Committer: Wenchen Fan Committed: Wed Feb 15 10:47:11 2017 -0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 11 +- .../catalyst/analysis/NoSuchItemException.scala | 2 +- .../spark/sql/execution/SQLViewSuite.scala | 620 +++ .../spark/sql/test/SharedSQLContext.scala | 11 +- .../sql/hive/execution/HiveSQLViewSuite.scala | 140 .../spark/sql/hive/execution/SQLViewSuite.scala | 762 --- 6 files changed, 773 insertions(+), 773 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6aa0e8d..cd517a9 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -620,13 +620,18 @@ class Analyzer( private def lookupTableFromCatalog( u: UnresolvedRelation, defaultDatabase: Option[String] = None): LogicalPlan = { + val tableIdentWithDb = u.tableIdentifier.copy( +database = u.tableIdentifier.database.orElse(defaultDatabase)) try { -val tableIdentWithDb = u.tableIdentifier.copy( - database = u.tableIdentifier.database.orElse(defaultDatabase)) catalog.lookupRelation(tableIdentWithDb, u.alias) } catch { case _: NoSuchTableException => - u.failAnalysis(s"Table or view not found: ${u.tableName}") + u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}") +// If the database is defined and that database is not found, throw an AnalysisException. +// Note that if the database is not defined, it is possible we are looking up a temp view. +case e: NoSuchDatabaseException => + u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " + +s"database ${e.db} doesn't exsits.") } } http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala index 8febdca..f5aae60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec * Thrown by a catalog when an item cannot be found. 
The analyzer will rethrow the exception * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. */ -class NoSuchDatabaseException(db: String) extends AnalysisException(s"Database '$db' not found") +class NoSuchDatabaseException(val db: String) extends AnalysisException(s"Database '$db' not found") class NoSuchTableException(db: String, table: String) extends AnalysisException(s"Table or view '$table' not found in database '$db'") http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala --
spark git commit: [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
Repository: spark Updated Branches: refs/heads/master c97f4e17d -> 671bc08ed [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column ## What changes were proposed in this pull request? Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column ## How was this patch tested? manual, unit tests Author: Felix Cheung Closes #16739 from felixcheung/rcoalesce. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/671bc08e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/671bc08e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/671bc08e Branch: refs/heads/master Commit: 671bc08ed502815bfa2254c30d64149402acb0c7 Parents: c97f4e1 Author: Felix Cheung Authored: Wed Feb 15 10:45:37 2017 -0800 Committer: Felix Cheung Committed: Wed Feb 15 10:45:37 2017 -0800 -- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 46 ++-- R/pkg/R/RDD.R | 4 +- R/pkg/R/functions.R | 26 ++- R/pkg/R/generics.R | 9 +++- R/pkg/inst/tests/testthat/test_rdd.R| 2 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 32 +++--- .../main/scala/org/apache/spark/rdd/RDD.scala | 3 +- python/pyspark/sql/dataframe.py | 10 - .../scala/org/apache/spark/sql/Dataset.scala| 10 - .../sql/execution/basicPhysicalOperators.scala | 10 - 11 files changed, 135 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 8b26500..81e1936 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -82,6 +82,7 @@ exportMethods("arrange", "as.data.frame", "attach", "cache", + "coalesce", "collect", "colnames", "colnames<-", http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 5bca410..cf331ba 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -678,14 +678,53 @@ setMethod("storageLevel", storageLevelToString(callJMethod(x@sdf, 
"storageLevel")) }) +#' Coalesce +#' +#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions. +#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 +#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of +#' the current partitions. If a larger number of partitions is requested, it will stay at the +#' current number of partitions. +#' +#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1, +#' this may result in your computation taking place on fewer nodes than +#' you like (e.g. one node in the case of numPartitions = 1). To avoid this, +#' call \code{repartition}. This will add a shuffle step, but means the +#' current upstream partitions will be executed in parallel (per whatever +#' the current partitioning is). +#' +#' @param numPartitions the number of partitions to use. +#' +#' @family SparkDataFrame functions +#' @rdname coalesce +#' @name coalesce +#' @aliases coalesce,SparkDataFrame-method +#' @seealso \link{repartition} +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' path <- "path/to/file.json" +#' df <- read.json(path) +#' newDF <- coalesce(df, 1L) +#'} +#' @note coalesce(SparkDataFrame) since 2.1.1 +setMethod("coalesce", + signature(x = "SparkDataFrame"), + function(x, numPartitions) { +stopifnot(is.numeric(numPartitions)) +sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions)) +dataFrame(sdf) + }) + #' Repartition #' #' The following options for repartition are possible: #' \itemize{ -#' \item{1.} {Return a new SparkDataFrame partitioned by +#' \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.} +#' \item{2.} {Return a new SparkDataFrame hash partitioned by #' the given columns into \code{numPartitions}.} -#' \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.} -#' \item{3.} {Return a new SparkDataFrame partitioned by the given 
column(s), +#' \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s), #' using \code{spark.sql.shuffle.partitions} as number of partitions.} #'} #' @param x a SparkDataFrame. @@ -6
spark git commit: [SPARK-19160][PYTHON][SQL] Add udf decorator
Repository: spark Updated Branches: refs/heads/master 6eca21ba8 -> c97f4e17d [SPARK-19160][PYTHON][SQL] Add udf decorator ## What changes were proposed in this pull request? This PR adds `udf` decorator syntax as proposed in [SPARK-19160](https://issues.apache.org/jira/browse/SPARK-19160). This allows users to define UDF using simplified syntax: ```python from pyspark.sql.decorators import udf @udf(IntegerType()) def add_one(x): """Adds one""" if x is not None: return x + 1 ``` without need to define a separate function and udf. ## How was this patch tested? Existing unit tests to ensure backward compatibility and additional unit tests covering new functionality. Author: zero323 Closes #16533 from zero323/SPARK-19160. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c97f4e17 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c97f4e17 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c97f4e17 Branch: refs/heads/master Commit: c97f4e17de0ce39e8172a5a4ae81f1914816a358 Parents: 6eca21b Author: zero323 Authored: Wed Feb 15 10:16:34 2017 -0800 Committer: Holden Karau Committed: Wed Feb 15 10:16:34 2017 -0800 -- python/pyspark/sql/functions.py | 41 +- python/pyspark/sql/tests.py | 57 2 files changed, 91 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c97f4e17/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 4f4ae10..d261720 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -20,6 +20,7 @@ A collections of builtin functions """ import math import sys +import functools if sys.version < "3": from itertools import imap as map @@ -1908,22 +1909,48 @@ class UserDefinedFunction(object): @since(1.3) -def udf(f, returnType=StringType()): +def udf(f=None, returnType=StringType()): """Creates a :class:`Column` expression representing a user defined function (UDF). 
.. note:: The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. -:param f: python function -:param returnType: a :class:`pyspark.sql.types.DataType` object or data type string. +:param f: python function if used as a standalone function +:param returnType: a :class:`pyspark.sql.types.DataType` object >>> from pyspark.sql.types import IntegerType >>> slen = udf(lambda s: len(s), IntegerType()) ->>> df.select(slen(df.name).alias('slen')).collect() -[Row(slen=5), Row(slen=3)] -""" -return UserDefinedFunction(f, returnType) +>>> @udf +... def to_upper(s): +... if s is not None: +... return s.upper() +... +>>> @udf(returnType=IntegerType()) +... def add_one(x): +... if x is not None: +... return x + 1 +... +>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) +>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show() ++--+--++ +|slen(name)|to_upper(name)|add_one(age)| ++--+--++ +| 8| JOHN DOE| 22| ++--+--++ +""" +def _udf(f, returnType=StringType()): +return UserDefinedFunction(f, returnType) + +# decorator @udf, @udf() or @udf(dataType()) +if f is None or isinstance(f, (str, DataType)): +# If DataType has been passed as a positional argument +# for decorator use it as a returnType +return_type = f or returnType +return functools.partial(_udf, returnType=return_type) +else: +return _udf(f=f, returnType=returnType) + blacklist = ['map', 'since', 'ignore_unicode_prefix'] __all__ = [k for k, v in globals().items() http://git-wip-us.apache.org/repos/asf/spark/blob/c97f4e17/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 62e1a8c..d8b7b31 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -514,6 +514,63 @@ class SQLTests(ReusedPySparkTestCase): non_callable = None self.assertRaises(TypeError, 
UserDefinedFunction, non_callable, StringType()) +def test_udf_with_decorator(self): +from pyspark.sql.functions import lit, udf +from pyspark.sql.types import IntegerType, DoubleType + +@udf(Integer
spark git commit: [SPARK-19590][PYSPARK][ML] Update the document for QuantileDiscretizer in pyspark
Repository: spark Updated Branches: refs/heads/master acf71c63c -> 6eca21ba8 [SPARK-19590][PYSPARK][ML] Update the document for QuantileDiscretizer in pyspark ## What changes were proposed in this pull request? This PR is to document the changes on QuantileDiscretizer in pyspark for PR: https://github.com/apache/spark/pull/15428 ## How was this patch tested? No test needed Signed-off-by: VinceShieh Author: VinceShieh Closes #16922 from VinceShieh/spark-19590. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6eca21ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6eca21ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6eca21ba Branch: refs/heads/master Commit: 6eca21ba881120f1ac7854621380ef8a92972384 Parents: acf71c6 Author: VinceShieh Authored: Wed Feb 15 10:12:07 2017 -0800 Committer: Holden Karau Committed: Wed Feb 15 10:12:07 2017 -0800 -- python/pyspark/ml/feature.py | 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6eca21ba/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index ac90c89..1ab4291 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -1178,7 +1178,17 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadab `QuantileDiscretizer` takes a column with continuous features and outputs a column with binned categorical features. The number of bins can be set using the :py:attr:`numBuckets` parameter. -The bin ranges are chosen using an approximate algorithm (see the documentation for +It is possible that the number of buckets used will be less than this value, for example, if +there are too few distinct values of the input to create enough distinct quantiles. 
+ +NaN handling: Note also that +QuantileDiscretizer will raise an error when it finds NaN values in the dataset, but the user +can also choose to either keep or remove NaN values within the dataset by setting +:py:attr:`handleInvalid` parameter. If the user chooses to keep NaN values, they will be +handled specially and placed into their own bucket, for example, if 4 buckets are used, then +non-NaN data will be put into buckets[0-3], but NaNs will be counted in a special bucket[4]. + +Algorithm: The bin ranges are chosen using an approximate algorithm (see the documentation for :py:meth:`~.DataFrameStatFunctions.approxQuantile` for a detailed description). The precision of the approximation can be controlled with the :py:attr:`relativeError` parameter. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16475][SQL] broadcast hint for SQL queries - disallow space as the delimiter
Repository: spark Updated Branches: refs/heads/master a8a139820 -> acf71c63c [SPARK-16475][SQL] broadcast hint for SQL queries - disallow space as the delimiter ## What changes were proposed in this pull request? A follow-up to disallow space as the delimiter in broadcast hint. ## How was this patch tested? Jenkins test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh Closes #16941 from viirya/disallow-space-delimiter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acf71c63 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acf71c63 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acf71c63 Branch: refs/heads/master Commit: acf71c63cdde8dced8d108260cdd35e1cc992248 Parents: a8a1398 Author: Liang-Chi Hsieh Authored: Wed Feb 15 18:48:02 2017 +0100 Committer: Reynold Xin Committed: Wed Feb 15 18:48:02 2017 +0100 -- .../org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 1 - .../spark/sql/catalyst/parser/PlanParserSuite.scala | 10 -- 2 files changed, 8 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/acf71c63/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 0ac986d..d8cd68e 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -380,7 +380,6 @@ hint hintStatement : hintName=identifier -| hintName=identifier '(' parameters+=identifier parameters+=identifier ')' | hintName=identifier '(' parameters+=identifier (',' parameters+=identifier)* ')' ; 
http://git-wip-us.apache.org/repos/asf/spark/blob/acf71c63/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 13a84b4..2c14252 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -505,7 +505,13 @@ class PlanParserSuite extends PlanTest { val m2 = intercept[ParseException] { parsePlan("SELECT /*+ MAPJOIN(default.t) */ * from default.t") }.getMessage -assert(m2.contains("no viable alternative at input")) +assert(m2.contains("mismatched input '.' expecting {')', ','}")) + +// Disallow space as the delimiter. +val m3 = intercept[ParseException] { + parsePlan("SELECT /*+ INDEX(a b c) */ * from default.t") +}.getMessage +assert(m3.contains("mismatched input 'b' expecting {')', ','}")) comparePlans( parsePlan("SELECT /*+ HINT */ * FROM t"), @@ -524,7 +530,7 @@ class PlanParserSuite extends PlanTest { Hint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star( comparePlans( - parsePlan("SELECT /*+ INDEX(t emp_job_ix) */ * FROM t"), + parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"), Hint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star( comparePlans( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18872][SQL][TESTS] New test cases for EXISTS subquery (Joins + CTE)
Repository: spark Updated Branches: refs/heads/master 5ad10c531 -> a8a139820 [SPARK-18872][SQL][TESTS] New test cases for EXISTS subquery (Joins + CTE) ## What changes were proposed in this pull request? This PR adds the third and final set of tests for EXISTS subquery. File name| Brief description | - exists-cte.sql |Tests Exist subqueries referencing CTE exists-joins-and-set-ops.sql|Tests Exists subquery used in Joins (Both when joins occur in outer and subquery blocks) DB2 results are attached here as reference : [exists-cte-db2.txt](https://github.com/apache/spark/files/752091/exists-cte-db2.txt) [exists-joins-and-set-ops-db2.txt](https://github.com/apache/spark/files/753283/exists-joins-and-set-ops-db2.txt) (updated) ## How was this patch tested? The test result is compared with the result run from another SQL engine (in this case IBM DB2). If the results are equivalent, we assume the result is correct. Author: Dilip Biswal Closes #16802 from dilipbiswal/exists-pr3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8a13982 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8a13982 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8a13982 Branch: refs/heads/master Commit: a8a139820c4a77a0b017b621bec6273cc09c8476 Parents: 5ad10c5 Author: Dilip Biswal Authored: Wed Feb 15 17:34:05 2017 +0100 Committer: Herman van Hovell Committed: Wed Feb 15 17:34:05 2017 +0100 -- .../subquery/exists-subquery/exists-cte.sql | 142 .../exists-joins-and-set-ops.sql| 228 .../subquery/exists-subquery/exists-cte.sql.out | 200 ++ .../exists-joins-and-set-ops.sql.out| 363 +++ 4 files changed, 933 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a8a13982/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-cte.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-cte.sql 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-cte.sql new file mode 100644 index 000..c678483 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-cte.sql @@ -0,0 +1,142 @@ +-- Tests EXISTS subquery used along with +-- Common Table Expressions(CTE) + +CREATE TEMPORARY VIEW EMP AS SELECT * FROM VALUES + (100, "emp 1", date "2005-01-01", 100.00D, 10), + (100, "emp 1", date "2005-01-01", 100.00D, 10), + (200, "emp 2", date "2003-01-01", 200.00D, 10), + (300, "emp 3", date "2002-01-01", 300.00D, 20), + (400, "emp 4", date "2005-01-01", 400.00D, 30), + (500, "emp 5", date "2001-01-01", 400.00D, NULL), + (600, "emp 6 - no dept", date "2001-01-01", 400.00D, 100), + (700, "emp 7", date "2010-01-01", 400.00D, 100), + (800, "emp 8", date "2016-01-01", 150.00D, 70) +AS EMP(id, emp_name, hiredate, salary, dept_id); + +CREATE TEMPORARY VIEW DEPT AS SELECT * FROM VALUES + (10, "dept 1", "CA"), + (20, "dept 2", "NY"), + (30, "dept 3", "TX"), + (40, "dept 4 - unassigned", "OR"), + (50, "dept 5 - unassigned", "NJ"), + (70, "dept 7", "FL") +AS DEPT(dept_id, dept_name, state); + +CREATE TEMPORARY VIEW BONUS AS SELECT * FROM VALUES + ("emp 1", 10.00D), + ("emp 1", 20.00D), + ("emp 2", 300.00D), + ("emp 2", 100.00D), + ("emp 3", 300.00D), + ("emp 4", 100.00D), + ("emp 5", 1000.00D), + ("emp 6 - no dept", 500.00D) +AS BONUS(emp_name, bonus_amt); + +-- CTE used inside subquery with correlated condition +-- TC.01.01 +WITH bonus_cte + AS (SELECT * + FROM bonus + WHERE EXISTS (SELECT dept.dept_id, + emp.emp_name, + Max(salary), + Count(*) + FROM emp + JOIN dept + ON dept.dept_id = emp.dept_id + WHERE bonus.emp_name = emp.emp_name + GROUP BY dept.dept_id, +emp.emp_name + ORDER BY emp.emp_name)) +SELECT * +FROM bonus a +WHERE a.bonus_amt > 30 + AND EXISTS (SELECT 1 + FROM bonus_cte b + WHERE a.emp_name = b.emp_name); + +-- Inner join between two CTEs with correlated condition +-- TC.01.02 +WITH emp_cte + AS (SELECT * 
+ FROM emp + WHERE id >= 100 +AND id <= 300), + dept_cte + AS (SELECT * + FROM dept + WHERE dept_id = 10) +SELECT * +FROM bonus +WHERE EXISTS (SELECT * +
spark git commit: [SPARK-18873][SQL][TEST] New test cases for scalar subquery (part 2 of 2) - scalar subquery in predicate context
Repository: spark Updated Branches: refs/heads/master d22db6278 -> 5ad10c531 [SPARK-18873][SQL][TEST] New test cases for scalar subquery (part 2 of 2) - scalar subquery in predicate context ## What changes were proposed in this pull request? This PR adds new test cases for scalar subquery in predicate context ## How was this patch tested? The test result is compared with the result run from another SQL engine (in this case is IBM DB2). If the result are equivalent, we assume the result is correct. Author: Nattavut Sutyanyong Closes #16798 from nsyca/18873-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ad10c53 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ad10c53 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ad10c53 Branch: refs/heads/master Commit: 5ad10c53102ac2f77c47bfd8c977e7beef55ea10 Parents: d22db62 Author: Nattavut Sutyanyong Authored: Wed Feb 15 17:30:55 2017 +0100 Committer: Herman van Hovell Committed: Wed Feb 15 17:30:55 2017 +0100 -- .../sql-tests/inputs/scalar-subquery.sql| 20 - .../scalar-subquery-predicate.sql | 271 .../sql-tests/results/scalar-subquery.sql.out | 46 -- .../scalar-subquery-predicate.sql.out | 430 +++ 4 files changed, 701 insertions(+), 66 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5ad10c53/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql b/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql deleted file mode 100644 index 3acc9db..000 --- a/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql +++ /dev/null @@ -1,20 +0,0 @@ -CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv); -CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv); - --- SPARK-18814.1: Simplified version of TPCDS-Q32 -SELECT pk, cv -FROM p, c -WHERE p.pk = c.ck -ANDc.cv = (SELECT avg(c1.cv) - FROM c c1 - 
WHERE c1.ck = p.pk); - --- SPARK-18814.2: Adding stack of aggregates -SELECT pk, cv -FROM p, c -WHERE p.pk = c.ck -ANDc.cv = (SELECT max(avg) - FROM (SELECT c1.cv, avg(c1.cv) avg - FROM c c1 - WHEREc1.ck = p.pk - GROUP BY c1.cv)); http://git-wip-us.apache.org/repos/asf/spark/blob/5ad10c53/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql new file mode 100644 index 000..fb0d07f --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql @@ -0,0 +1,271 @@ +-- A test suite for scalar subquery in predicate context + +CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv); +CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv); + +-- SPARK-18814.1: Simplified version of TPCDS-Q32 +SELECT pk, cv +FROM p, c +WHERE p.pk = c.ck +ANDc.cv = (SELECT avg(c1.cv) + FROM c c1 + WHERE c1.ck = p.pk); + +-- SPARK-18814.2: Adding stack of aggregates +SELECT pk, cv +FROM p, c +WHERE p.pk = c.ck +ANDc.cv = (SELECT max(avg) + FROM (SELECT c1.cv, avg(c1.cv) avg + FROM c c1 + WHEREc1.ck = p.pk + GROUP BY c1.cv)); + +create temporary view t1 as select * from values + ('val1a', 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 00:00:00.000', date '2014-04-04'), + ('val1b', 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'), + ('val1a', 16S, 12, 21L, float(15.0), 20D, 20E2, timestamp '2014-06-04 01:02:00.001', date '2014-06-04'), + ('val1a', 16S, 12, 10L, float(15.0), 20D, 20E2, timestamp '2014-07-04 01:01:00.000', date '2014-07-04'), + ('val1c', 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:02:00.001', date '2014-05-05'), + ('val1d', null, 16, 22L, float(17.0), 25D, 26E2, timestamp 
'2014-06-04 01:01:00.000', null), + ('val1d', null, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-07-04 01:02:00.001', null), + ('val1e', 10S, null, 25L, float(17.0), 25D, 26E2, timestamp '2014-08-04 01:01:00.000', date '2014-08-04'), + ('val1e', 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-09-04 01:02:00.001', date '2014-09-
spark git commit: [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 2nd batch
Repository: spark Updated Branches: refs/heads/master 601b9c3e6 -> d22db6278 [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 2nd batch ## What changes were proposed in this pull request? This is the 2nd batch of test cases for IN/NOT IN subquery. This PR includes these test cases: `in-limit.sql` `in-order-by.sql` `not-in-group-by.sql` These are the queries and results from running on DB2. [in-limit DB2 version](https://github.com/apache/spark/files/743267/in-limit.sql.db2.out.txt) [in-order-by DB2 version](https://github.com/apache/spark/files/743269/in-order-by.sql.db2.txt) [not-in-group-by DB2 version](https://github.com/apache/spark/files/743271/not-in-group-by.sql.db2.txt) [output of in-limit.sql DB2](https://github.com/apache/spark/files/743276/in-limit.sql.db2.out.txt) [output of in-order-by.sql DB2](https://github.com/apache/spark/files/743278/in-order-by.sql.db2.out.txt) [output of not-in-group-by.sql DB2](https://github.com/apache/spark/files/743279/not-in-group-by.sql.db2.out.txt) ## How was this patch tested? This PR is adding new test cases. Author: Kevin Yu Closes #16759 from kevinyu98/spark-18871-2. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d22db627 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d22db627 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d22db627 Branch: refs/heads/master Commit: d22db62785b74f433c51b07605b60126ccaa4d6d Parents: 601b9c3 Author: Kevin Yu Authored: Wed Feb 15 17:28:42 2017 +0100 Committer: Herman van Hovell Committed: Wed Feb 15 17:28:42 2017 +0100 -- .../inputs/subquery/in-subquery/in-limit.sql| 100 ++ .../inputs/subquery/in-subquery/in-order-by.sql | 197 +++ .../subquery/in-subquery/not-in-group-by.sql| 101 ++ .../in-subquery/not-in-multiple-columns.sql | 55 .../subquery/in-subquery/in-limit.sql.out | 147 + .../subquery/in-subquery/in-order-by.sql.out| 328 +++ .../in-subquery/not-in-group-by.sql.out | 150 + .../in-subquery/not-in-multiple-columns.sql.out | 59 8 files changed, 1023 insertions(+), 114 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d22db627/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql new file mode 100644 index 000..a40ee08 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql @@ -0,0 +1,100 @@ +-- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery +-- It includes correlated cases. 
+ +create temporary view t1 as select * from values + ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:00:00.000', date '2014-04-04'), + ("val1b", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'), + ("val1a", 16S, 12, 21L, float(15.0), 20D, 20E2, timestamp '2014-06-04 01:02:00.001', date '2014-06-04'), + ("val1a", 16S, 12, 10L, float(15.0), 20D, 20E2, timestamp '2014-07-04 01:01:00.000', date '2014-07-04'), + ("val1c", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:02:00.001', date '2014-05-05'), + ("val1d", null, 16, 22L, float(17.0), 25D, 26E2, timestamp '2014-06-04 01:01:00.000', null), + ("val1d", null, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-07-04 01:02:00.001', null), + ("val1e", 10S, null, 25L, float(17.0), 25D, 26E2, timestamp '2014-08-04 01:01:00.000', date '2014-08-04'), + ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-09-04 01:02:00.001', date '2014-09-04'), + ("val1d", 10S, null, 12L, float(17.0), 25D, 26E2, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'), + ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:02:00.001', date '2014-04-04'), + ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04') + as t1(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i); + +create temporary view t2 as select * from values + ("val2a", 6S, 12, 14L, float(15), 20D, 20E2, timestamp '2014-04-04 01:01:00.000', date '2014-04-04'), + ("val1b", 10S, 12, 19L, float(17), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'), + ("val1b", 8S, 16, 119L, float(17), 25D, 26E2, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'), + ("val1c", 12S, 16, 219L, float(17), 25D, 26E2, timestamp '2016-05-04 01:01:00.000', date '2016-05-04'), + ("val1b", null, 16, 319L, float(17), 25D, 26E2, timestamp '2017-05-04 01:01:00.000', null), +
spark git commit: [SPARK-17076][SQL] Cardinality estimation for join based on basic column statistics
Repository: spark Updated Branches: refs/heads/master 8b75f8c1c -> 601b9c3e6 [SPARK-17076][SQL] Cardinality estimation for join based on basic column statistics ## What changes were proposed in this pull request? Support cardinality estimation and stats propagation for all join types. Limitations: - For inner/outer joins without any equal condition, we estimate it like cartesian product. - For left semi/anti joins, since we can't apply the heuristics for inner join to it, for now we just propagate the statistics from left side. We should support them when other advanced stats (e.g. histograms) are available in spark. ## How was this patch tested? Add a new test suite. Author: Zhenhua Wang Author: wangzhenhua Closes #16228 from wzhfy/joinEstimate. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/601b9c3e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/601b9c3e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/601b9c3e Branch: refs/heads/master Commit: 601b9c3e6821b533a76d538f7f26bb622fd026fc Parents: 8b75f8c Author: Zhenhua Wang Authored: Wed Feb 15 08:21:51 2017 -0800 Committer: Wenchen Fan Committed: Wed Feb 15 08:21:51 2017 -0800 -- .../plans/logical/basicLogicalOperators.scala | 26 +- .../statsEstimation/EstimationUtils.scala | 20 +- .../statsEstimation/JoinEstimation.scala| 307 + .../plans/logical/statsEstimation/Range.scala | 116 +++ .../statsEstimation/JoinEstimationSuite.scala | 327 +++ .../ProjectEstimationSuite.scala| 21 +- .../StatsEstimationTestBase.scala | 8 +- 7 files changed, 801 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/601b9c3e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 4d696c0..af57632 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTypes} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.{AggregateEstimation, EstimationUtils, ProjectEstimation} +import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.{AggregateEstimation, EstimationUtils, JoinEstimation, ProjectEstimation} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -340,14 +340,22 @@ case class Join( case _ => resolvedExceptNatural } - override def computeStats(conf: CatalystConf): Statistics = joinType match { -case LeftAnti | LeftSemi => - // LeftSemi and LeftAnti won't ever be bigger than left - left.stats(conf).copy() -case _ => - // make sure we don't propagate isBroadcastable in other joins, because - // they could explode the size. - super.computeStats(conf).copy(isBroadcastable = false) + override def computeStats(conf: CatalystConf): Statistics = { +def simpleEstimation: Statistics = joinType match { + case LeftAnti | LeftSemi => +// LeftSemi and LeftAnti won't ever be bigger than left +left.stats(conf) + case _ => +// Make sure we don't propagate isBroadcastable in other joins, because +// they could explode the size. 
+super.computeStats(conf).copy(isBroadcastable = false) +} + +if (conf.cboEnabled) { + JoinEstimation.estimate(conf, this).getOrElse(simpleEstimation) +} else { + simpleEstimation +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/601b9c3e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index e8b7942..4d18b28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/pl
spark git commit: [SPARK-19587][SQL] bucket sorting columns should not be picked from partition columns
Repository: spark Updated Branches: refs/heads/master 733c59ec1 -> 8b75f8c1c [SPARK-19587][SQL] bucket sorting columns should not be picked from partition columns ## What changes were proposed in this pull request? We will throw an exception if bucket columns are part of partition columns, this should also apply to sort columns. This PR also move the checking logic from `DataFrameWriter` to `PreprocessTableCreation`, which is the central place for checking and normailization. ## How was this patch tested? updated test. Author: Wenchen Fan Closes #16931 from cloud-fan/bucket. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b75f8c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b75f8c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b75f8c1 Branch: refs/heads/master Commit: 8b75f8c1c9acae9c5c0dee92ad4f50195bf185d4 Parents: 733c59e Author: Wenchen Fan Authored: Wed Feb 15 08:15:03 2017 -0800 Committer: Wenchen Fan Committed: Wed Feb 15 08:15:03 2017 -0800 -- .../org/apache/spark/sql/DataFrameWriter.scala | 40 +--- .../spark/sql/execution/datasources/rules.scala | 18 +++-- .../spark/sql/sources/BucketedWriteSuite.scala | 15 3 files changed, 25 insertions(+), 48 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b75f8c1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 81657d9..748ebba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -215,7 +215,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { df.sparkSession, className = source, partitionColumns = partitioningColumns.getOrElse(Nil), - bucketSpec = getBucketSpec, options = extraOptions.toMap) dataSource.write(mode, df) 
@@ -270,52 +269,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ifNotExists = false)).toRdd } - private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols => -cols.map(normalize(_, "Partition")) - } - - private def normalizedBucketColNames: Option[Seq[String]] = bucketColumnNames.map { cols => -cols.map(normalize(_, "Bucketing")) - } - - private def normalizedSortColNames: Option[Seq[String]] = sortColumnNames.map { cols => -cols.map(normalize(_, "Sorting")) - } - private def getBucketSpec: Option[BucketSpec] = { if (sortColumnNames.isDefined) { require(numBuckets.isDefined, "sortBy must be used together with bucketBy") } -for { - n <- numBuckets -} yield { +numBuckets.map { n => require(n > 0 && n < 10, "Bucket number must be greater than 0 and less than 10.") - - // partitionBy columns cannot be used in bucketBy - if (normalizedParCols.nonEmpty && - normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) { - throw new AnalysisException( -s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " + -s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'") - } - - BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil)) + BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil)) } } - /** - * The given column name may not be equal to any of the existing column names if we were in - * case-insensitive context. Normalize the given column name to the real one so that we don't - * need to care about case sensitivity afterwards. 
- */ - private def normalize(columnName: String, columnType: String): String = { -val validColumnNames = df.logicalPlan.output.map(_.name) -validColumnNames.find(df.sparkSession.sessionState.analyzer.resolver(_, columnName)) - .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " + -s"existing columns (${validColumnNames.mkString(", ")})")) - } - private def assertNotBucketed(operation: String): Unit = { if (numBuckets.isDefined || sortColumnNames.isDefined) { throw new AnalysisException(s"'$operation' does not support bucketing right now") http://git-wip-us.apache.org/repos/asf/spark/blob/8b75f8c1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.s
spark git commit: [SPARK-16475][SQL] broadcast hint for SQL queries - follow up
Repository: spark Updated Branches: refs/heads/master b55563c17 -> 733c59ec1 [SPARK-16475][SQL] broadcast hint for SQL queries - follow up ## What changes were proposed in this pull request? A small update to https://github.com/apache/spark/pull/16925 1. Rename SubstituteHints -> ResolveHints to be more consistent with rest of the rules. 2. Added more documentation in the rule and be more defensive / future proof to skip views as well as CTEs. ## How was this patch tested? This pull request contains no real logic change and all behavior should be covered by existing tests. Author: Reynold Xin Closes #16939 from rxin/SPARK-16475. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/733c59ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/733c59ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/733c59ec Branch: refs/heads/master Commit: 733c59ec1ee5746c322e68459cd06241f5fa0903 Parents: b55563c Author: Reynold Xin Authored: Wed Feb 15 17:10:49 2017 +0100 Committer: Herman van Hovell Committed: Wed Feb 15 17:10:49 2017 +0100 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 +- .../sql/catalyst/analysis/ResolveHints.scala| 103 .../sql/catalyst/analysis/SubstituteHints.scala | 104 .../catalyst/analysis/ResolveHintsSuite.scala | 120 ++ .../analysis/SubstituteHintsSuite.scala | 121 --- 5 files changed, 225 insertions(+), 227 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 8348cb5..6aa0e8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -115,8 +115,8 @@ class 
Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, - new SubstituteHints.SubstituteBroadcastHints(conf), - SubstituteHints.RemoveAllHints), + new ResolveHints.ResolveBroadcastHints(conf), + ResolveHints.RemoveAllHints), Batch("Substitution", fixedPoint, CTESubstitution, WindowsSubstitution, http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala new file mode 100644 index 000..2124177 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.CatalystConf +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.CurrentOrigin + + +/** + * Collection of rules related to hints. The only hint currently available is broadcast join hint. 
+ * + * Note that this is separated into two rules because in the future we might introduce new hint + * rules that have different ordering requirements from broadcast. + */ +object ResolveHints { + + /** + * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of + * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted + * on top of any relation (that is not aliased differently), subquery, or common table expression + * that match the specified name. + *
spark git commit: [SPARK-19607] Finding QueryExecution that matches provided executionId
Repository: spark Updated Branches: refs/heads/master 3973403d5 -> b55563c17 [SPARK-19607] Finding QueryExecution that matches provided executionId ## What changes were proposed in this pull request? Implementing a mapping between executionId and corresponding QueryExecution in SQLExecution. ## How was this patch tested? Adds a unit test. Author: Ala Luszczak Closes #16940 from ala/execution-id. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b55563c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b55563c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b55563c1 Branch: refs/heads/master Commit: b55563c17ec67f56017fa6bda5a18310c38dbefb Parents: 3973403 Author: Ala Luszczak Authored: Wed Feb 15 17:06:04 2017 +0100 Committer: Reynold Xin Committed: Wed Feb 15 17:06:04 2017 +0100 -- .../spark/sql/execution/SQLExecution.scala | 9 ++ .../spark/sql/execution/SQLExecutionSuite.scala | 32 2 files changed, 41 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b55563c1/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index ec07aab..be35916 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import org.apache.spark.SparkContext @@ -32,6 +33,12 @@ object SQLExecution { private def nextExecutionId: Long = _nextExecutionId.getAndIncrement + private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() + + def getQueryExecution(executionId: Long): QueryExecution = { 
+executionIdToQueryExecution.get(executionId) + } + /** * Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that * we can connect them with an execution. @@ -44,6 +51,7 @@ object SQLExecution { if (oldExecutionId == null) { val executionId = SQLExecution.nextExecutionId sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) + executionIdToQueryExecution.put(executionId, queryExecution) val r = try { // sparkContext.getCallSite() would first try to pick up any call site that was previously // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on @@ -60,6 +68,7 @@ object SQLExecution { executionId, System.currentTimeMillis())) } } finally { +executionIdToQueryExecution.remove(executionId) sc.setLocalProperty(EXECUTION_ID_KEY, null) } r http://git-wip-us.apache.org/repos/asf/spark/blob/b55563c1/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala index ad4..b059706 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import java.util.Properties import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.SparkSession class SQLExecutionSuite extends SparkFunSuite { @@ -102,6 +103,33 @@ class SQLExecutionSuite extends SparkFunSuite { } } + + test("Finding QueryExecution for given executionId") { +val spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate() +import spark.implicits._ + +var queryExecution: QueryExecution = null + +spark.sparkContext.addSparkListener(new SparkListener { + override def 
onJobStart(jobStart: SparkListenerJobStart): Unit = { +val executionIdStr = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdStr != null) { + queryExecution = SQLExecution.getQueryExecution(executionIdStr.toLong) +} +SQLExecutionSuite.canProgress = true + } +}) + +val df = spark.range(1).map { x => + while (!SQLExecutionSuite.canProgress) { +Thread.sleep(1) + } + x +} +df.collect() + +assert
spark git commit: [SPARK-19456][SPARKR] Add LinearSVC R API
Repository: spark Updated Branches: refs/heads/master 447b2b530 -> 3973403d5 [SPARK-19456][SPARKR] Add LinearSVC R API ## What changes were proposed in this pull request? Linear SVM classifier is newly added into ML and python API has been added. This JIRA is to add R side API. Marked as WIP, as I am designing unit tests. ## How was this patch tested? Please review http://spark.apache.org/contributing.html before opening a pull request. Author: wm...@hotmail.com Closes #16800 from wangmiao1981/svc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3973403d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3973403d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3973403d Branch: refs/heads/master Commit: 3973403d5d90a48e3a995159680239ba5240e30c Parents: 447b2b5 Author: wm...@hotmail.com Authored: Wed Feb 15 01:15:50 2017 -0800 Committer: Felix Cheung Committed: Wed Feb 15 01:15:50 2017 -0800 -- R/pkg/NAMESPACE | 3 +- R/pkg/R/generics.R | 4 + R/pkg/R/mllib_classification.R | 132 R/pkg/R/mllib_utils.R | 9 +- .../tests/testthat/test_mllib_classification.R | 44 ++ .../apache/spark/ml/r/LinearSVCWrapper.scala| 152 +++ .../scala/org/apache/spark/ml/r/RWrappers.scala | 2 + 7 files changed, 342 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 625c797..8b26500 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -65,7 +65,8 @@ exportMethods("glm", "spark.logit", "spark.randomForest", "spark.gbt", - "spark.bisectingKmeans") + "spark.bisectingKmeans", + "spark.svmLinear") # Job group lifecycle management methods export("setJobGroup", http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index d78b1a1..0d9a996 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1401,6 +1401,10 @@ 
setGeneric("spark.randomForest", #' @export setGeneric("spark.survreg", function(data, formula) { standardGeneric("spark.survreg") }) +#' @rdname spark.svmLinear +#' @export +setGeneric("spark.svmLinear", function(data, formula, ...) { standardGeneric("spark.svmLinear") }) + #' @rdname spark.lda #' @export setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark.posterior") }) http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/R/pkg/R/mllib_classification.R -- diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R index 552cbe4..fa0d795 100644 --- a/R/pkg/R/mllib_classification.R +++ b/R/pkg/R/mllib_classification.R @@ -18,6 +18,13 @@ # mllib_regression.R: Provides methods for MLlib classification algorithms # (except for tree-based algorithms) integration +#' S4 class that represents a LinearSVCModel +#' +#' @param jobj a Java object reference to the backing Scala LinearSVCModel +#' @export +#' @note LinearSVCModel since 2.2.0 +setClass("LinearSVCModel", representation(jobj = "jobj")) + #' S4 class that represents an LogisticRegressionModel #' #' @param jobj a Java object reference to the backing Scala LogisticRegressionModel @@ -39,6 +46,131 @@ setClass("MultilayerPerceptronClassificationModel", representation(jobj = "jobj" #' @note NaiveBayesModel since 2.0.0 setClass("NaiveBayesModel", representation(jobj = "jobj")) +#' linear SVM Model +#' +#' Fits a linear SVM model against a SparkDataFrame. It is a binary classifier, similar to svm in glmnet package +#' Users can print, make predictions on the produced model and save the model to the input path. +#' +#' @param data SparkDataFrame for training. +#' @param formula A symbolic description of the model to be fitted. Currently only a few formula +#'operators are supported, including '~', '.', ':', '+', and '-'. +#' @param regParam The regularization parameter. +#' @param maxIter Maximum iteration number. +#' @param tol Convergence tolerance of iterations. 
+#' @param standardization Whether to standardize the training features before fitting the model. The coefficients +#'of models will be always returned on the original scale, so it will be transparent for +#'users. Note that with/without standardization, the models should be always converg