svn commit: r24090 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_22_01-fd46a27-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jan 9 06:15:41 2018 New Revision: 24090 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_22_01-fd46a27 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-21293][SPARKR][DOCS] structured streaming doc update
Repository: spark Updated Branches: refs/heads/branch-2.3 911a4dbe7 -> a23c07ecb [SPARK-21293][SPARKR][DOCS] structured streaming doc update ## What changes were proposed in this pull request? doc update Author: Felix Cheung Closes #20197 from felixcheung/rwadoc. (cherry picked from commit 02214b094390e913f52e71d55c9bb8a81c9e7ef9) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a23c07ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a23c07ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a23c07ec Branch: refs/heads/branch-2.3 Commit: a23c07ecb1dba0dbd52a0d2362d8d21e9cdd8b5a Parents: 911a4db Author: Felix Cheung Authored: Mon Jan 8 22:08:19 2018 -0800 Committer: Felix Cheung Committed: Mon Jan 8 22:08:34 2018 -0800 -- R/pkg/vignettes/sparkr-vignettes.Rmd | 2 +- docs/sparkr.md | 2 +- docs/structured-streaming-programming-guide.md | 32 +++-- 3 files changed, 32 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a23c07ec/R/pkg/vignettes/sparkr-vignettes.Rmd -- diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd index 2e66242..feca617 100644 --- a/R/pkg/vignettes/sparkr-vignettes.Rmd +++ b/R/pkg/vignettes/sparkr-vignettes.Rmd @@ -1042,7 +1042,7 @@ unlink(modelPath) ## Structured Streaming -SparkR supports the Structured Streaming API (experimental). +SparkR supports the Structured Streaming API. You can check the Structured Streaming Programming Guide for [an introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model) to its programming model and basic concepts. http://git-wip-us.apache.org/repos/asf/spark/blob/a23c07ec/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index 997ea60..6685b58 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -596,7 +596,7 @@ The following example shows how to save/load a MLlib model by SparkR. # Structured Streaming -SparkR supports the Structured Streaming API (experimental). Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html) +SparkR supports the Structured Streaming API. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html) # R Function Name Conflicts http://git-wip-us.apache.org/repos/asf/spark/blob/a23c07ec/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 31fcfab..de13e28 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -827,8 +827,8 @@ df.isStreaming() {% endhighlight %} -{% highlight bash %} -Not available. +{% highlight r %} +isStreaming(df) {% endhighlight %} @@ -886,6 +886,19 @@ windowedCounts = words.groupBy( {% endhighlight %} + +{% highlight r %} +words <- ... 
# streaming DataFrame of schema { timestamp: Timestamp, word: String } + +# Group the data by window and word and compute the count of each group +windowedCounts <- count( +groupBy( + words, + window(words$timestamp, "10 minutes", "5 minutes"), + words$word)) +{% endhighlight %} + + @@ -960,6 +973,21 @@ windowedCounts = words \ {% endhighlight %} + +{% highlight r %} +words <- ... # streaming DataFrame of schema { timestamp: Timestamp, word: String } + +# Group the data by window and word and compute the count of each group + +words <- withWatermark(words, "timestamp", "10 minutes") +windowedCounts <- count( +groupBy( + words, + window(words$timestamp, "10 minutes", "5 minutes"), + words$word)) +{% endhighlight %} + + In this example, we are defining the watermark of the query on the value of the column "timestamp", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21293][SPARKR][DOCS] structured streaming doc update
Repository: spark Updated Branches: refs/heads/master 8486ad419 -> 02214b094 [SPARK-21293][SPARKR][DOCS] structured streaming doc update ## What changes were proposed in this pull request? doc update Author: Felix Cheung Closes #20197 from felixcheung/rwadoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02214b09 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02214b09 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02214b09 Branch: refs/heads/master Commit: 02214b094390e913f52e71d55c9bb8a81c9e7ef9 Parents: 8486ad419 Author: Felix Cheung Authored: Mon Jan 8 22:08:19 2018 -0800 Committer: Felix Cheung Committed: Mon Jan 8 22:08:19 2018 -0800 -- R/pkg/vignettes/sparkr-vignettes.Rmd | 2 +- docs/sparkr.md | 2 +- docs/structured-streaming-programming-guide.md | 32 +++-- 3 files changed, 32 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/02214b09/R/pkg/vignettes/sparkr-vignettes.Rmd -- diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd index 2e66242..feca617 100644 --- a/R/pkg/vignettes/sparkr-vignettes.Rmd +++ b/R/pkg/vignettes/sparkr-vignettes.Rmd @@ -1042,7 +1042,7 @@ unlink(modelPath) ## Structured Streaming -SparkR supports the Structured Streaming API (experimental). +SparkR supports the Structured Streaming API. You can check the Structured Streaming Programming Guide for [an introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model) to its programming model and basic concepts. http://git-wip-us.apache.org/repos/asf/spark/blob/02214b09/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index 997ea60..6685b58 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -596,7 +596,7 @@ The following example shows how to save/load a MLlib model by SparkR. # Structured Streaming -SparkR supports the Structured Streaming API (experimental). Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html) +SparkR supports the Structured Streaming API. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html) # R Function Name Conflicts http://git-wip-us.apache.org/repos/asf/spark/blob/02214b09/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 31fcfab..de13e28 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -827,8 +827,8 @@ df.isStreaming() {% endhighlight %} -{% highlight bash %} -Not available. +{% highlight r %} +isStreaming(df) {% endhighlight %} @@ -886,6 +886,19 @@ windowedCounts = words.groupBy( {% endhighlight %} + +{% highlight r %} +words <- ... # streaming DataFrame of schema { timestamp: Timestamp, word: String } + +# Group the data by window and word and compute the count of each group +windowedCounts <- count( +groupBy( + words, + window(words$timestamp, "10 minutes", "5 minutes"), + words$word)) +{% endhighlight %} + + @@ -960,6 +973,21 @@ windowedCounts = words \ {% endhighlight %} + +{% highlight r %} +words <- ... 
# streaming DataFrame of schema { timestamp: Timestamp, word: String } + +# Group the data by window and word and compute the count of each group + +words <- withWatermark(words, "timestamp", "10 minutes") +windowedCounts <- count( +groupBy( + words, + window(words$timestamp, "10 minutes", "5 minutes"), + words$word)) +{% endhighlight %} + + In this example, we are defining the watermark of the query on the value of the column "timestamp", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
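For readers comparing the new R snippets in the two commits above with the guide's other language tabs, here is a minimal Scala sketch of the same watermarked, windowed count. It assumes a local SparkSession and uses the built-in rate source (with its `value` column renamed) as a stand-in for the `words` stream, so it illustrates the query shape rather than reproducing the committed documentation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("windowed-counts").master("local[2]").getOrCreate()
import spark.implicits._

// Stand-in streaming DataFrame: the built-in rate source yields
// { timestamp: Timestamp, value: Long }; rename `value` to `word` so the
// shape matches the { timestamp, word } schema used in the guide.
val words = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .withColumnRenamed("value", "word")

// Same query as the R example: 10-minute sliding windows every 5 minutes,
// dropping data that arrives more than 10 minutes late.
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
  .count()

// The console sink is only here to make the sketch end-to-end runnable.
val query = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()
```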
spark git commit: [SPARK-21292][DOCS] refreshtable example
Repository: spark Updated Branches: refs/heads/branch-2.3 fd46a276c -> 911a4dbe7 [SPARK-21292][DOCS] refreshtable example ## What changes were proposed in this pull request? doc update Author: Felix Cheung Closes #20198 from felixcheung/rrefreshdoc. (cherry picked from commit 8486ad419d8f1779e277ec71c39e1516673a83ab) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/911a4dbe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/911a4dbe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/911a4dbe Branch: refs/heads/branch-2.3 Commit: 911a4dbe7da45dbb62ed08b0c99bb97ea4fc9e3e Parents: fd46a27 Author: Felix Cheung Authored: Mon Jan 8 21:58:26 2018 -0800 Committer: Felix Cheung Committed: Mon Jan 8 22:02:04 2018 -0800 -- docs/sql-programming-guide.md | 16 1 file changed, 12 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/911a4dbe/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 3ccaaf4..72f79d6 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -915,6 +915,14 @@ spark.catalog.refreshTable("my_table") + + +{% highlight r %} +refreshTable("my_table") +{% endhighlight %} + + + {% highlight sql %} @@ -1498,10 +1506,10 @@ that these options will be deprecated in future release as more optimizations ar ## Broadcast Hint for SQL Queries The `BROADCAST` hint guides Spark to broadcast each specified table when joining them with another table or view. -When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, +When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, even if the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`. When both sides of a join are specified, Spark broadcasts the one having the lower statistics. -Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) +Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) support BHJ. When the broadcast nested loop join is selected, we still respect the hint. @@ -1780,7 +1788,7 @@ options. Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type. - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc. - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details. - + - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489). - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. 
Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`. @@ -2167,7 +2175,7 @@ Not all the APIs of the Hive UDF/UDTF/UDAF are supported by Spark SQL. Below are Spark SQL currently does not support the reuse of aggregation. * `getWindowingEvaluator` (`GenericUDAFEvaluator`) is a function to optimize aggregation by evaluating an aggregate over a fixed window. - + ### Incompatible Hive UDF Below are the scenarios in which Hive and Spark generate different results: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21292][DOCS] refreshtable example
Repository: spark Updated Branches: refs/heads/master f20131dd3 -> 8486ad419 [SPARK-21292][DOCS] refreshtable example ## What changes were proposed in this pull request? doc update Author: Felix Cheung Closes #20198 from felixcheung/rrefreshdoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8486ad41 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8486ad41 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8486ad41 Branch: refs/heads/master Commit: 8486ad419d8f1779e277ec71c39e1516673a83ab Parents: f20131d Author: Felix Cheung Authored: Mon Jan 8 21:58:26 2018 -0800 Committer: Felix Cheung Committed: Mon Jan 8 21:58:26 2018 -0800 -- docs/sql-programming-guide.md | 16 1 file changed, 12 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8486ad41/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 3ccaaf4..72f79d6 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -915,6 +915,14 @@ spark.catalog.refreshTable("my_table") + + +{% highlight r %} +refreshTable("my_table") +{% endhighlight %} + + + {% highlight sql %} @@ -1498,10 +1506,10 @@ that these options will be deprecated in future release as more optimizations ar ## Broadcast Hint for SQL Queries The `BROADCAST` hint guides Spark to broadcast each specified table when joining them with another table or view. -When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, +When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, even if the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`. When both sides of a join are specified, Spark broadcasts the one having the lower statistics. -Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) +Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) support BHJ. When the broadcast nested loop join is selected, we still respect the hint. @@ -1780,7 +1788,7 @@ options. Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type. - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc. - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details. - + - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489). - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`. 
@@ -2167,7 +2175,7 @@ Not all the APIs of the Hive UDF/UDTF/UDAF are supported by Spark SQL. Below are Spark SQL currently does not support the reuse of aggregation. * `getWindowingEvaluator` (`GenericUDAFEvaluator`) is a function to optimize aggregation by evaluating an aggregate over a fixed window. - + ### Incompatible Hive UDF Below are the scenarios in which Hive and Spark generate different results: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
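The same sql-programming-guide diff also touches the "Broadcast Hint for SQL Queries" section. Below is a minimal Scala sketch of the two ways to request the broadcast hash join that section describes, one through the DataFrame API and one through the SQL hint syntax; it assumes a local SparkSession and two toy tables, and is only an illustration of the hint, not text from the guide.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-hint").master("local[2]").getOrCreate()
import spark.implicits._

// Two toy tables; in practice `small` would be far below
// spark.sql.autoBroadcastJoinThreshold and `large` well above it.
val large = (1 to 100000).toDF("id")
val small = Seq((1, "a"), (2, "b")).toDF("id", "label")

// DataFrame API: mark the right side for broadcasting.
val joined = large.join(broadcast(small), "id")

// Equivalent SQL form using the hint syntax described in the guide.
large.createOrReplaceTempView("l")
small.createOrReplaceTempView("s")
val joinedSql = spark.sql("SELECT /*+ BROADCAST(s) */ * FROM l JOIN s ON l.id = s.id")

// Either plan should show a BroadcastHashJoin when BHJ is applicable.
joined.explain()
```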
svn commit: r24089 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_20_01-f20131d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jan 9 04:15:10 2018 New Revision: 24089 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_20_01-f20131d docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner
Repository: spark Updated Branches: refs/heads/branch-2.2 7c30ae39f -> 24f1f2a54 [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner ## What changes were proposed in this pull request? This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. This class was introduced in https://github.com/apache/spark/pull/7821 (July 2015 / Spark 1.5.0+) and is used to combine pairs of UnsafeRows in TungstenAggregationIterator, CartesianProductExec, and AppendColumns. ### Bugs fixed by this patch 1. **Incorrect combining of null-tracking bitmaps**: when concatenating two UnsafeRows, the implementation "Concatenate the two bitsets together into a single one, taking padding into account". If one row has no columns then it has a bitset size of 0, but the code was incorrectly assuming that if the left row had a non-zero number of fields then the right row would also have at least one field, so it was copying invalid bytes and and treating them as part of the bitset. I'm not sure whether this bug was also present in the original implementation or whether it was introduced in https://github.com/apache/spark/pull/7892 (which fixed another bug in this code). 2. **Incorrect updating of data offsets for null variable-length fields**: after updating the bitsets and copying fixed-length and variable-length data, we need to perform adjustments to the offsets pointing the start of variable length fields's data. The existing code was _conditionally_ adding a fixed offset to correct for the new length of the combined row, but it is unsafe to do this if the variable-length field has a null value: we always represent nulls by storing `0` in the fixed-length slot, but this code was incorrectly incrementing those values. This bug was present since the original version of `GenerateUnsafeRowJoiner`. ### Why this bug remained latent for so long The PR which introduced `GenerateUnsafeRowJoiner` features several randomized tests, including tests of the cases where one side of the join has no fields and where string-valued fields are null. However, the existing assertions were too weak to uncover this bug: - If a null field has a non-zero value in its fixed-length data slot then this will not cause problems for field accesses because the null-tracking bitmap should still be correct and we will not try to use the incorrect offset for anything. - If the null tracking bitmap is corrupted by joining against a row with no fields then the corruption occurs in field numbers past the actual field numbers contained in the row. Thus valid `isNullAt()` calls will not read the incorrectly-set bits. The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` and `isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit equality, preventing these bugs from failing assertions. It turns out that there was even a [GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala) but it looks like it also didn't catch this problem because it only tested the bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` interface instead of actually comparing the bitsets' bytes. ### Impact of these bugs - This bug will cause `equals()` and `hashCode()` to be incorrect for these rows, which will be problematic in case`GenerateUnsafeRowJoiner`'s results are used as join or grouping keys. 
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in reads from invalid null bitmap positions causing fields to incorrectly become NULL (see the end-to-end example below). - It looks like this generally only happens in `CartesianProductExec`, which our query optimizer often avoids executing (usually we try to plan a `BroadcastNestedLoopJoin` instead). ### End-to-end test case demonstrating the problem The following query demonstrates how this bug may result in incorrect query results: ```sql set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger CartesianProductExec create table a as select * from values 1; create table b as select * from values 2; SELECT t3.col1, t1.col1 FROM a t1 CROSS JOIN b t2 CROSS JOIN b t3 ``` This should return `(2, 1)` but instead was returning `(null, 1)`. Column pruning ends up trimming off all columns from `t2`, so when `t2` joins with another table this triggers the bitmap-copying bug. This incorrect bitmap is subsequently copied again when performing the final join, causing the final output to have an incorrectly-set null bit for the first field. ## How was this patch tested? Strengthened the assertions in existing tests in GenerateUnsafeRowJoinerSuite. Also verified that the end-to-end test case which uncovered this now passes. Author: Josh Rosen Closes #20181 from JoshRosen/SPARK
spark git commit: [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner
Repository: spark Updated Branches: refs/heads/branch-2.3 850b9f391 -> fd46a276c [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner ## What changes were proposed in this pull request? This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. This class was introduced in https://github.com/apache/spark/pull/7821 (July 2015 / Spark 1.5.0+) and is used to combine pairs of UnsafeRows in TungstenAggregationIterator, CartesianProductExec, and AppendColumns. ### Bugs fixed by this patch 1. **Incorrect combining of null-tracking bitmaps**: when concatenating two UnsafeRows, the implementation "Concatenate the two bitsets together into a single one, taking padding into account". If one row has no columns then it has a bitset size of 0, but the code was incorrectly assuming that if the left row had a non-zero number of fields then the right row would also have at least one field, so it was copying invalid bytes and and treating them as part of the bitset. I'm not sure whether this bug was also present in the original implementation or whether it was introduced in https://github.com/apache/spark/pull/7892 (which fixed another bug in this code). 2. **Incorrect updating of data offsets for null variable-length fields**: after updating the bitsets and copying fixed-length and variable-length data, we need to perform adjustments to the offsets pointing the start of variable length fields's data. The existing code was _conditionally_ adding a fixed offset to correct for the new length of the combined row, but it is unsafe to do this if the variable-length field has a null value: we always represent nulls by storing `0` in the fixed-length slot, but this code was incorrectly incrementing those values. This bug was present since the original version of `GenerateUnsafeRowJoiner`. ### Why this bug remained latent for so long The PR which introduced `GenerateUnsafeRowJoiner` features several randomized tests, including tests of the cases where one side of the join has no fields and where string-valued fields are null. However, the existing assertions were too weak to uncover this bug: - If a null field has a non-zero value in its fixed-length data slot then this will not cause problems for field accesses because the null-tracking bitmap should still be correct and we will not try to use the incorrect offset for anything. - If the null tracking bitmap is corrupted by joining against a row with no fields then the corruption occurs in field numbers past the actual field numbers contained in the row. Thus valid `isNullAt()` calls will not read the incorrectly-set bits. The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` and `isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit equality, preventing these bugs from failing assertions. It turns out that there was even a [GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala) but it looks like it also didn't catch this problem because it only tested the bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` interface instead of actually comparing the bitsets' bytes. ### Impact of these bugs - This bug will cause `equals()` and `hashCode()` to be incorrect for these rows, which will be problematic in case`GenerateUnsafeRowJoiner`'s results are used as join or grouping keys. 
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in reads from invalid null bitmap positions causing fields to incorrectly become NULL (see the end-to-end example below). - It looks like this generally only happens in `CartesianProductExec`, which our query optimizer often avoids executing (usually we try to plan a `BroadcastNestedLoopJoin` instead). ### End-to-end test case demonstrating the problem The following query demonstrates how this bug may result in incorrect query results: ```sql set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger CartesianProductExec create table a as select * from values 1; create table b as select * from values 2; SELECT t3.col1, t1.col1 FROM a t1 CROSS JOIN b t2 CROSS JOIN b t3 ``` This should return `(2, 1)` but instead was returning `(null, 1)`. Column pruning ends up trimming off all columns from `t2`, so when `t2` joins with another table this triggers the bitmap-copying bug. This incorrect bitmap is subsequently copied again when performing the final join, causing the final output to have an incorrectly-set null bit for the first field. ## How was this patch tested? Strengthened the assertions in existing tests in GenerateUnsafeRowJoinerSuite. Also verified that the end-to-end test case which uncovered this now passes. Author: Josh Rosen Closes #20181 from JoshRosen/SPARK
spark git commit: [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner
Repository: spark Updated Branches: refs/heads/master 849043ce1 -> f20131dd3 [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner ## What changes were proposed in this pull request? This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. This class was introduced in https://github.com/apache/spark/pull/7821 (July 2015 / Spark 1.5.0+) and is used to combine pairs of UnsafeRows in TungstenAggregationIterator, CartesianProductExec, and AppendColumns. ### Bugs fixed by this patch 1. **Incorrect combining of null-tracking bitmaps**: when concatenating two UnsafeRows, the implementation "Concatenate the two bitsets together into a single one, taking padding into account". If one row has no columns then it has a bitset size of 0, but the code was incorrectly assuming that if the left row had a non-zero number of fields then the right row would also have at least one field, so it was copying invalid bytes and and treating them as part of the bitset. I'm not sure whether this bug was also present in the original implementation or whether it was introduced in https://github.com/apache/spark/pull/7892 (which fixed another bug in this code). 2. **Incorrect updating of data offsets for null variable-length fields**: after updating the bitsets and copying fixed-length and variable-length data, we need to perform adjustments to the offsets pointing the start of variable length fields's data. The existing code was _conditionally_ adding a fixed offset to correct for the new length of the combined row, but it is unsafe to do this if the variable-length field has a null value: we always represent nulls by storing `0` in the fixed-length slot, but this code was incorrectly incrementing those values. This bug was present since the original version of `GenerateUnsafeRowJoiner`. ### Why this bug remained latent for so long The PR which introduced `GenerateUnsafeRowJoiner` features several randomized tests, including tests of the cases where one side of the join has no fields and where string-valued fields are null. However, the existing assertions were too weak to uncover this bug: - If a null field has a non-zero value in its fixed-length data slot then this will not cause problems for field accesses because the null-tracking bitmap should still be correct and we will not try to use the incorrect offset for anything. - If the null tracking bitmap is corrupted by joining against a row with no fields then the corruption occurs in field numbers past the actual field numbers contained in the row. Thus valid `isNullAt()` calls will not read the incorrectly-set bits. The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` and `isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit equality, preventing these bugs from failing assertions. It turns out that there was even a [GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala) but it looks like it also didn't catch this problem because it only tested the bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` interface instead of actually comparing the bitsets' bytes. ### Impact of these bugs - This bug will cause `equals()` and `hashCode()` to be incorrect for these rows, which will be problematic in case`GenerateUnsafeRowJoiner`'s results are used as join or grouping keys. 
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in reads from invalid null bitmap positions causing fields to incorrectly become NULL (see the end-to-end example below). - It looks like this generally only happens in `CartesianProductExec`, which our query optimizer often avoids executing (usually we try to plan a `BroadcastNestedLoopJoin` instead). ### End-to-end test case demonstrating the problem The following query demonstrates how this bug may result in incorrect query results: ```sql set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger CartesianProductExec create table a as select * from values 1; create table b as select * from values 2; SELECT t3.col1, t1.col1 FROM a t1 CROSS JOIN b t2 CROSS JOIN b t3 ``` This should return `(2, 1)` but instead was returning `(null, 1)`. Column pruning ends up trimming off all columns from `t2`, so when `t2` joins with another table this triggers the bitmap-copying bug. This incorrect bitmap is subsequently copied again when performing the final join, causing the final output to have an incorrectly-set null bit for the first field. ## How was this patch tested? Strengthened the assertions in existing tests in GenerateUnsafeRowJoinerSuite. Also verified that the end-to-end test case which uncovered this now passes. Author: Josh Rosen Closes #20181 from JoshRosen/SPARK-229
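The SQL repro in the commit message can also be replayed from Scala. The sketch below drives the same three-way cross join through SparkSession, with one assumption: it registers temporary views instead of creating the tables from the original repro, on the expectation (not verified here) that the plan still goes through CartesianProductExec once auto-broadcast is disabled.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-22984-repro")
  .master("local[2]")
  .getOrCreate()

// Disable broadcast joins so the cross joins plan as CartesianProductExec.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Temp views standing in for `create table a/b as select * from values ...`.
spark.sql("SELECT 1 AS col1").createOrReplaceTempView("a")
spark.sql("SELECT 2 AS col1").createOrReplaceTempView("b")

val result = spark.sql(
  "SELECT t3.col1, t1.col1 FROM a t1 CROSS JOIN b t2 CROSS JOIN b t3")

// Expected output is (2, 1); affected builds returned (null, 1).
result.show()
```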
spark git commit: [SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab
Repository: spark Updated Branches: refs/heads/master 68ce792b5 -> 849043ce1 [SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab ## What changes were proposed in this pull request? In current implementation, the function `isFairScheduler` is always false, since it is comparing String with `SchedulingMode` Author: Wang Gengliang Closes #20186 from gengliangwang/isFairScheduler. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/849043ce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/849043ce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/849043ce Branch: refs/heads/master Commit: 849043ce1d28a976659278d29368da0799329db8 Parents: 68ce792 Author: Wang Gengliang Authored: Tue Jan 9 10:44:21 2018 +0800 Committer: Wenchen Fan Committed: Tue Jan 9 10:44:21 2018 +0800 -- core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala | 8 core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 8 2 files changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/849043ce/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 99eab1b..ff1b75e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -34,10 +34,10 @@ private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore) val killEnabled = parent.killEnabled def isFairScheduler: Boolean = { -store.environmentInfo().sparkProperties.toMap - .get("spark.scheduler.mode") - .map { mode => mode == SchedulingMode.FAIR } - .getOrElse(false) +store + .environmentInfo() + .sparkProperties + .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString)) } def getSparkUser: String = parent.getSparkUser http://git-wip-us.apache.org/repos/asf/spark/blob/849043ce/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index be05a96..10b0320 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -37,10 +37,10 @@ private[ui] class StagesTab(val parent: SparkUI, val store: AppStatusStore) attachPage(new PoolPage(this)) def isFairScheduler: Boolean = { -store.environmentInfo().sparkProperties.toMap - .get("spark.scheduler.mode") - .map { mode => mode == SchedulingMode.FAIR } - .getOrElse(false) +store + .environmentInfo() + .sparkProperties + .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString)) } def handleKillRequest(request: HttpServletRequest): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab
Repository: spark Updated Branches: refs/heads/branch-2.3 8032cf852 -> 850b9f391 [SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab ## What changes were proposed in this pull request? In current implementation, the function `isFairScheduler` is always false, since it is comparing String with `SchedulingMode` Author: Wang Gengliang Closes #20186 from gengliangwang/isFairScheduler. (cherry picked from commit 849043ce1d28a976659278d29368da0799329db8) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/850b9f39 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/850b9f39 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/850b9f39 Branch: refs/heads/branch-2.3 Commit: 850b9f39186665fd727737a98b29abe5236830db Parents: 8032cf8 Author: Wang Gengliang Authored: Tue Jan 9 10:44:21 2018 +0800 Committer: Wenchen Fan Committed: Tue Jan 9 10:44:42 2018 +0800 -- core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala | 8 core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 8 2 files changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/850b9f39/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 99eab1b..ff1b75e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -34,10 +34,10 @@ private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore) val killEnabled = parent.killEnabled def isFairScheduler: Boolean = { -store.environmentInfo().sparkProperties.toMap - .get("spark.scheduler.mode") - .map { mode => mode == SchedulingMode.FAIR } - .getOrElse(false) +store + .environmentInfo() + .sparkProperties + .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString)) } def getSparkUser: String = parent.getSparkUser http://git-wip-us.apache.org/repos/asf/spark/blob/850b9f39/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index be05a96..10b0320 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -37,10 +37,10 @@ private[ui] class StagesTab(val parent: SparkUI, val store: AppStatusStore) attachPage(new PoolPage(this)) def isFairScheduler: Boolean = { -store.environmentInfo().sparkProperties.toMap - .get("spark.scheduler.mode") - .map { mode => mode == SchedulingMode.FAIR } - .getOrElse(false) +store + .environmentInfo() + .sparkProperties + .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString)) } def handleKillRequest(request: HttpServletRequest): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
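The one-line description in the two commits above ("comparing String with `SchedulingMode`") is easiest to see in isolation. The sketch below is a standalone illustration with a stand-in enumeration and property list, not the actual SparkUI code, showing why the old comparison can never be true and what the tuple-based `contains` check does instead.

```scala
// Stand-in for org.apache.spark.scheduler.SchedulingMode (illustrative only).
object SchedulingMode extends Enumeration {
  val FAIR, FIFO = Value
}

// Stand-in for store.environmentInfo().sparkProperties: Seq[(String, String)].
val sparkProperties: Seq[(String, String)] = Seq("spark.scheduler.mode" -> "FAIR")

// Old logic: `mode` is a String while SchedulingMode.FAIR is an enumeration value,
// so the equality check is always false and isFairScheduler never fires.
val oldIsFair = sparkProperties.toMap
  .get("spark.scheduler.mode")
  .map(mode => mode == SchedulingMode.FAIR)
  .getOrElse(false)                               // always false

// Fixed logic: compare String to String by looking for the exact key/value pair.
val newIsFair = sparkProperties
  .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))

println(s"old = $oldIsFair, new = $newIsFair")    // old = false, new = true
```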
spark git commit: [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc
Repository: spark Updated Branches: refs/heads/branch-2.3 eecd83cb2 -> 8032cf852 [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc ## What changes were proposed in this pull request? Fix the warning: Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc. ## How was this patch tested? test("SPARK-22972: hive orc source") assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc") .equals(HiveSerDe.sourceToSerDe("orc"))) Author: xubo245 <601450...@qq.com> Closes #20165 from xubo245/HiveSerDe. (cherry picked from commit 68ce792b5857f0291154f524ac651036db868bb9) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8032cf85 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8032cf85 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8032cf85 Branch: refs/heads/branch-2.3 Commit: 8032cf852fccd0ab8754f633affdc9ba8fc99e58 Parents: eecd83c Author: xubo245 <601450...@qq.com> Authored: Tue Jan 9 10:15:01 2018 +0800 Committer: gatorsmile Committed: Tue Jan 9 10:15:45 2018 +0800 -- .../apache/spark/sql/internal/HiveSerDe.scala | 1 + .../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 29 2 files changed, 30 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8032cf85/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index b9515ec..dac4636 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -73,6 +73,7 @@ object HiveSerDe { val key = source.toLowerCase(Locale.ROOT) match { case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet" case s if s.startsWith("org.apache.spark.sql.orc") => "orc" + case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc" case s if s.equals("orcfile") => "orc" case s if s.equals("parquetfile") => "parquet" case s if s.equals("avrofile") => "avro" http://git-wip-us.apache.org/repos/asf/spark/blob/8032cf85/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index 17b7d8c..d556a03 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.orc import java.io.File import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.datasources.orc.OrcSuite import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.util.Utils class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { @@ -62,6 +64,33 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { """.stripMargin) } + test("SPARK-22972: hive orc source") { +val tableName = "normal_orc_as_source_hive" +withTable(tableName) { + sql( +s""" + |CREATE TABLE $tableName + |USING org.apache.spark.sql.hive.orc + |OPTIONS ( + | PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}' + |) +""".stripMargin) + + val 
tableMetadata = spark.sessionState.catalog.getTableMetadata( +TableIdentifier(tableName)) + assert(tableMetadata.storage.inputFormat == +Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) + assert(tableMetadata.storage.outputFormat == +Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) + assert(tableMetadata.storage.serde == +Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc") +.equals(HiveSerDe.sourceToSerDe("orc"))) + assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.orc") +.equals(HiveSerDe.sourceToSerDe("orc"))) +} + } + test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") { val location = Utils.createTempDir() val uri = location.toURI - To unsubscribe, e-mail: commits-unsubscr.
spark git commit: [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc
Repository: spark Updated Branches: refs/heads/master 4f7e75883 -> 68ce792b5 [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc ## What changes were proposed in this pull request? Fix the warning: Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc. ## How was this patch tested? test("SPARK-22972: hive orc source") assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc") .equals(HiveSerDe.sourceToSerDe("orc"))) Author: xubo245 <601450...@qq.com> Closes #20165 from xubo245/HiveSerDe. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ce792b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ce792b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ce792b Branch: refs/heads/master Commit: 68ce792b5857f0291154f524ac651036db868bb9 Parents: 4f7e758 Author: xubo245 <601450...@qq.com> Authored: Tue Jan 9 10:15:01 2018 +0800 Committer: gatorsmile Committed: Tue Jan 9 10:15:01 2018 +0800 -- .../apache/spark/sql/internal/HiveSerDe.scala | 1 + .../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 29 2 files changed, 30 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/68ce792b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index b9515ec..dac4636 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -73,6 +73,7 @@ object HiveSerDe { val key = source.toLowerCase(Locale.ROOT) match { case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet" case s if s.startsWith("org.apache.spark.sql.orc") => "orc" + case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc" case s if s.equals("orcfile") => "orc" case s if s.equals("parquetfile") => "parquet" case s if s.equals("avrofile") => "avro" http://git-wip-us.apache.org/repos/asf/spark/blob/68ce792b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index 17b7d8c..d556a03 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.orc import java.io.File import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.datasources.orc.OrcSuite import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.util.Utils class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { @@ -62,6 +64,33 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { """.stripMargin) } + test("SPARK-22972: hive orc source") { +val tableName = "normal_orc_as_source_hive" +withTable(tableName) { + sql( +s""" + |CREATE TABLE $tableName + |USING org.apache.spark.sql.hive.orc + |OPTIONS ( + | PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}' + |) +""".stripMargin) + + val tableMetadata = spark.sessionState.catalog.getTableMetadata( +TableIdentifier(tableName)) + 
assert(tableMetadata.storage.inputFormat == +Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) + assert(tableMetadata.storage.outputFormat == +Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) + assert(tableMetadata.storage.serde == +Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc") +.equals(HiveSerDe.sourceToSerDe("orc"))) + assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.orc") +.equals(HiveSerDe.sourceToSerDe("orc"))) +} + } + test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") { val location = Utils.createTempDir() val uri = location.toURI - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
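The new case in the diffs above only matters because "org.apache.spark.sql.hive.orc" does not start with "org.apache.spark.sql.orc", so it previously fell through to the default and no Hive SerDe was found, producing the warning in the title. A small standalone sketch of that key normalization (a simplified stand-in, not the real HiveSerDe object) is shown below.

```scala
import java.util.Locale

// Simplified stand-in for HiveSerDe.sourceToSerDe's key normalization.
def normalize(source: String): String = source.toLowerCase(Locale.ROOT) match {
  case s if s.startsWith("org.apache.spark.sql.parquet")  => "parquet"
  case s if s.startsWith("org.apache.spark.sql.orc")      => "orc"
  case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc"   // the added case
  case s if s == "orcfile"     => "orc"
  case s if s == "parquetfile" => "parquet"
  case s                       => s
}

// Without the added case, the hive.orc provider falls through unchanged.
assert(normalize("org.apache.spark.sql.hive.orc") == "orc")
assert(normalize("org.apache.spark.sql.orc") == "orc")
```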
svn commit: r24086 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_16_01-4f7e758-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jan 9 00:15:01 2018 New Revision: 24086 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_16_01-4f7e758 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r24085 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_14_01-eecd83c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Jan 8 22:17:41 2018 New Revision: 24085 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_14_01-eecd83c docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22912] v2 data source support in MicroBatchExecution
Repository: spark Updated Branches: refs/heads/master eed82a0b2 -> 4f7e75883 [SPARK-22912] v2 data source support in MicroBatchExecution ## What changes were proposed in this pull request? Support for v2 data sources in microbatch streaming. ## How was this patch tested? A very basic new unit test on the toy v2 implementation of rate source. Once we have a v1 source fully migrated to v2, we'll need to do more detailed compatibility testing. Author: Jose Torres Closes #20097 from jose-torres/v2-impl. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7e7588 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7e7588 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7e7588 Branch: refs/heads/master Commit: 4f7e75883436069c2d9028c4cd5daa78e8d59560 Parents: eed82a0 Author: Jose Torres Authored: Mon Jan 8 13:24:08 2018 -0800 Committer: Tathagata Das Committed: Mon Jan 8 13:24:08 2018 -0800 -- apache.spark.sql.sources.DataSourceRegister | 1 + .../datasources/v2/DataSourceV2Relation.scala | 10 ++ .../streaming/MicroBatchExecution.scala | 112 +++ .../execution/streaming/ProgressReporter.scala | 6 +- .../streaming/RateSourceProvider.scala | 10 +- .../execution/streaming/StreamExecution.scala | 4 +- .../execution/streaming/StreamingRelation.scala | 4 +- .../continuous/ContinuousExecution.scala| 4 +- .../continuous/ContinuousRateStreamSource.scala | 17 +-- .../streaming/sources/RateStreamSourceV2.scala | 31 - .../spark/sql/streaming/DataStreamReader.scala | 25 - .../sql/streaming/StreamingQueryManager.scala | 24 ++-- .../execution/streaming/RateSourceV2Suite.scala | 68 +-- .../apache/spark/sql/streaming/StreamTest.scala | 2 +- .../streaming/continuous/ContinuousSuite.scala | 2 +- 15 files changed, 241 insertions(+), 79 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister -- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 6cdfe2f..0259c77 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -7,3 +7,4 @@ org.apache.spark.sql.execution.datasources.text.TextFileFormat org.apache.spark.sql.execution.streaming.ConsoleSinkProvider org.apache.spark.sql.execution.streaming.TextSocketSourceProvider org.apache.spark.sql.execution.streaming.RateSourceProvider +org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2 http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 7eb99a6..cba20dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -35,6 +35,16 @@ case class DataSourceV2Relation( } } +/** + * A specialization of DataSourceV2Relation with the streaming bit set to true. 
Otherwise identical + * to the non-streaming relation. + */ +class StreamingDataSourceV2Relation( +fullOutput: Seq[AttributeReference], +reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, reader) { + override def isStreaming: Boolean = true +} + object DataSourceV2Relation { def apply(reader: DataSourceV2Reader): DataSourceV2Relation = { new DataSourceV2Relation(reader.readSchema().toAttributes, reader) http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 9a7a13f..42240ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Mi
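The RateSourceProviderV2 registration shown in the diff above is exercised through the normal DataStreamReader path. The sketch below simply reads the built-in rate source and checks the streaming flag; it assumes a local SparkSession and is a usage illustration, not the new unit test added by this patch.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("rate-v2").master("local[2]").getOrCreate()

// The built-in rate source emits { timestamp: Timestamp, value: Long } rows.
val rateDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

// Plans produced from readStream report isStreaming = true; after this patch the
// underlying relation may be a streaming v2 relation when the source supports it.
assert(rateDf.isStreaming)

val query = rateDf.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination(5000)   // run for a few seconds, then return
query.stop()
```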
spark git commit: [SPARK-22992][K8S] Remove assumption of the DNS domain
Repository: spark Updated Branches: refs/heads/branch-2.3 06fd842e3 -> eecd83cb2 [SPARK-22992][K8S] Remove assumption of the DNS domain ## What changes were proposed in this pull request? Remove the use of FQDN to access the driver because it assumes that it's set up in a DNS zone - `cluster.local` which is common but not ubiquitous Note that we already access the in-cluster API server through `kubernetes.default.svc`, so, by extension, this should work as well. The alternative is to introduce DNS zones for both of those addresses. ## How was this patch tested? Unit tests cc vanzin liyinan926 mridulm mccheah Author: foxish Closes #20187 from foxish/cluster.local. (cherry picked from commit eed82a0b211352215316ec70dc48aefc013ad0b2) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eecd83cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eecd83cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eecd83cb Branch: refs/heads/branch-2.3 Commit: eecd83cb2d24907aba303095b052997471247500 Parents: 06fd842 Author: foxish Authored: Mon Jan 8 13:01:45 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Jan 8 13:01:56 2018 -0800 -- .../deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala | 2 +- .../k8s/submit/steps/DriverServiceBootstrapStepSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eecd83cb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala index eb594e4..34af7cd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala @@ -83,7 +83,7 @@ private[spark] class DriverServiceBootstrapStep( .build() val namespace = sparkConf.get(KUBERNETES_NAMESPACE) -val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" +val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc" val resolvedSparkConf = driverSpec.driverSparkConf.clone() .set(DRIVER_HOST_KEY, driverHostname) .set("spark.driver.port", driverPort.toString) http://git-wip-us.apache.org/repos/asf/spark/blob/eecd83cb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala -- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala index 006ce26..78c8c3b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala @@ -85,7 +85,7 @@ class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) val 
expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX -val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local" +val expectedHostName = s"$expectedServiceName.my-namespace.svc" verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName) } @@ -120,7 +120,7 @@ class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service] val expectedServiceName = s"spark-1${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}" assert(driverService.getMetadata.getName === expectedServiceName) -val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local" +val expectedHostName = s"$expectedServiceName.my-namespace.svc" verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName) } - To unsubscribe, e-mail: commits-unsubscr...@spark
spark git commit: [SPARK-22992][K8S] Remove assumption of the DNS domain
Repository: spark Updated Branches: refs/heads/master 40b983c3b -> eed82a0b2 [SPARK-22992][K8S] Remove assumption of the DNS domain ## What changes were proposed in this pull request? Remove the use of FQDN to access the driver because it assumes that it's set up in a DNS zone - `cluster.local` which is common but not ubiquitous Note that we already access the in-cluster API server through `kubernetes.default.svc`, so, by extension, this should work as well. The alternative is to introduce DNS zones for both of those addresses. ## How was this patch tested? Unit tests cc vanzin liyinan926 mridulm mccheah Author: foxish Closes #20187 from foxish/cluster.local. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eed82a0b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eed82a0b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eed82a0b Branch: refs/heads/master Commit: eed82a0b211352215316ec70dc48aefc013ad0b2 Parents: 40b983c Author: foxish Authored: Mon Jan 8 13:01:45 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Jan 8 13:01:45 2018 -0800 -- .../deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala | 2 +- .../k8s/submit/steps/DriverServiceBootstrapStepSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eed82a0b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala index eb594e4..34af7cd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala @@ -83,7 +83,7 @@ private[spark] class DriverServiceBootstrapStep( .build() val namespace = sparkConf.get(KUBERNETES_NAMESPACE) -val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" +val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc" val resolvedSparkConf = driverSpec.driverSparkConf.clone() .set(DRIVER_HOST_KEY, driverHostname) .set("spark.driver.port", driverPort.toString) http://git-wip-us.apache.org/repos/asf/spark/blob/eed82a0b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala -- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala index 006ce26..78c8c3b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala @@ -85,7 +85,7 @@ class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX -val 
expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local" +val expectedHostName = s"$expectedServiceName.my-namespace.svc" verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName) } @@ -120,7 +120,7 @@ class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service] val expectedServiceName = s"spark-1${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}" assert(driverService.getMetadata.getName === expectedServiceName) -val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local" +val expectedHostName = s"$expectedServiceName.my-namespace.svc" verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
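The change itself is small: the driver hostname is now built from just the service name and namespace with a bare `.svc` suffix, so resolution relies on the pod's DNS search path rather than a hard-coded `cluster.local` zone. A minimal sketch of the resulting hostname scheme, with made-up service and namespace values for illustration (the real code lives in DriverServiceBootstrapStep and reads the namespace from SparkConf):

```
// Sketch of the hostname scheme after SPARK-22992; the service name and
// namespace below are assumed values, not taken from Spark.
def driverHostname(serviceName: String, namespace: String): String =
  s"$serviceName.$namespace.svc" // no hard-coded ".cluster.local" zone

val host = driverHostname("spark-pi-driver-svc", "default")
// host == "spark-pi-driver-svc.default.svc", resolvable via the pod's DNS
// search path in the same way kubernetes.default.svc is for the API server.
```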
spark-website git commit: update latest
Repository: spark-website Updated Branches: refs/heads/asf-site 8c5354f29 -> 1a8adcaa8 update latest Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/1a8adcaa Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/1a8adcaa Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/1a8adcaa Branch: refs/heads/asf-site Commit: 1a8adcaa872743663f6d429e3a1962f9ce888755 Parents: 8c5354f Author: Felix Cheung Authored: Mon Jan 8 10:28:12 2018 -0800 Committer: Felix Cheung Committed: Mon Jan 8 10:28:12 2018 -0800 -- site/docs/latest | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/1a8adcaa/site/docs/latest -- diff --git a/site/docs/latest b/site/docs/latest index e3a4f19..fae692e 12 --- a/site/docs/latest +++ b/site/docs/latest @@ -1 +1 @@ -2.2.0 \ No newline at end of file +2.2.1 \ No newline at end of file - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r24082 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_10_01-06fd842-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Jan 8 18:18:59 2018 New Revision: 24082 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_10_01-06fd842 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to the summary.]
svn commit: r24079 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_08_01-40b983c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Jan 8 16:20:17 2018 New Revision: 24079 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_08_01-40b983c docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to the summary.]
spark git commit: [SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber
Repository: spark Updated Branches: refs/heads/branch-2.3 4a45f0a53 -> 06fd842e3 [SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber ## What changes were proposed in this pull request? 1. Deprecate attemptId in StageInfo and add `def attemptNumber() = attemptId` 2. Replace usage of stageAttemptId with stageAttemptNumber ## How was this patch tested? I manually checked the compiler warning info Author: Xianjin YE Closes #20178 from advancedxy/SPARK-22952. (cherry picked from commit 40b983c3b44b6771f07302ce87987fa4716b5ebf) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06fd842e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06fd842e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06fd842e Branch: refs/heads/branch-2.3 Commit: 06fd842e3a120fde1c137e4945bcb747fc71a322 Parents: 4a45f0a Author: Xianjin YE Authored: Mon Jan 8 23:49:07 2018 +0800 Committer: Wenchen Fan Committed: Mon Jan 8 23:49:27 2018 +0800 -- .../apache/spark/scheduler/DAGScheduler.scala | 15 +++--- .../org/apache/spark/scheduler/StageInfo.scala | 4 +- .../spark/scheduler/StatsReportListener.scala | 2 +- .../apache/spark/status/AppStatusListener.scala | 7 +-- .../org/apache/spark/status/LiveEntity.scala| 4 +- .../spark/ui/scope/RDDOperationGraph.scala | 2 +- .../org/apache/spark/util/JsonProtocol.scala| 2 +- .../spark/status/AppStatusListenerSuite.scala | 54 +++- .../sql/execution/ui/SQLAppStatusListener.scala | 2 +- 9 files changed, 51 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/06fd842e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c2498d4..199937b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -815,7 +815,8 @@ class DAGScheduler( private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { // Note that there is a chance that this task is launched after the stage is cancelled. // In that case, we wouldn't have the stage anymore in stageIdToStage. 
-val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1) +val stageAttemptId = + stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1) listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo)) } @@ -1050,7 +1051,7 @@ class DAGScheduler( val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) stage.pendingPartitions += id -new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, +new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } @@ -1060,7 +1061,7 @@ class DAGScheduler( val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) -new ResultTask(stage.id, stage.latestInfo.attemptId, +new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } @@ -1076,7 +1077,7 @@ class DAGScheduler( logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( -tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) +tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run @@ -1245,7 +1246,7 @@ class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) -if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) { +if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) { // This task was for the currently running attempt of the stage.
spark git commit: [SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber
Repository: spark Updated Branches: refs/heads/master eb45b52e8 -> 40b983c3b [SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber ## What changes were proposed in this pull request? 1. Deprecate attemptId in StageInfo and add `def attemptNumber() = attemptId` 2. Replace usage of stageAttemptId with stageAttemptNumber ## How was this patch tested? I manually checked the compiler warning info Author: Xianjin YE Closes #20178 from advancedxy/SPARK-22952. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40b983c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40b983c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40b983c3 Branch: refs/heads/master Commit: 40b983c3b44b6771f07302ce87987fa4716b5ebf Parents: eb45b52 Author: Xianjin YE Authored: Mon Jan 8 23:49:07 2018 +0800 Committer: Wenchen Fan Committed: Mon Jan 8 23:49:07 2018 +0800 -- .../apache/spark/scheduler/DAGScheduler.scala | 15 +++--- .../org/apache/spark/scheduler/StageInfo.scala | 4 +- .../spark/scheduler/StatsReportListener.scala | 2 +- .../apache/spark/status/AppStatusListener.scala | 7 +-- .../org/apache/spark/status/LiveEntity.scala| 4 +- .../spark/ui/scope/RDDOperationGraph.scala | 2 +- .../org/apache/spark/util/JsonProtocol.scala| 2 +- .../spark/status/AppStatusListenerSuite.scala | 54 +++- .../sql/execution/ui/SQLAppStatusListener.scala | 2 +- 9 files changed, 51 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c2498d4..199937b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -815,7 +815,8 @@ class DAGScheduler( private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { // Note that there is a chance that this task is launched after the stage is cancelled. // In that case, we wouldn't have the stage anymore in stageIdToStage. 
-val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1) +val stageAttemptId = + stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1) listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo)) } @@ -1050,7 +1051,7 @@ class DAGScheduler( val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) stage.pendingPartitions += id -new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, +new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } @@ -1060,7 +1061,7 @@ class DAGScheduler( val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) -new ResultTask(stage.id, stage.latestInfo.attemptId, +new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } @@ -1076,7 +1077,7 @@ class DAGScheduler( logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( -tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) +tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run @@ -1245,7 +1246,7 @@ class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) -if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) { +if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) { // This task was for the currently running attempt of the stage. Since the task // completed successfully from the perspective of the TaskSetManager, mark
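The deprecation follows the usual Scala pattern: keep the old `attemptId` field so existing callers still compile, mark it deprecated, and add an `attemptNumber` accessor that forwards to it; call sites such as the DAGScheduler snippets above then switch to the new name. A rough, abbreviated sketch under that assumption (the real StageInfo carries many more fields):

```
// Abbreviated sketch of the SPARK-22952 pattern, not the full StageInfo class.
class StageInfo(
    val stageId: Int,
    @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int) {

  // Preferred accessor; simply forwards to the legacy field.
  def attemptNumber(): Int = attemptId
}

val info = new StageInfo(stageId = 3, attemptId = 1)
assert(info.attemptNumber() == 1) // callers migrate to the new name
```

Using the old field keeps producing deprecation warnings, which is how the migration of remaining call sites can be tracked, consistent with the "checked the compiler warning info" note above.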
svn commit: r24074 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_06_01-4a45f0a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Jan 8 14:14:57 2018 New Revision: 24074 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_06_01-4a45f0a docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to the summary.]
svn commit: r24071 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_05_06-eb45b52-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Jan 8 13:19:57 2018 New Revision: 24071 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_05_06-eb45b52 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to the summary.]
spark git commit: [SPARK-21865][SQL] simplify the distribution semantic of Spark SQL
Repository: spark Updated Branches: refs/heads/branch-2.3 6964dfe47 -> 4a45f0a53 [SPARK-21865][SQL] simplify the distribution semantic of Spark SQL ## What changes were proposed in this pull request? **The current shuffle planning logic** 1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface. 2. Each operator specifies its output partitioning, via the `Partitioning` interface. 3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`. 4. For each operator, check each child of it, add a shuffle node above the child if the child partitioning can not satisfy the required distribution. 5. For each operator, check if its children's output partitionings are compatible with each other, via the `Partitioning.compatibleWith`. 6. If the check in 5 failed, add a shuffle above each child. 7. try to eliminate the shuffles added in 6, via `Partitioning.guarantees`. This design has a major problem with the definition of "compatible". `Partitioning.compatibleWith` is not well defined, ideally a `Partitioning` can't know if it's compatible with other `Partitioning`, without more information from the operator. For example, `t1 join t2 on t1.a = t2.b`, `HashPartitioning(a, 10)` should be compatible with `HashPartitioning(b, 10)` under this case, but the partitioning itself doesn't know it. As a result, currently `Partitioning.compatibleWith` always return false except for literals, which make it almost useless. This also means, if an operator has distribution requirements for multiple children, Spark always add shuffle nodes to all the children(although some of them can be eliminated). However, there is no guarantee that the children's output partitionings are compatible with each other after adding these shuffles, we just assume that the operator will only specify `ClusteredDistribution` for multiple children. I think it's very hard to guarantee children co-partition for all kinds of operators, and we can not even give a clear definition about co-partition between distributions like `ClusteredDistribution(a,b)` and `ClusteredDistribution(c)`. I think we should drop the "compatible" concept in the distribution model, and let the operator achieve the co-partition requirement by special distribution requirements. **Proposed shuffle planning logic after this PR** (The first 4 are same as before) 1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface. 2. Each operator specifies its output partitioning, via the `Partitioning` interface. 3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`. 4. For each operator, check each child of it, add a shuffle node above the child if the child partitioning can not satisfy the required distribution. 5. For each operator, check if its children's output partitionings have the same number of partitions. 6. If the check in 5 failed, pick the max number of partitions from children's output partitionings, and add shuffle to child whose number of partitions doesn't equal to the max one. The new distribution model is very simple, we only have one kind of relationship, which is `Partitioning.satisfy`. For multiple children, Spark only guarantees they have the same number of partitions, and it's the operator's responsibility to leverage this guarantee to achieve more complicated requirements. For example, non-broadcast joins can use the newly added `HashPartitionedDistribution` to achieve co-partition. 
## How was this patch tested? existing tests. Author: Wenchen Fan Closes #19080 from cloud-fan/exchange. (cherry picked from commit eb45b52e826ea9cea48629760db35ef87f91fea0) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a45f0a5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a45f0a5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a45f0a5 Branch: refs/heads/branch-2.3 Commit: 4a45f0a532216736f8874417c5cbd7912ca13db5 Parents: 6964dfe Author: Wenchen Fan Authored: Mon Jan 8 19:41:41 2018 +0800 Committer: Wenchen Fan Committed: Mon Jan 8 19:42:28 2018 +0800 -- .../catalyst/plans/physical/partitioning.scala | 286 +++ .../spark/sql/catalyst/PartitioningSuite.scala | 55 .../apache/spark/sql/execution/SparkPlan.scala | 16 +- .../execution/exchange/EnsureRequirements.scala | 120 +++- .../execution/joins/ShuffledHashJoinExec.scala | 2 +- .../sql/execution/joins/SortMergeJoinExec.scala | 2 +- .../apache/spark/sql/execution/objects.scala| 2 +- .../spark/sql/execution/PlannerSuite.scala | 81 ++ 8 files changed, 194 insertions(+), 370 deletions(-) -- http://git-wip-us.apache
spark git commit: [SPARK-21865][SQL] simplify the distribution semantic of Spark SQL
Repository: spark Updated Branches: refs/heads/master 2c73d2a94 -> eb45b52e8 [SPARK-21865][SQL] simplify the distribution semantic of Spark SQL ## What changes were proposed in this pull request? **The current shuffle planning logic** 1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface. 2. Each operator specifies its output partitioning, via the `Partitioning` interface. 3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`. 4. For each operator, check each child of it, add a shuffle node above the child if the child partitioning can not satisfy the required distribution. 5. For each operator, check if its children's output partitionings are compatible with each other, via the `Partitioning.compatibleWith`. 6. If the check in 5 failed, add a shuffle above each child. 7. try to eliminate the shuffles added in 6, via `Partitioning.guarantees`. This design has a major problem with the definition of "compatible". `Partitioning.compatibleWith` is not well defined, ideally a `Partitioning` can't know if it's compatible with other `Partitioning`, without more information from the operator. For example, `t1 join t2 on t1.a = t2.b`, `HashPartitioning(a, 10)` should be compatible with `HashPartitioning(b, 10)` under this case, but the partitioning itself doesn't know it. As a result, currently `Partitioning.compatibleWith` always return false except for literals, which make it almost useless. This also means, if an operator has distribution requirements for multiple children, Spark always add shuffle nodes to all the children(although some of them can be eliminated). However, there is no guarantee that the children's output partitionings are compatible with each other after adding these shuffles, we just assume that the operator will only specify `ClusteredDistribution` for multiple children. I think it's very hard to guarantee children co-partition for all kinds of operators, and we can not even give a clear definition about co-partition between distributions like `ClusteredDistribution(a,b)` and `ClusteredDistribution(c)`. I think we should drop the "compatible" concept in the distribution model, and let the operator achieve the co-partition requirement by special distribution requirements. **Proposed shuffle planning logic after this PR** (The first 4 are same as before) 1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface. 2. Each operator specifies its output partitioning, via the `Partitioning` interface. 3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`. 4. For each operator, check each child of it, add a shuffle node above the child if the child partitioning can not satisfy the required distribution. 5. For each operator, check if its children's output partitionings have the same number of partitions. 6. If the check in 5 failed, pick the max number of partitions from children's output partitionings, and add shuffle to child whose number of partitions doesn't equal to the max one. The new distribution model is very simple, we only have one kind of relationship, which is `Partitioning.satisfy`. For multiple children, Spark only guarantees they have the same number of partitions, and it's the operator's responsibility to leverage this guarantee to achieve more complicated requirements. For example, non-broadcast joins can use the newly added `HashPartitionedDistribution` to achieve co-partition. 
## How was this patch tested? existing tests. Author: Wenchen Fan Closes #19080 from cloud-fan/exchange. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb45b52e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb45b52e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb45b52e Branch: refs/heads/master Commit: eb45b52e826ea9cea48629760db35ef87f91fea0 Parents: 2c73d2a Author: Wenchen Fan Authored: Mon Jan 8 19:41:41 2018 +0800 Committer: Wenchen Fan Committed: Mon Jan 8 19:41:41 2018 +0800 -- .../catalyst/plans/physical/partitioning.scala | 286 +++ .../spark/sql/catalyst/PartitioningSuite.scala | 55 .../apache/spark/sql/execution/SparkPlan.scala | 16 +- .../execution/exchange/EnsureRequirements.scala | 120 +++- .../execution/joins/ShuffledHashJoinExec.scala | 2 +- .../sql/execution/joins/SortMergeJoinExec.scala | 2 +- .../apache/spark/sql/execution/objects.scala| 2 +- .../spark/sql/execution/PlannerSuite.scala | 81 ++ 8 files changed, 194 insertions(+), 370 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eb45b52e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physica
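For readers who want the shape of the simplified model: an operator states a `Distribution` requirement per child, each child reports a `Partitioning`, and the planner inserts a shuffle only when `satisfies` fails (plus the separate same-number-of-partitions check across children). The toy sketch below uses plain strings for expressions and simplified bodies; the real traits in `org.apache.spark.sql.catalyst.plans.physical` carry much more detail:

```
// Toy model of the post-SPARK-21865 contract; names follow the description
// above, bodies are simplified stand-ins rather than Spark's implementations.
trait Distribution
case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

trait Partitioning {
  def numPartitions: Int
  // The one relationship left in the model.
  def satisfies(required: Distribution): Boolean
}

case class HashPartitioning(expressions: Seq[String], numPartitions: Int) extends Partitioning {
  def satisfies(required: Distribution): Boolean = required match {
    // Hashing on a subset of the required clustering keys is sufficient.
    case ClusteredDistribution(clustering) => expressions.forall(clustering.contains)
    case _ => false
  }
}

// Planner step 4 in miniature: shuffle a child only if its partitioning
// cannot satisfy the distribution its parent requires.
def needsExchange(child: Partitioning, required: Distribution): Boolean =
  !child.satisfies(required)
```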
svn commit: r24067 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_02_01-6964dfe-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Jan 8 10:15:29 2018 New Revision: 24067 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_02_01-6964dfe docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to the summary.]
svn commit: r24065 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_00_01-8fdeb4b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Jan 8 08:16:44 2018 New Revision: 24065 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_08_00_01-8fdeb4b docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to the summary.]
spark git commit: [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions
Repository: spark Updated Branches: refs/heads/branch-2.2 41f705a57 -> 7c30ae39f [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions ## What changes were proposed in this pull request? The following SQL query should return zero rows, but in Spark it actually returns one row: ``` SELECT 1 from ( SELECT 1 AS z, MIN(a.x) FROM (select 1 as x) a WHERE false ) b where b.z != b.z ``` The problem stems from the `PushDownPredicate` rule: when this rule encounters a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes the original filter and adds a new filter onto Aggregate's child, e.g. `Agg(Filter(...))`. This is sometimes okay, but the case above is a counterexample: because there is no explicit `GROUP BY`, we are implicitly computing a global aggregate over the entire table so the original filter was not acting like a `HAVING` clause filtering the number of groups: if we push this filter then it fails to actually reduce the cardinality of the Aggregate output, leading to the wrong answer. In 2016 I fixed a similar problem involving invalid pushdowns of data-independent filters (filters which reference no columns of the filtered relation). There was additional discussion after my fix was merged which pointed out that my patch was an incomplete fix (see #15289), but it looks I must have either misunderstood the comment or forgot to follow up on the additional points raised there. This patch fixes the problem by choosing to never push down filters in cases where there are no grouping expressions. Since there are no grouping keys, the only columns are aggregate columns and we can't push filters defined over aggregate results, so this change won't cause us to miss out on any legitimate pushdown opportunities. ## How was this patch tested? New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`. Author: Josh Rosen Closes #20180 from JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions. 
(cherry picked from commit 2c73d2a948bdde798aaf0f87c18846281deb05fd) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c30ae39 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c30ae39 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c30ae39 Branch: refs/heads/branch-2.2 Commit: 7c30ae39f57ef0c42173b52aa405027b44e0ad9f Parents: 41f705a Author: Josh Rosen Authored: Mon Jan 8 16:04:03 2018 +0800 Committer: gatorsmile Committed: Mon Jan 8 16:05:04 2018 +0800 -- .../spark/sql/catalyst/optimizer/Optimizer.scala| 3 ++- .../catalyst/optimizer/FilterPushdownSuite.scala| 13 + .../test/resources/sql-tests/inputs/group-by.sql| 9 + .../resources/sql-tests/results/group-by.sql.out| 16 +++- 4 files changed, 39 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7c30ae39/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 82bd759..fe66821 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -754,7 +754,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) case filter @ Filter(condition, aggregate: Aggregate) - if aggregate.aggregateExpressions.forall(_.deterministic) => + if aggregate.aggregateExpressions.forall(_.deterministic) +&& aggregate.groupingExpressions.nonEmpty => // Find all the aliased expressions in the aggregate list that don't include any actual // AggregateExpression, and create a map from the alias to the expression val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect { http://git-wip-us.apache.org/repos/asf/spark/blob/7c30ae39/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index d4d281e..4d41354 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apac
spark git commit: [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions
Repository: spark Updated Branches: refs/heads/branch-2.3 8bf24e9fe -> 6964dfe47 [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions ## What changes were proposed in this pull request? The following SQL query should return zero rows, but in Spark it actually returns one row: ``` SELECT 1 from ( SELECT 1 AS z, MIN(a.x) FROM (select 1 as x) a WHERE false ) b where b.z != b.z ``` The problem stems from the `PushDownPredicate` rule: when this rule encounters a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes the original filter and adds a new filter onto Aggregate's child, e.g. `Agg(Filter(...))`. This is sometimes okay, but the case above is a counterexample: because there is no explicit `GROUP BY`, we are implicitly computing a global aggregate over the entire table so the original filter was not acting like a `HAVING` clause filtering the number of groups: if we push this filter then it fails to actually reduce the cardinality of the Aggregate output, leading to the wrong answer. In 2016 I fixed a similar problem involving invalid pushdowns of data-independent filters (filters which reference no columns of the filtered relation). There was additional discussion after my fix was merged which pointed out that my patch was an incomplete fix (see #15289), but it looks I must have either misunderstood the comment or forgot to follow up on the additional points raised there. This patch fixes the problem by choosing to never push down filters in cases where there are no grouping expressions. Since there are no grouping keys, the only columns are aggregate columns and we can't push filters defined over aggregate results, so this change won't cause us to miss out on any legitimate pushdown opportunities. ## How was this patch tested? New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`. Author: Josh Rosen Closes #20180 from JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions. 
(cherry picked from commit 2c73d2a948bdde798aaf0f87c18846281deb05fd) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6964dfe4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6964dfe4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6964dfe4 Branch: refs/heads/branch-2.3 Commit: 6964dfe47b2090e542b26cd64e27420ec3eb1a3d Parents: 8bf24e9 Author: Josh Rosen Authored: Mon Jan 8 16:04:03 2018 +0800 Committer: gatorsmile Committed: Mon Jan 8 16:04:28 2018 +0800 -- .../spark/sql/catalyst/optimizer/Optimizer.scala| 3 ++- .../catalyst/optimizer/FilterPushdownSuite.scala| 13 + .../test/resources/sql-tests/inputs/group-by.sql| 9 + .../resources/sql-tests/results/group-by.sql.out| 16 +++- 4 files changed, 39 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6964dfe4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0d4b02c..df0af82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -795,7 +795,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) case filter @ Filter(condition, aggregate: Aggregate) - if aggregate.aggregateExpressions.forall(_.deterministic) => + if aggregate.aggregateExpressions.forall(_.deterministic) +&& aggregate.groupingExpressions.nonEmpty => // Find all the aliased expressions in the aggregate list that don't include any actual // AggregateExpression, and create a map from the alias to the expression val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect { http://git-wip-us.apache.org/repos/asf/spark/blob/6964dfe4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 85a5e97..82a1025 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apac
spark git commit: [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions
Repository: spark Updated Branches: refs/heads/master 8fdeb4b99 -> 2c73d2a94 [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions ## What changes were proposed in this pull request? The following SQL query should return zero rows, but in Spark it actually returns one row: ``` SELECT 1 from ( SELECT 1 AS z, MIN(a.x) FROM (select 1 as x) a WHERE false ) b where b.z != b.z ``` The problem stems from the `PushDownPredicate` rule: when this rule encounters a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes the original filter and adds a new filter onto Aggregate's child, e.g. `Agg(Filter(...))`. This is sometimes okay, but the case above is a counterexample: because there is no explicit `GROUP BY`, we are implicitly computing a global aggregate over the entire table so the original filter was not acting like a `HAVING` clause filtering the number of groups: if we push this filter then it fails to actually reduce the cardinality of the Aggregate output, leading to the wrong answer. In 2016 I fixed a similar problem involving invalid pushdowns of data-independent filters (filters which reference no columns of the filtered relation). There was additional discussion after my fix was merged which pointed out that my patch was an incomplete fix (see #15289), but it looks I must have either misunderstood the comment or forgot to follow up on the additional points raised there. This patch fixes the problem by choosing to never push down filters in cases where there are no grouping expressions. Since there are no grouping keys, the only columns are aggregate columns and we can't push filters defined over aggregate results, so this change won't cause us to miss out on any legitimate pushdown opportunities. ## How was this patch tested? New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`. Author: Josh Rosen Closes #20180 from JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c73d2a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c73d2a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c73d2a9 Branch: refs/heads/master Commit: 2c73d2a948bdde798aaf0f87c18846281deb05fd Parents: 8fdeb4b Author: Josh Rosen Authored: Mon Jan 8 16:04:03 2018 +0800 Committer: gatorsmile Committed: Mon Jan 8 16:04:03 2018 +0800 -- .../spark/sql/catalyst/optimizer/Optimizer.scala| 3 ++- .../catalyst/optimizer/FilterPushdownSuite.scala| 13 + .../test/resources/sql-tests/inputs/group-by.sql| 9 + .../resources/sql-tests/results/group-by.sql.out| 16 +++- 4 files changed, 39 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c73d2a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0d4b02c..df0af82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -795,7 +795,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) case filter @ Filter(condition, aggregate: Aggregate) - if aggregate.aggregateExpressions.forall(_.deterministic) => + if aggregate.aggregateExpressions.forall(_.deterministic) +&& aggregate.groupingExpressions.nonEmpty => // Find all the aliased expressions in the aggregate list that don't include any actual // AggregateExpression, and create a map from the alias to the expression val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect { http://git-wip-us.apache.org/repos/asf/spark/blob/2c73d2a9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 85a5e97..82a1025 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -809,6 +809,19 @@ class FilterPushdownSuite
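The guard itself is the essence of the fix: a Filter above an Aggregate may only be pushed below it when the aggregate expressions are deterministic and there is at least one grouping expression, because a global aggregate (no GROUP BY) always emits exactly one row, so the filter above it behaves like HAVING. A self-contained sketch with stand-in types (the real rule in `Optimizer.scala` also rewrites aliases before pushing):

```
// Stand-in types so the SPARK-22983 guard can be shown in isolation; the real
// Aggregate/Expression classes live in org.apache.spark.sql.catalyst.
case class Expr(sql: String, deterministic: Boolean = true)
case class Aggregate(groupingExpressions: Seq[Expr], aggregateExpressions: Seq[Expr])

// Push a Filter beneath an Aggregate only when doing so cannot change the
// single-row output of a global aggregate, i.e. only when GROUP BY is non-empty.
def canPushFilterThrough(agg: Aggregate): Boolean =
  agg.aggregateExpressions.forall(_.deterministic) &&
    agg.groupingExpressions.nonEmpty

// The query from the description is a global aggregate (no grouping keys),
// so the outer "b.z != b.z" filter must stay above it.
val globalAgg = Aggregate(Nil, Seq(Expr("1 AS z"), Expr("min(x)")))
assert(!canPushFilterThrough(globalAgg))
```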