[spark] branch master updated: [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 33f532a [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator 33f532a is described below commit 33f532a9f201fb9c7895d685b3dce82cf042dc61 Author: yi.wu AuthorDate: Thu Mar 26 09:11:13 2020 -0700 [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator ### What changes were proposed in this pull request? Fix incorrect log of `curRequestSize`. ### Why are the changes needed? In batch mode, `curRequestSize` can be the total size of several block groups. And each group should have its own request size instead of using the total size. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? It only affects the log. Closes #28028 from Ngone51/fix_curRequestSize. 
Authored-by: yi.wu Signed-off-by: Dongjoon Hyun --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++ 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f1a7d88..404e055 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequest( blocks: Seq[FetchBlockInfo], - address: BlockManagerId, - curRequestSize: Long): FetchRequest = { -logDebug(s"Creating fetch request of $curRequestSize at $address " + address: BlockManagerId): FetchRequest = { +logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address " + s"with ${blocks.size} blocks") FetchRequest(address, blocks) } @@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequests( curBlocks: Seq[FetchBlockInfo], address: BlockManagerId, - curRequestSize: Long, isLast: Boolean, collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = { val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks) var retBlocks = Seq.empty[FetchBlockInfo] if (mergedBlocks.length <= maxBlocksInFlightPerAddress) { - collectedRemoteRequests += createFetchRequest(mergedBlocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(mergedBlocks, address) } else { mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks => if (blocks.length == maxBlocksInFlightPerAddress || isLast) { - collectedRemoteRequests += createFetchRequest(blocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(blocks, address) } else { // The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back // to `curBlocks`. 
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator( // For batch fetch, the actual block in flight should count for merged block. val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) { -curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = false, +curBlocks = createFetchRequests(curBlocks, address, isLast = false, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } } // Add in the final request if (curBlocks.nonEmpty) { - curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = true, + curBlocks = createFetchRequests(curBlocks, address, isLast = true, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 33f532a [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator 33f532a is described below commit 33f532a9f201fb9c7895d685b3dce82cf042dc61 Author: yi.wu AuthorDate: Thu Mar 26 09:11:13 2020 -0700 [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator ### What changes were proposed in this pull request? Fix incorrect log of `curRequestSize`. ### Why are the changes needed? In batch mode, `curRequestSize` can be the total size of several block groups. And each group should have its own request size instead of using the total size. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? It only affects the log. Closes #28028 from Ngone51/fix_curRequestSize. 
Authored-by: yi.wu Signed-off-by: Dongjoon Hyun --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++ 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f1a7d88..404e055 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequest( blocks: Seq[FetchBlockInfo], - address: BlockManagerId, - curRequestSize: Long): FetchRequest = { -logDebug(s"Creating fetch request of $curRequestSize at $address " + address: BlockManagerId): FetchRequest = { +logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address " + s"with ${blocks.size} blocks") FetchRequest(address, blocks) } @@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequests( curBlocks: Seq[FetchBlockInfo], address: BlockManagerId, - curRequestSize: Long, isLast: Boolean, collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = { val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks) var retBlocks = Seq.empty[FetchBlockInfo] if (mergedBlocks.length <= maxBlocksInFlightPerAddress) { - collectedRemoteRequests += createFetchRequest(mergedBlocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(mergedBlocks, address) } else { mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks => if (blocks.length == maxBlocksInFlightPerAddress || isLast) { - collectedRemoteRequests += createFetchRequest(blocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(blocks, address) } else { // The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back // to `curBlocks`. 
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator( // For batch fetch, the actual block in flight should count for merged block. val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) { -curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = false, +curBlocks = createFetchRequests(curBlocks, address, isLast = false, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } } // Add in the final request if (curBlocks.nonEmpty) { - curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = true, + curBlocks = createFetchRequests(curBlocks, address, isLast = true, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8f93dc2 [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator 8f93dc2 is described below commit 8f93dc2f1dd8bd09d52fd3dc07a4c10e70bd237c Author: yi.wu AuthorDate: Thu Mar 26 09:11:13 2020 -0700 [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator ### What changes were proposed in this pull request? Fix incorrect log of `curRequestSize`. ### Why are the changes needed? In batch mode, `curRequestSize` can be the total size of several block groups. And each group should have its own request size instead of using the total size. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? It only affects the log. Closes #28028 from Ngone51/fix_curRequestSize. 
Authored-by: yi.wu Signed-off-by: Dongjoon Hyun (cherry picked from commit 33f532a9f201fb9c7895d685b3dce82cf042dc61) Signed-off-by: Dongjoon Hyun --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++ 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f1a7d88..404e055 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequest( blocks: Seq[FetchBlockInfo], - address: BlockManagerId, - curRequestSize: Long): FetchRequest = { -logDebug(s"Creating fetch request of $curRequestSize at $address " + address: BlockManagerId): FetchRequest = { +logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address " + s"with ${blocks.size} blocks") FetchRequest(address, blocks) } @@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequests( curBlocks: Seq[FetchBlockInfo], address: BlockManagerId, - curRequestSize: Long, isLast: Boolean, collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = { val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks) var retBlocks = Seq.empty[FetchBlockInfo] if (mergedBlocks.length <= maxBlocksInFlightPerAddress) { - collectedRemoteRequests += createFetchRequest(mergedBlocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(mergedBlocks, address) } else { mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks => if (blocks.length == maxBlocksInFlightPerAddress || isLast) { - collectedRemoteRequests += createFetchRequest(blocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(blocks, address) } else { // The last group 
does not exceed `maxBlocksInFlightPerAddress`. Put it back // to `curBlocks`. @@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator( // For batch fetch, the actual block in flight should count for merged block. val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) { -curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = false, +curBlocks = createFetchRequests(curBlocks, address, isLast = false, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } } // Add in the final request if (curBlocks.nonEmpty) { - curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = true, + curBlocks = createFetchRequests(curBlocks, address, isLast = true, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 78cc2ef [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource 78cc2ef is described below commit 78cc2ef5b663d6d605e3d4febc6fb99e20b7f165 Author: Maxim Gekk AuthorDate: Thu Mar 26 13:14:28 2020 -0700 [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource ### What changes were proposed in this pull request? This PR (SPARK-31238) aims the followings. 1. Modified ORC Vectorized Reader, in particular, OrcColumnVector v1.2 and v2.3. After the changes, it uses `DateTimeUtils. rebaseJulianToGregorianDays()` added by https://github.com/apache/spark/pull/27915 . The method performs rebasing days from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar. It builds a local date in the original calendar, extracts date fields `year`, `month` and `day` from the local date, and builds another local date in the target calend [...] 2. Introduced rebasing dates while saving ORC files, in particular, I modified `OrcShimUtils. getDateWritable` v1.2 and v2.3, and returned `DaysWritable` instead of Hive's `DateWritable`. The `DaysWritable` class was added by the PR https://github.com/apache/spark/pull/27890 (and fixed by https://github.com/apache/spark/pull/27962). I moved `DaysWritable` from `sql/hive` to `sql/core` to re-use it in ORC datasource. ### Why are the changes needed? For the backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous version, and get the same result. ### Does this PR introduce any user-facing change? Yes. 
Before the changes, loading the date `1200-01-01` saved by Spark 2.4.5 returns the following: ```scala scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-08| +--+ ``` After the changes ```scala scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-01| +--+ ``` ### How was this patch tested? - By running `OrcSourceSuite` and `HiveOrcSourceSuite`. - Add new test `SPARK-31238: compatibility with Spark 2.4 in reading dates` to `OrcSuite` which reads an ORC file saved by Spark 2.4.5 via the commands: ```shell $ export TZ="America/Los_Angeles" ``` ```scala scala> sql("select cast('1200-01-01' as date) dt").write.mode("overwrite").orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc") scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-01| +--+ ``` - Add round trip test `SPARK-31238: rebasing dates in write`. The test `SPARK-31238: compatibility with Spark 2.4 in reading dates` confirms rebasing in read. So, we can check rebasing in write. Closes #28016 from MaxGekk/rebase-date-orc. 
Authored-by: Maxim Gekk Signed-off-by: Dongjoon Hyun (cherry picked from commit d72ec8574113f9a7e87f3d7ec56c8447267b0506) Signed-off-by: Dongjoon Hyun --- .../sql/execution/datasources}/DaysWritable.scala | 10 ++-- .../test-data/before_1582_date_v2_4.snappy.orc | Bin 0 -> 201 bytes .../execution/datasources/orc/OrcSourceSuite.scala | 28 - .../sql/execution/datasources/orc/OrcTest.scala| 5 .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc}/DaysWritable.scala | 17 ++--- .../execution/datasources/orc/OrcShimUtils.scala | 4 +-- .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc/OrcShimUtils.scala | 5 ++-- .../org/apache/spark/sql/hive/HiveInspectors.scala | 1 + 10 files changed, 88 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala similarity index 92% copy from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala copy to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala index 1eec8d7..00b710f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.hive +package org.apache.spark.sql.execution.datasources import java.io.{DataInput, DataOutput,
[spark] branch master updated: [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d72ec85 [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource d72ec85 is described below commit d72ec8574113f9a7e87f3d7ec56c8447267b0506 Author: Maxim Gekk AuthorDate: Thu Mar 26 13:14:28 2020 -0700 [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource ### What changes were proposed in this pull request? This PR (SPARK-31238) aims the followings. 1. Modified ORC Vectorized Reader, in particular, OrcColumnVector v1.2 and v2.3. After the changes, it uses `DateTimeUtils. rebaseJulianToGregorianDays()` added by https://github.com/apache/spark/pull/27915 . The method performs rebasing days from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar. It builds a local date in the original calendar, extracts date fields `year`, `month` and `day` from the local date, and builds another local date in the target calend [...] 2. Introduced rebasing dates while saving ORC files, in particular, I modified `OrcShimUtils. getDateWritable` v1.2 and v2.3, and returned `DaysWritable` instead of Hive's `DateWritable`. The `DaysWritable` class was added by the PR https://github.com/apache/spark/pull/27890 (and fixed by https://github.com/apache/spark/pull/27962). I moved `DaysWritable` from `sql/hive` to `sql/core` to re-use it in ORC datasource. ### Why are the changes needed? For the backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous version, and get the same result. ### Does this PR introduce any user-facing change? Yes. 
Before the changes, loading the date `1200-01-01` saved by Spark 2.4.5 returns the following: ```scala scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-08| +--+ ``` After the changes ```scala scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-01| +--+ ``` ### How was this patch tested? - By running `OrcSourceSuite` and `HiveOrcSourceSuite`. - Add new test `SPARK-31238: compatibility with Spark 2.4 in reading dates` to `OrcSuite` which reads an ORC file saved by Spark 2.4.5 via the commands: ```shell $ export TZ="America/Los_Angeles" ``` ```scala scala> sql("select cast('1200-01-01' as date) dt").write.mode("overwrite").orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc") scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-01| +--+ ``` - Add round trip test `SPARK-31238: rebasing dates in write`. The test `SPARK-31238: compatibility with Spark 2.4 in reading dates` confirms rebasing in read. So, we can check rebasing in write. Closes #28016 from MaxGekk/rebase-date-orc. 
Authored-by: Maxim Gekk Signed-off-by: Dongjoon Hyun --- .../sql/execution/datasources}/DaysWritable.scala | 10 ++-- .../test-data/before_1582_date_v2_4.snappy.orc | Bin 0 -> 201 bytes .../execution/datasources/orc/OrcSourceSuite.scala | 28 - .../sql/execution/datasources/orc/OrcTest.scala| 5 .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc}/DaysWritable.scala | 17 ++--- .../execution/datasources/orc/OrcShimUtils.scala | 4 +-- .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc/OrcShimUtils.scala | 5 ++-- .../org/apache/spark/sql/hive/HiveInspectors.scala | 1 + 10 files changed, 88 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala similarity index 92% copy from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala copy to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala index 1eec8d7..00b710f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.hive +package org.apache.spark.sql.execution.datasources import java.io.{DataInput, DataOutput, IOException} import java.sql.Date @@ -35,11 +35,12 @@ import
[spark] branch branch-3.0 updated: [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 78cc2ef [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource 78cc2ef is described below commit 78cc2ef5b663d6d605e3d4febc6fb99e20b7f165 Author: Maxim Gekk AuthorDate: Thu Mar 26 13:14:28 2020 -0700 [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource ### What changes were proposed in this pull request? This PR (SPARK-31238) aims the followings. 1. Modified ORC Vectorized Reader, in particular, OrcColumnVector v1.2 and v2.3. After the changes, it uses `DateTimeUtils. rebaseJulianToGregorianDays()` added by https://github.com/apache/spark/pull/27915 . The method performs rebasing days from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar. It builds a local date in the original calendar, extracts date fields `year`, `month` and `day` from the local date, and builds another local date in the target calend [...] 2. Introduced rebasing dates while saving ORC files, in particular, I modified `OrcShimUtils. getDateWritable` v1.2 and v2.3, and returned `DaysWritable` instead of Hive's `DateWritable`. The `DaysWritable` class was added by the PR https://github.com/apache/spark/pull/27890 (and fixed by https://github.com/apache/spark/pull/27962). I moved `DaysWritable` from `sql/hive` to `sql/core` to re-use it in ORC datasource. ### Why are the changes needed? For the backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous version, and get the same result. ### Does this PR introduce any user-facing change? Yes. 
Before the changes, loading the date `1200-01-01` saved by Spark 2.4.5 returns the following: ```scala scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-08| +--+ ``` After the changes ```scala scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-01| +--+ ``` ### How was this patch tested? - By running `OrcSourceSuite` and `HiveOrcSourceSuite`. - Add new test `SPARK-31238: compatibility with Spark 2.4 in reading dates` to `OrcSuite` which reads an ORC file saved by Spark 2.4.5 via the commands: ```shell $ export TZ="America/Los_Angeles" ``` ```scala scala> sql("select cast('1200-01-01' as date) dt").write.mode("overwrite").orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc") scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-01| +--+ ``` - Add round trip test `SPARK-31238: rebasing dates in write`. The test `SPARK-31238: compatibility with Spark 2.4 in reading dates` confirms rebasing in read. So, we can check rebasing in write. Closes #28016 from MaxGekk/rebase-date-orc. 
Authored-by: Maxim Gekk Signed-off-by: Dongjoon Hyun (cherry picked from commit d72ec8574113f9a7e87f3d7ec56c8447267b0506) Signed-off-by: Dongjoon Hyun --- .../sql/execution/datasources}/DaysWritable.scala | 10 ++-- .../test-data/before_1582_date_v2_4.snappy.orc | Bin 0 -> 201 bytes .../execution/datasources/orc/OrcSourceSuite.scala | 28 - .../sql/execution/datasources/orc/OrcTest.scala| 5 .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc}/DaysWritable.scala | 17 ++--- .../execution/datasources/orc/OrcShimUtils.scala | 4 +-- .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc/OrcShimUtils.scala | 5 ++-- .../org/apache/spark/sql/hive/HiveInspectors.scala | 1 + 10 files changed, 88 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala similarity index 92% copy from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala copy to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala index 1eec8d7..00b710f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.hive +package org.apache.spark.sql.execution.datasources import java.io.{DataInput, DataOutput,
[spark] branch branch-3.0 updated: [SPARK-31262][SQL][TESTS] Fix bug tests imported bracketed comments
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 6f30ff4 [SPARK-31262][SQL][TESTS] Fix bug tests imported bracketed comments 6f30ff4 is described below commit 6f30ff44cf2d3d347a516a0e0370d07e8de9352c Author: beliefer AuthorDate: Fri Mar 27 08:09:17 2020 +0900 [SPARK-31262][SQL][TESTS] Fix bug tests imported bracketed comments ### What changes were proposed in this pull request? This PR related to https://github.com/apache/spark/pull/27481. If test case A uses `--IMPORT` to import test case B contains bracketed comments, the output can't display bracketed comments in golden files well. The content of `nested-comments.sql` show below: ``` -- This test case just used to test imported bracketed comments. -- the first case of bracketed comment --QUERY-DELIMITER-START /* This is the first example of bracketed comment. SELECT 'ommented out content' AS first; */ SELECT 'selected content' AS first; --QUERY-DELIMITER-END ``` The test case `comments.sql` imports `nested-comments.sql` below: `--IMPORT nested-comments.sql` Before this PR, the output will be: ``` -- !query /* This is the first example of bracketed comment. SELECT 'ommented out content' AS first -- !query schema struct<> -- !query output org.apache.spark.sql.catalyst.parser.ParseException mismatched input '/' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', ' ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == /* This is the first example of bracketed comment. 
^^^ SELECT 'ommented out content' AS first -- !query */ SELECT 'selected content' AS first -- !query schema struct<> -- !query output org.apache.spark.sql.catalyst.parser.ParseException extraneous input '*/' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == */ ^^^ SELECT 'selected content' AS first ``` After this PR, the output will be: ``` -- !query /* This is the first example of bracketed comment. SELECT 'ommented out content' AS first; */ SELECT 'selected content' AS first -- !query schema struct -- !query output selected content ``` ### Why are the changes needed? Golden files can't display the bracketed comments in imported test cases. ### Does this PR introduce any user-facing change? 'No'. ### How was this patch tested? New UT. Closes #28018 from beliefer/fix-bug-tests-imported-bracketed-comments. 
Authored-by: beliefer Signed-off-by: Takeshi Yamamuro (cherry picked from commit 9e0fee933e62eb309d4aa32bb1e5126125d0bf9f) Signed-off-by: Takeshi Yamamuro --- .../src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 6c66166..848966a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -256,20 +256,23 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession { def splitWithSemicolon(seq: Seq[String]) = { seq.mkString("\n").split("(?<=[^]);") } -val input = fileToString(new File(testCase.inputFile)) -val (comments, code) = input.split("\n").partition { line => +def splitCommentsAndCodes(input: String) = input.split("\n").partition { line => val newLine = line.trim newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER") } +val input = fileToString(new File(testCase.inputFile)) + +val (comments, code) = splitCommentsAndCodes(input) + // If `--IMPORT` found, load code from another test case file, then insert them // into the head in this test. val importedTestCaseName =
[spark] branch master updated: [SPARK-31262][SQL][TESTS] Fix bug tests imported bracketed comments
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9e0fee9 [SPARK-31262][SQL][TESTS] Fix bug tests imported bracketed comments 9e0fee9 is described below commit 9e0fee933e62eb309d4aa32bb1e5126125d0bf9f Author: beliefer AuthorDate: Fri Mar 27 08:09:17 2020 +0900 [SPARK-31262][SQL][TESTS] Fix bug tests imported bracketed comments ### What changes were proposed in this pull request? This PR related to https://github.com/apache/spark/pull/27481. If test case A uses `--IMPORT` to import test case B contains bracketed comments, the output can't display bracketed comments in golden files well. The content of `nested-comments.sql` show below: ``` -- This test case just used to test imported bracketed comments. -- the first case of bracketed comment --QUERY-DELIMITER-START /* This is the first example of bracketed comment. SELECT 'ommented out content' AS first; */ SELECT 'selected content' AS first; --QUERY-DELIMITER-END ``` The test case `comments.sql` imports `nested-comments.sql` below: `--IMPORT nested-comments.sql` Before this PR, the output will be: ``` -- !query /* This is the first example of bracketed comment. SELECT 'ommented out content' AS first -- !query schema struct<> -- !query output org.apache.spark.sql.catalyst.parser.ParseException mismatched input '/' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', ' ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == /* This is the first example of bracketed comment. 
^^^ SELECT 'ommented out content' AS first -- !query */ SELECT 'selected content' AS first -- !query schema struct<> -- !query output org.apache.spark.sql.catalyst.parser.ParseException extraneous input '*/' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == */ ^^^ SELECT 'selected content' AS first ``` After this PR, the output will be: ``` -- !query /* This is the first example of bracketed comment. SELECT 'ommented out content' AS first; */ SELECT 'selected content' AS first -- !query schema struct -- !query output selected content ``` ### Why are the changes needed? Golden files can't display the bracketed comments in imported test cases. ### Does this PR introduce any user-facing change? 'No'. ### How was this patch tested? New UT. Closes #28018 from beliefer/fix-bug-tests-imported-bracketed-comments. 
Authored-by: beliefer Signed-off-by: Takeshi Yamamuro --- .../src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 6c66166..848966a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -256,20 +256,23 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession { def splitWithSemicolon(seq: Seq[String]) = { seq.mkString("\n").split("(?<=[^]);") } -val input = fileToString(new File(testCase.inputFile)) -val (comments, code) = input.split("\n").partition { line => +def splitCommentsAndCodes(input: String) = input.split("\n").partition { line => val newLine = line.trim newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER") } +val input = fileToString(new File(testCase.inputFile)) + +val (comments, code) = splitCommentsAndCodes(input) + // If `--IMPORT` found, load code from another test case file, then insert them // into the head in this test. val importedTestCaseName = comments.filter(_.startsWith("--IMPORT ")).map(_.substring(9)) val importedCode = importedTestCaseName.flatMap { testCaseName =>
[spark] branch master updated (d81df56 -> ee6f899)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from d81df56 [SPARK-31223][ML] Set seed in np.random to regenerate test data add ee6f899 [SPARK-30934][ML][FOLLOW-UP] Update ml-guide to include MulticlassClassificationEvaluator weight support in highlights No new revisions were added by this update. Summary of changes: docs/ml-guide.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31237][SQL][TESTS] Replace 3-letter time zones by zone offsets
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 6b568d7 [SPARK-31237][SQL][TESTS] Replace 3-letter time zones by zone offsets 6b568d7 is described below commit 6b568d77fc0284b0b8a26b16fd105a7c5f54f874 Author: Maxim Gekk AuthorDate: Thu Mar 26 13:36:00 2020 +0800 [SPARK-31237][SQL][TESTS] Replace 3-letter time zones by zone offsets In the PR, I propose to add a few `ZoneId` constant values to the `DateTimeTestUtils` object, and reuse the constants in tests. Proposed the following constants: - PST = -08:00 - UTC = +00:00 - CEST = +02:00 - CET = +01:00 - JST = +09:00 - MIT = -09:30 - LA = America/Los_Angeles All proposed constant values (except `LA`) are initialized by zone offsets according to their definitions. This will allow to avoid: - Using of 3-letter time zones that have been already deprecated in JDK, see _Three-letter time zone IDs_ in https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html - Incorrect mapping of 3-letter time zones to zone offsets, see SPARK-31237. For example, `PST` is mapped to `America/Los_Angeles` instead of the `-08:00` zone offset. Also this should improve stability and maintainability of test suites. No By running affected test suites. Closes #28001 from MaxGekk/replace-pst. 
Authored-by: Maxim Gekk Signed-off-by: Wenchen Fan (cherry picked from commit cec9604eaec2e5ff17e705ed60565bd7506c6374) Signed-off-by: Wenchen Fan --- .../sql/catalyst/csv/CSVInferSchemaSuite.scala | 28 ++-- .../sql/catalyst/csv/UnivocityParserSuite.scala| 45 +++--- .../spark/sql/catalyst/expressions/CastSuite.scala | 27 ++-- .../catalyst/expressions/CodeGenerationSuite.scala | 5 +- .../expressions/CollectionExpressionsSuite.scala | 3 +- .../catalyst/expressions/CsvExpressionsSuite.scala | 47 +++--- .../expressions/DateExpressionsSuite.scala | 177 ++-- .../expressions/JsonExpressionsSuite.scala | 77 + .../sql/catalyst/util/DateTimeTestUtils.scala | 17 +- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 178 ++--- .../apache/spark/sql/util/ArrowUtilsSuite.scala| 3 +- .../spark/sql/util/TimestampFormatterSuite.scala | 11 +- .../apache/spark/sql/DataFrameFunctionsSuite.scala | 3 +- .../org/apache/spark/sql/DataFramePivotSuite.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 4 +- .../org/apache/spark/sql/DateFunctionsSuite.scala | 21 +-- .../sql/execution/datasources/csv/CSVSuite.scala | 4 +- .../sql/execution/datasources/json/JsonSuite.scala | 10 +- .../parquet/ParquetPartitionDiscoverySuite.scala | 10 +- .../apache/spark/sql/internal/SQLConfSuite.scala | 5 +- .../spark/sql/sources/PartitionedWriteSuite.scala | 6 +- .../sql/streaming/EventTimeWatermarkSuite.scala| 3 +- 22 files changed, 346 insertions(+), 340 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index ee73da3..b014eb9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types._ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { test("String 
fields types are inferred correctly from null types") { -val options = new CSVOptions(Map("timestampFormat" -> "-MM-dd HH:mm:ss"), false, "GMT") +val options = new CSVOptions(Map("timestampFormat" -> "-MM-dd HH:mm:ss"), false, "UTC") val inferSchema = new CSVInferSchema(options) assert(inferSchema.inferField(NullType, "") == NullType) @@ -48,7 +48,7 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("String fields types are inferred correctly from other types") { -val options = new CSVOptions(Map("timestampFormat" -> "-MM-dd HH:mm:ss"), false, "GMT") +val options = new CSVOptions(Map("timestampFormat" -> "-MM-dd HH:mm:ss"), false, "UTC") val inferSchema = new CSVInferSchema(options) assert(inferSchema.inferField(LongType, "1.0") == DoubleType) @@ -69,18 +69,18 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("Timestamp field types are inferred correctly via custom data format") { -var options = new CSVOptions(Map("timestampFormat" -> "-mm"), false, "GMT") +var options = new CSVOptions(Map("timestampFormat" -> "-mm"), false, "UTC") var inferSchema = new CSVInferSchema(options)
[spark] branch branch-3.0 updated: [SPARK-30934][ML][FOLLOW-UP] Update ml-guide to include MulticlassClassificationEvaluator weight support in highlights
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new a36e3c4 [SPARK-30934][ML][FOLLOW-UP] Update ml-guide to include MulticlassClassificationEvaluator weight support in highlights a36e3c4 is described below commit a36e3c4c898b513b90e58859e3ca8e550d5cb0cd Author: Huaxin Gao AuthorDate: Thu Mar 26 14:24:53 2020 +0800 [SPARK-30934][ML][FOLLOW-UP] Update ml-guide to include MulticlassClassificationEvaluator weight support in highlights ### What changes were proposed in this pull request? Update ml-guide to include ```MulticlassClassificationEvaluator``` weight support in highlights ### Why are the changes needed? ```MulticlassClassificationEvaluator``` weight support is very important, so should include it in highlights ### Does this PR introduce any user-facing change? Yes after: ![image](https://user-images.githubusercontent.com/13592258/77614952-6ccd8680-6eeb-11ea-9354-fa20004132df.png) ### How was this patch tested? manually build and check Closes #28031 from huaxingao/highlights-followup. 
Authored-by: Huaxin Gao Signed-off-by: zhengruifeng (cherry picked from commit ee6f8991a792e24a5b1c020d958877187af2f41b) Signed-off-by: zhengruifeng --- docs/ml-guide.md | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 5ce6b4f..ddce98b 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -91,10 +91,10 @@ The list below highlights some of the new features and enhancements added to MLl release of Spark: * Multiple columns support was added to `Binarizer` ([SPARK-23578](https://issues.apache.org/jira/browse/SPARK-23578)), `StringIndexer` ([SPARK-11215](https://issues.apache.org/jira/browse/SPARK-11215)), `StopWordsRemover` ([SPARK-29808](https://issues.apache.org/jira/browse/SPARK-29808)) and PySpark `QuantileDiscretizer` ([SPARK-22796](https://issues.apache.org/jira/browse/SPARK-22796)). -* Support Tree-Based Feature Transformation was added +* Tree-Based Feature Transformation was added ([SPARK-13677](https://issues.apache.org/jira/browse/SPARK-13677)). * Two new evaluators `MultilabelClassificationEvaluator` ([SPARK-16692](https://issues.apache.org/jira/browse/SPARK-16692)) and `RankingEvaluator` ([SPARK-28045](https://issues.apache.org/jira/browse/SPARK-28045)) were added. -* Sample weights support was added in `DecisionTreeClassifier/Regressor` ([SPARK-19591](https://issues.apache.org/jira/browse/SPARK-19591)), `RandomForestClassifier/Regressor` ([SPARK-9478](https://issues.apache.org/jira/browse/SPARK-9478)), `GBTClassifier/Regressor` ([SPARK-9612](https://issues.apache.org/jira/browse/SPARK-9612)), `RegressionEvaluator` ([SPARK-24102](https://issues.apache.org/jira/browse/SPARK-24102)), `BinaryClassificationEvaluator` ([SPARK-24103](https://issues.apach [...] 
+* Sample weights support was added in `DecisionTreeClassifier/Regressor` ([SPARK-19591](https://issues.apache.org/jira/browse/SPARK-19591)), `RandomForestClassifier/Regressor` ([SPARK-9478](https://issues.apache.org/jira/browse/SPARK-9478)), `GBTClassifier/Regressor` ([SPARK-9612](https://issues.apache.org/jira/browse/SPARK-9612)), `MulticlassClassificationEvaluator` ([SPARK-24101](https://issues.apache.org/jira/browse/SPARK-24101)), `RegressionEvaluator` ([SPARK-24102](https://issues.a [...] * R API for `PowerIterationClustering` was added ([SPARK-19827](https://issues.apache.org/jira/browse/SPARK-19827)). * Added Spark ML listener for tracking ML pipeline status @@ -105,10 +105,10 @@ release of Spark: ([SPARK-28399](https://issues.apache.org/jira/browse/SPARK-28399)). * [`Factorization Machines`](ml-classification-regression.html#factorization-machines) classifier and regressor were added ([SPARK-29224](https://issues.apache.org/jira/browse/SPARK-29224)). -* Gaussian Naive Bayes ([SPARK-16872](https://issues.apache.org/jira/browse/SPARK-16872)) and Complement Naive Bayes ([SPARK-29942](https://issues.apache.org/jira/browse/SPARK-29942)) were added. +* Gaussian Naive Bayes Classifier ([SPARK-16872](https://issues.apache.org/jira/browse/SPARK-16872)) and Complement Naive Bayes Classifier ([SPARK-29942](https://issues.apache.org/jira/browse/SPARK-29942)) were added. * ML function parity between Scala and Python ([SPARK-28958](https://issues.apache.org/jira/browse/SPARK-28958)). -* `predictRaw` is made public in all the Classification models. `predictProbability` is made public in all the Classification models except `LinearSVCModel`. +* `predictRaw` is made public in all the Classification models. `predictProbability` is made public in all the Classification models except `LinearSVCModel`
[spark] branch master updated: [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new bc37fdc [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId bc37fdc is described below commit bc37fdc77130ce4f60806db0bb2b1b8914452040 Author: Kousuke Saruta AuthorDate: Fri Mar 27 13:35:28 2020 +0800 [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId ### What changes were proposed in this pull request? In ExecutionPage, metrics format for stageId, attemptId and taskId are displayed like `(stageId (attemptId): taskId)` for now. I changed this format like `(stageId.attemptId taskId)`. ### Why are the changes needed? As cloud-fan suggested [here](https://github.com/apache/spark/pull/27927#discussion_r398591519), `stageId.attemptId` is more standard in Spark. ### Does this PR introduce any user-facing change? Yes. Before applying this change, we can see the UI like as follows. ![with-checked](https://user-images.githubusercontent.com/4736016/77682421-42a6c200-6fda-11ea-92e4-e9f4554adb71.png) And after this change applied, we can like as follows. ![fix-merics-format-with-checked](https://user-images.githubusercontent.com/4736016/77682493-61a55400-6fda-11ea-801f-91a67da698fd.png) ### How was this patch tested? Modified `SQLMetricsSuite` and manual test. Closes #28039 from sarutak/improve-metrics-format. 
Authored-by: Kousuke Saruta Signed-off-by: Wenchen Fan --- .../spark/sql/execution/ui/static/spark-sql-viz.js | 2 +- .../spark/sql/execution/metric/SQLMetrics.scala | 12 ++-- .../spark/sql/execution/ui/ExecutionPage.scala | 2 +- .../spark/sql/execution/metric/SQLMetricsSuite.scala | 20 ++-- .../sql/execution/metric/SQLMetricsTestUtils.scala | 18 +- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js index 0fb7dab..bb393d9 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js @@ -73,7 +73,7 @@ function setupTooltipForSparkPlanNode(nodeId) { // labelSeparator should be a non-graphical character in order not to affect the width of boxes. var labelSeparator = "\x01"; -var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*attempt.*task[^)]*\\))(.*)$"; +var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*task[^)]*\\))(.*)$"; /* * Helper function to pre-process the graph layout. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 65aabe0..1394e0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -116,7 +116,7 @@ object SQLMetrics { // data size total (min, med, max): // 100GB (100MB, 1GB, 10GB) val acc = new SQLMetric(SIZE_METRIC, -1) -acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), +acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -126,7 +126,7 @@ object SQLMetrics { // duration(min, med, max): // 5s (800ms, 1s, 2s) val acc = new SQLMetric(TIMING_METRIC, -1) -acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), +acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -134,7 +134,7 @@ object SQLMetrics { def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { // Same with createTimingMetric, just normalize the unit of time to millisecond. val acc = new SQLMetric(NS_TIMING_METRIC, -1) -acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), +acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -150,7 +150,7 @@ object SQLMetrics { // probe avg (min, med, max): // (1.2, 2.2, 6.3) val acc = new SQLMetric(AVERAGE_METRIC) -acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"), +acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -169,11 +169,11 @@ object SQLMetrics { * and represent it
[spark] branch branch-3.0 updated: [SPARK-31186][PYSPARK][SQL] toPandas should not fail on duplicate column names
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 6fea291 [SPARK-31186][PYSPARK][SQL] toPandas should not fail on duplicate column names 6fea291 is described below commit 6fea291762af3e802cb4c237bdad51ebf5d7152c Author: Liang-Chi Hsieh AuthorDate: Fri Mar 27 12:10:30 2020 +0900 [SPARK-31186][PYSPARK][SQL] toPandas should not fail on duplicate column names ### What changes were proposed in this pull request? When `toPandas` API works on duplicate column names produced from operators like join, we see the error like: ``` ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all(). ``` This patch fixes the error in `toPandas` API. ### Why are the changes needed? To make `toPandas` work on dataframe with duplicate column names. ### Does this PR introduce any user-facing change? Yes. Previously calling `toPandas` API on a dataframe with duplicate column names will fail. After this patch, it will produce correct result. ### How was this patch tested? Unit test. Closes #28025 from viirya/SPARK-31186. 
Authored-by: Liang-Chi Hsieh Signed-off-by: HyukjinKwon (cherry picked from commit 559d3e4051500d5c49e9a7f3ac33aac3de19c9c6) Signed-off-by: HyukjinKwon --- python/pyspark/sql/pandas/conversion.py| 48 +++--- python/pyspark/sql/tests/test_dataframe.py | 18 +++ 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 8548cd2..47cf8bb 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -21,6 +21,7 @@ if sys.version >= '3': xrange = range else: from itertools import izip as zip +from collections import Counter from pyspark import since from pyspark.rdd import _load_from_socket @@ -131,9 +132,16 @@ class PandasConversionMixin(object): # Below is toPandas without Arrow optimization. pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) +column_counter = Counter(self.columns) + +dtype = [None] * len(self.schema) +for fieldIdx, field in enumerate(self.schema): +# For duplicate column name, we use `iloc` to access it. +if column_counter[field.name] > 1: +pandas_col = pdf.iloc[:, fieldIdx] +else: +pandas_col = pdf[field.name] -dtype = {} -for field in self.schema: pandas_type = PandasConversionMixin._to_corrected_pandas_type(field.dataType) # SPARK-21766: if an integer field is nullable and has null values, it can be # inferred by pandas as float column. Once we convert the column with NaN back @@ -141,16 +149,36 @@ class PandasConversionMixin(object): # float type, not the corrected type from the schema in this case. 
if pandas_type is not None and \ not(isinstance(field.dataType, IntegralType) and field.nullable and -pdf[field.name].isnull().any()): -dtype[field.name] = pandas_type +pandas_col.isnull().any()): +dtype[fieldIdx] = pandas_type # Ensure we fall back to nullable numpy types, even when whole column is null: -if isinstance(field.dataType, IntegralType) and pdf[field.name].isnull().any(): -dtype[field.name] = np.float64 -if isinstance(field.dataType, BooleanType) and pdf[field.name].isnull().any(): -dtype[field.name] = np.object +if isinstance(field.dataType, IntegralType) and pandas_col.isnull().any(): +dtype[fieldIdx] = np.float64 +if isinstance(field.dataType, BooleanType) and pandas_col.isnull().any(): +dtype[fieldIdx] = np.object + +df = pd.DataFrame() +for index, t in enumerate(dtype): +column_name = self.schema[index].name + +# For duplicate column name, we use `iloc` to access it. +if column_counter[column_name] > 1: +series = pdf.iloc[:, index] +else: +series = pdf[column_name] + +if t is not None: +series = series.astype(t, copy=False) + +# `insert` API makes copy of data, we only do it for Series of duplicate column names. +# `pdf.iloc[:, index] = pdf.iloc[:, index]...` doesn't always work because `iloc` could +# return a view or a copy depending by context. +if column_counter[column_name] > 1: +
[spark] branch master updated (9e0fee9 -> 559d3e4)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 9e0fee9 [SPARK-31262][SQL][TESTS] Fix bug tests imported bracketed comments add 559d3e4 [SPARK-31186][PYSPARK][SQL] toPandas should not fail on duplicate column names No new revisions were added by this update. Summary of changes: python/pyspark/sql/pandas/conversion.py| 48 +++--- python/pyspark/sql/tests/test_dataframe.py | 18 +++ 2 files changed, 56 insertions(+), 10 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new a7c58b1 [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId a7c58b1 is described below commit a7c58b1ae5a05f509a034ca410a50c41ce94cf5f Author: Kousuke Saruta AuthorDate: Fri Mar 27 13:35:28 2020 +0800 [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId ### What changes were proposed in this pull request? In ExecutionPage, metrics format for stageId, attemptId and taskId are displayed like `(stageId (attemptId): taskId)` for now. I changed this format like `(stageId.attemptId taskId)`. ### Why are the changes needed? As cloud-fan suggested [here](https://github.com/apache/spark/pull/27927#discussion_r398591519), `stageId.attemptId` is more standard in Spark. ### Does this PR introduce any user-facing change? Yes. Before applying this change, we can see the UI like as follows. ![with-checked](https://user-images.githubusercontent.com/4736016/77682421-42a6c200-6fda-11ea-92e4-e9f4554adb71.png) And after this change applied, we can like as follows. ![fix-merics-format-with-checked](https://user-images.githubusercontent.com/4736016/77682493-61a55400-6fda-11ea-801f-91a67da698fd.png) ### How was this patch tested? Modified `SQLMetricsSuite` and manual test. Closes #28039 from sarutak/improve-metrics-format. 
Authored-by: Kousuke Saruta Signed-off-by: Wenchen Fan (cherry picked from commit bc37fdc77130ce4f60806db0bb2b1b8914452040) Signed-off-by: Wenchen Fan --- .../spark/sql/execution/ui/static/spark-sql-viz.js | 2 +- .../spark/sql/execution/metric/SQLMetrics.scala | 12 ++-- .../spark/sql/execution/ui/ExecutionPage.scala | 2 +- .../spark/sql/execution/metric/SQLMetricsSuite.scala | 20 ++-- .../sql/execution/metric/SQLMetricsTestUtils.scala | 18 +- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js index b23ae9a..bded921 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js @@ -78,7 +78,7 @@ function setupTooltipForSparkPlanNode(nodeId) { // labelSeparator should be a non-graphical character in order not to affect the width of boxes. var labelSeparator = "\x01"; -var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*attempt.*task[^)]*\\))(.*)$"; +var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*task[^)]*\\))(.*)$"; /* * Helper function to pre-process the graph layout. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 65aabe0..1394e0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -116,7 +116,7 @@ object SQLMetrics { // data size total (min, med, max): // 100GB (100MB, 1GB, 10GB) val acc = new SQLMetric(SIZE_METRIC, -1) -acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), +acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -126,7 +126,7 @@ object SQLMetrics { // duration(min, med, max): // 5s (800ms, 1s, 2s) val acc = new SQLMetric(TIMING_METRIC, -1) -acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), +acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -134,7 +134,7 @@ object SQLMetrics { def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { // Same with createTimingMetric, just normalize the unit of time to millisecond. val acc = new SQLMetric(NS_TIMING_METRIC, -1) -acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), +acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -150,7 +150,7 @@ object SQLMetrics { // probe avg (min, med, max): // (1.2, 2.2, 6.3) val acc = new SQLMetric(AVERAGE_METRIC) -acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"), +acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
[spark] branch master updated: [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a97d3b9 [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command a97d3b9 is described below commit a97d3b9f4f4ddd215ecaa7f96c64aeba6e825f74 Author: Terry Kim AuthorDate: Fri Mar 27 12:48:14 2020 +0800 [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command ### What changes were proposed in this pull request? `HiveResult` performs some conversions for commands to be compatible with Hive output, e.g.: ``` // If it is a describe command for a Hive table, we want to have the output format be similar with Hive. case ExecutedCommandExec(_: DescribeCommandBase) => ... // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp. case command ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended => ``` This conversion is needed for DatasourceV2 commands as well and this PR proposes to add the conversion for v2 commands `SHOW TABLES` and `DESCRIBE TABLE`. ### Why are the changes needed? This is a bug where conversion is not applied to v2 commands. ### Does this PR introduce any user-facing change? Yes, now the outputs for v2 commands `SHOW TABLES` and `DESCRIBE TABLE` are compatible with HIVE output. For example, with a table created as: ``` CREATE TABLE testcat.ns.tbl (id bigint COMMENT 'col1') USING foo ``` The output of `SHOW TABLES` has changed from ``` nstable ``` to ``` table ``` And the output of `DESCRIBE TABLE` has changed from ``` idbigintcol1 # Partitioning Not partitioned ``` to ``` id bigint col1 # Partitioning Not partitioned ``` ### How was this patch tested? Added unit tests. Closes #28004 from imback82/hive_result. 
Authored-by: Terry Kim Signed-off-by: Wenchen Fan --- .../apache/spark/sql/execution/HiveResult.scala| 29 +--- .../spark/sql/execution/HiveResultSuite.scala | 32 ++ 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala index ff820bf..21874bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala @@ -24,6 +24,7 @@ import java.time.{Instant, LocalDate} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand} +import org.apache.spark.sql.execution.datasources.v2.{DescribeTableExec, ShowTablesExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -38,18 +39,17 @@ object HiveResult { */ def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match { case ExecutedCommandExec(_: DescribeCommandBase) => - // If it is a describe command for a Hive table, we want to have the output format - // be similar with Hive. - executedPlan.executeCollectPublic().map { -case Row(name: String, dataType: String, comment) => - Seq(name, dataType, -Option(comment.asInstanceOf[String]).getOrElse("")) -.map(s => String.format(s"%-20s", s)) -.mkString("\t") - } -// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp. + formatDescribeTableOutput(executedPlan.executeCollectPublic()) +case _: DescribeTableExec => + formatDescribeTableOutput(executedPlan.executeCollectPublic()) +// SHOW TABLES in Hive only output table names while our v1 command outputs +// database, table name, isTemp. 
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended => command.executeCollect().map(_.getString(1)) +// SHOW TABLES in Hive only output table names while our v2 command outputs +// namespace and table name. +case command : ShowTablesExec => + command.executeCollect().map(_.getString(1)) case other => val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq // We need the types so we can output struct field names @@ -59,6 +59,15 @@ object HiveResult { .map(_.mkString("\t")) } + private def formatDescribeTableOutput(rows: Array[Row]): Seq[String] = { +rows.map { +
[spark] branch branch-3.0 updated: [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new dde7e45 [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command dde7e45 is described below commit dde7e457e8aed561dfdc5309952bbfc99ddfc1a6 Author: Terry Kim AuthorDate: Fri Mar 27 12:48:14 2020 +0800 [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command ### What changes were proposed in this pull request? `HiveResult` performs some conversions for commands to be compatible with Hive output, e.g.: ``` // If it is a describe command for a Hive table, we want to have the output format be similar with Hive. case ExecutedCommandExec(_: DescribeCommandBase) => ... // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp. case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended => ``` This conversion is needed for DatasourceV2 commands as well and this PR proposes to add the conversion for v2 commands `SHOW TABLES` and `DESCRIBE TABLE`. ### Why are the changes needed? This is a bug where conversion is not applied to v2 commands. ### Does this PR introduce any user-facing change? Yes, now the outputs for v2 commands `SHOW TABLES` and `DESCRIBE TABLE` are compatible with HIVE output. For example, with a table created as: ``` CREATE TABLE testcat.ns.tbl (id bigint COMMENT 'col1') USING foo ``` The output of `SHOW TABLES` has changed from ``` nstable ``` to ``` table ``` And the output of `DESCRIBE TABLE` has changed from ``` idbigintcol1 # Partitioning Not partitioned ``` to ``` id bigint col1 # Partitioning Not partitioned ``` ### How was this patch tested? Added unit tests. Closes #28004 from imback82/hive_result. 
Authored-by: Terry Kim Signed-off-by: Wenchen Fan (cherry picked from commit a97d3b9f4f4ddd215ecaa7f96c64aeba6e825f74) Signed-off-by: Wenchen Fan --- .../apache/spark/sql/execution/HiveResult.scala| 29 +--- .../spark/sql/execution/HiveResultSuite.scala | 32 ++ 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala index ff820bf..21874bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala @@ -24,6 +24,7 @@ import java.time.{Instant, LocalDate} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand} +import org.apache.spark.sql.execution.datasources.v2.{DescribeTableExec, ShowTablesExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -38,18 +39,17 @@ object HiveResult { */ def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match { case ExecutedCommandExec(_: DescribeCommandBase) => - // If it is a describe command for a Hive table, we want to have the output format - // be similar with Hive. - executedPlan.executeCollectPublic().map { -case Row(name: String, dataType: String, comment) => - Seq(name, dataType, -Option(comment.asInstanceOf[String]).getOrElse("")) -.map(s => String.format(s"%-20s", s)) -.mkString("\t") - } -// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp. 
+ formatDescribeTableOutput(executedPlan.executeCollectPublic()) +case _: DescribeTableExec => + formatDescribeTableOutput(executedPlan.executeCollectPublic()) +// SHOW TABLES in Hive only output table names while our v1 command outputs +// database, table name, isTemp. case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended => command.executeCollect().map(_.getString(1)) +// SHOW TABLES in Hive only output table names while our v2 command outputs +// namespace and table name. +case command : ShowTablesExec => + command.executeCollect().map(_.getString(1)) case other => val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq // We need the types so we can output struct field names @@ -59,6 +59,15 @@ object HiveResult {
[spark] branch branch-3.0 updated: [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5f5ee4d [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir 5f5ee4d is described below commit 5f5ee4d84acc933112c52b1818a865139c2af05a Author: Kent Yao AuthorDate: Fri Mar 27 12:05:45 2020 +0800 [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir ### What changes were proposed in this pull request? In Spark CLI, we create a hive `CliSessionState` and it does not load the `hive-site.xml`. So the configurations in `hive-site.xml` will not take effect like other spark-hive integration apps. Also, the warehouse directory is not correctly picked. If the `default` database does not exist, the `CliSessionState` will create one during the first time it talks to the metastore. The `Location` of the default DB will be neither the value of `spark.sql.warehouse.dir` nor the user-specified value of `hive.metastore.warehouse.dir`, but the default value of `hive.metastore.warehouse.dir` which will always be `/user/hive/warehouse`. This PR fixes CliSuite failure with the hive-1.2 profile in https://github.com/apache/spark/pull/27933. In https://github.com/apache/spark/pull/27933, we fix the issue in JIRA by deciding the warehouse dir using all properties from spark conf and Hadoop conf, but properties from `--hiveconf` are not included; they will be applied to the `CliSessionState` instance after it is initialized. When this command-line option key is `hive.metastore.warehouse.dir`, the actual warehouse dir is overridden. Because of the logic in Hive for creating the non-existing default database changed, that test p [...] 
` spark.hive.xxx > spark.hadoop.xxx > --hiveconf xxx > hive-site.xml` through `SharedState.loadHiveConfFile` before sessionState starts ### Why are the changes needed? Bugfix for Spark SQL CLI to pick the right confs ### Does this PR introduce any user-facing change? yes, 1. the non-existent default database will be created in the location specified by the users via `spark.sql.warehouse.dir` or `hive.metastore.warehouse.dir`, or the default value of `spark.sql.warehouse.dir` if none of them is specified. 2. configurations from `hive-site.xml` will not override command-line options or the properties defined with `spark.hadoop(hive).` prefix in spark conf. ### How was this patch tested? add cli ut Closes #27969 from yaooqinn/SPARK-31170-2. Authored-by: Kent Yao Signed-off-by: Wenchen Fan (cherry picked from commit 8be16907c261657f83f5d5934bcd978d8dacf7ff) Signed-off-by: Wenchen Fan --- .../apache/spark/sql/internal/SharedState.scala| 87 +++- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 9 +- .../src/test/noclasspath/hive-site.xml | 30 +++ .../spark/sql/hive/thriftserver/CliSuite.scala | 96 +- .../spark/sql/hive/HiveSharedStateSuite.scala | 1 - .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- 6 files changed, 159 insertions(+), 66 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 5347264..14b8ea6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -22,6 +22,7 @@ import java.util.UUID import java.util.concurrent.ConcurrentHashMap import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -41,7 +42,6 @@ import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, Streamin import org.apache.spark.status.ElementTrackingStore 
import org.apache.spark.util.Utils - /** * A class that holds all state shared across sessions in a given [[SQLContext]]. * @@ -55,45 +55,10 @@ private[sql] class SharedState( SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf) - // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on - // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf. - val warehousePath: String = { -val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml") -if (configFile != null) { - logInfo(s"loading hive config file: $configFile") - sparkContext.hadoopConfiguration.addResource(configFile) -} - -// hive.metastore.warehouse.dir only stay in hadoopConf -sparkContext.conf.remove("hive.metastore.warehouse.dir") -
[spark] branch master updated (559d3e4 -> 8be1690)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 559d3e4 [SPARK-31186][PYSPARK][SQL] toPandas should not fail on duplicate column names add 8be1690 [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir No new revisions were added by this update. Summary of changes: .../apache/spark/sql/internal/SharedState.scala| 87 +++- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 9 +- .../src/test/noclasspath}/hive-site.xml| 4 + .../spark/sql/hive/thriftserver/CliSuite.scala | 96 +- .../spark/sql/hive/HiveSharedStateSuite.scala | 1 - .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- 6 files changed, 133 insertions(+), 66 deletions(-) copy sql/{core/src/test/resources => hive-thriftserver/src/test/noclasspath}/hive-site.xml (90%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 78cc2ef [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource 78cc2ef is described below commit 78cc2ef5b663d6d605e3d4febc6fb99e20b7f165 Author: Maxim Gekk AuthorDate: Thu Mar 26 13:14:28 2020 -0700 [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource ### What changes were proposed in this pull request? This PR (SPARK-31238) aims the followings. 1. Modified ORC Vectorized Reader, in particular, OrcColumnVector v1.2 and v2.3. After the changes, it uses `DateTimeUtils. rebaseJulianToGregorianDays()` added by https://github.com/apache/spark/pull/27915 . The method performs rebasing days from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar. It builds a local date in the original calendar, extracts date fields `year`, `month` and `day` from the local date, and builds another local date in the target calend [...] 2. Introduced rebasing dates while saving ORC files, in particular, I modified `OrcShimUtils. getDateWritable` v1.2 and v2.3, and returned `DaysWritable` instead of Hive's `DateWritable`. The `DaysWritable` class was added by the PR https://github.com/apache/spark/pull/27890 (and fixed by https://github.com/apache/spark/pull/27962). I moved `DaysWritable` from `sql/hive` to `sql/core` to re-use it in ORC datasource. ### Why are the changes needed? For the backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous version, and get the same result. ### Does this PR introduce any user-facing change? Yes. 
Before the changes, loading the date `1200-01-01` saved by Spark 2.4.5 returns the following: ```scala scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-08| +--+ ``` After the changes ```scala scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-01| +--+ ``` ### How was this patch tested? - By running `OrcSourceSuite` and `HiveOrcSourceSuite`. - Add new test `SPARK-31238: compatibility with Spark 2.4 in reading dates` to `OrcSuite` which reads an ORC file saved by Spark 2.4.5 via the commands: ```shell $ export TZ="America/Los_Angeles" ``` ```scala scala> sql("select cast('1200-01-01' as date) dt").write.mode("overwrite").orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc") scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false) +--+ |dt| +--+ |1200-01-01| +--+ ``` - Add round trip test `SPARK-31238: rebasing dates in write`. The test `SPARK-31238: compatibility with Spark 2.4 in reading dates` confirms rebasing in read. So, we can check rebasing in write. Closes #28016 from MaxGekk/rebase-date-orc. 
Authored-by: Maxim Gekk Signed-off-by: Dongjoon Hyun (cherry picked from commit d72ec8574113f9a7e87f3d7ec56c8447267b0506) Signed-off-by: Dongjoon Hyun --- .../sql/execution/datasources}/DaysWritable.scala | 10 ++-- .../test-data/before_1582_date_v2_4.snappy.orc | Bin 0 -> 201 bytes .../execution/datasources/orc/OrcSourceSuite.scala | 28 - .../sql/execution/datasources/orc/OrcTest.scala| 5 .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc}/DaysWritable.scala | 17 ++--- .../execution/datasources/orc/OrcShimUtils.scala | 4 +-- .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc/OrcShimUtils.scala | 5 ++-- .../org/apache/spark/sql/hive/HiveInspectors.scala | 1 + 10 files changed, 88 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala similarity index 92% copy from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala copy to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala index 1eec8d7..00b710f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.hive +package org.apache.spark.sql.execution.datasources import java.io.{DataInput, DataOutput,
[spark] branch master updated (33f532a -> d72ec85)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 33f532a [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator add d72ec85 [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource No new revisions were added by this update. Summary of changes: .../sql/execution/datasources}/DaysWritable.scala | 10 ++-- .../test-data/before_1582_date_v2_4.snappy.orc | Bin 0 -> 201 bytes .../execution/datasources/orc/OrcSourceSuite.scala | 28 - .../sql/execution/datasources/orc/OrcTest.scala| 5 .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc}/DaysWritable.scala | 17 ++--- .../execution/datasources/orc/OrcShimUtils.scala | 4 +-- .../execution/datasources/orc/OrcColumnVector.java | 15 ++- .../execution/datasources/orc/OrcShimUtils.scala | 5 ++-- .../org/apache/spark/sql/hive/HiveInspectors.scala | 1 + 10 files changed, 88 insertions(+), 12 deletions(-) copy sql/{hive/src/main/scala/org/apache/spark/sql/hive => core/src/main/scala/org/apache/spark/sql/execution/datasources}/DaysWritable.scala (92%) create mode 100644 sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc rename sql/{hive/src/main/scala/org/apache/spark/sql/hive => core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc}/DaysWritable.scala (81%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: Revert "[SPARK-31258][BUILD] Pin the avro version in SBT"
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 4217f75 Revert "[SPARK-31258][BUILD] Pin the avro version in SBT" 4217f75 is described below commit 4217f75b3f05f323018a3a9986ecb9ae587688a8 Author: Dongjoon Hyun AuthorDate: Thu Mar 26 13:49:39 2020 -0700 Revert "[SPARK-31258][BUILD] Pin the avro version in SBT" This reverts commit 916a25a46bca7196416372bacc3fc260a6ef658f. --- project/SparkBuild.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 4578857..7ee079c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -554,8 +554,7 @@ object DependencyOverrides { dependencyOverrides += "com.google.guava" % "guava" % "14.0.1", dependencyOverrides += "commons-io" % "commons-io" % "2.4", dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.3", -dependencyOverrides += "jline" % "jline" % "2.14.6", -dependencyOverrides += "org.apache.avro" % "avro" % "1.8.2") +dependencyOverrides += "jline" % "jline" % "2.14.6") } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31234][SQL] ResetCommand should reset config to sc.conf only
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 557623b [SPARK-31234][SQL] ResetCommand should reset config to sc.conf only 557623b is described below commit 557623b5a1ebd50b0f604843628b9d0ed8ff19ef Author: Kent Yao AuthorDate: Thu Mar 26 15:03:16 2020 +0800 [SPARK-31234][SQL] ResetCommand should reset config to sc.conf only ### What changes were proposed in this pull request? Currently, ResetCommand clear all configurations, including sql configs, static sql configs and spark context level configs. for example: ```sql spark-sql> set xyz=abc; xyz abc spark-sql> set; spark.app.id local-1585055396930 spark.app.name SparkSQL::10.242.189.214 spark.driver.host 10.242.189.214 spark.driver.port 65094 spark.executor.id driver spark.jars spark.master local[*] spark.sql.catalogImplementation hive spark.sql.hive.version 1.2.1 spark.submit.deployMode client xyz abc spark-sql> reset; spark-sql> set; spark-sql> set spark.sql.hive.version; spark.sql.hive.version 1.2.1 spark-sql> set spark.app.id; spark.app.id ``` In this PR, we restore spark confs to RuntimeConfig after it is cleared ### Why are the changes needed? reset command overkills configs which are static. ### Does this PR introduce any user-facing change? yes, the ResetCommand do not change static configs now ### How was this patch tested? add ut Closes #28003 from yaooqinn/SPARK-31234. 
Authored-by: Kent Yao Signed-off-by: Wenchen Fan (cherry picked from commit 44bd36ad7b315f4c7592cdc1edf04356fcd23645) Signed-off-by: Wenchen Fan --- docs/sql-ref-syntax-aux-conf-mgmt-reset.md| 4 ++-- .../apache/spark/sql/execution/command/SetCommand.scala | 9 +++-- .../org/apache/spark/sql/internal/SQLConfSuite.scala | 15 +++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/docs/sql-ref-syntax-aux-conf-mgmt-reset.md b/docs/sql-ref-syntax-aux-conf-mgmt-reset.md index 8ee6151..5ebc7b9 100644 --- a/docs/sql-ref-syntax-aux-conf-mgmt-reset.md +++ b/docs/sql-ref-syntax-aux-conf-mgmt-reset.md @@ -20,7 +20,7 @@ license: | --- ### Description -Reset all the properties specific to the current session to their default values. After RESET command, executing SET command will output empty. +Reset any runtime configurations specific to the current session which were set via the [SET](sql-ref-syntax-aux-conf-mgmt-set.html) command to their default values. ### Syntax {% highlight sql %} @@ -30,7 +30,7 @@ RESET ### Examples {% highlight sql %} --- Reset all the properties specific to the current session to their default values. +-- Reset any runtime configurations specific to the current session which were set via the SET command to their default values. RESET; {% endhighlight %} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index 39b08e2..a12b261 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -157,7 +157,8 @@ object SetCommand { } /** - * This command is for resetting SQLConf to the default values. Command that runs + * This command is for resetting SQLConf to the default values. Any configurations that were set + * via [[SetCommand]] will get reset to default value. 
Command that runs * {{{ * reset; * }}} @@ -165,7 +166,11 @@ object SetCommand { case object ResetCommand extends RunnableCommand with IgnoreCachedData { override def run(sparkSession: SparkSession): Seq[Row] = { -sparkSession.sessionState.conf.clear() +val conf = sparkSession.sessionState.conf +conf.clear() +sparkSession.sparkContext.conf.getAll.foreach { case (k, v) => + conf.setConfString(k, v) +} Seq.empty[Row] } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index cde2aa7..c2d8493d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -116,6 +116,21 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } } + test("reset will not change static sql configs and spark core configs") { +val conf = spark.sparkContext.getConf.getAll.toMap +val appName = conf.get("spark.app.name") +val driverHost = conf.get("spark.driver.host") +val master = conf.get("spark.master") +
[spark] branch master updated (ee6f899 -> 44bd36a)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from ee6f899 [SPARK-30934][ML][FOLLOW-UP] Update ml-guide to include MulticlassClassificationEvaluator weight support in highlights add 44bd36a [SPARK-31234][SQL] ResetCommand should reset config to sc.conf only No new revisions were added by this update. Summary of changes: docs/sql-ref-syntax-aux-conf-mgmt-reset.md| 4 ++-- .../apache/spark/sql/execution/command/SetCommand.scala | 9 +++-- .../org/apache/spark/sql/internal/SQLConfSuite.scala | 15 +++ 3 files changed, 24 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31254][SQL] Use the current session time zone in `HiveResult.toHiveString`
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new b7810dd [SPARK-31254][SQL] Use the current session time zone in `HiveResult.toHiveString` b7810dd is described below commit b7810dd6c6a67dfda307763af8ca13ce4a6f1f99 Author: Maxim Gekk AuthorDate: Thu Mar 26 17:45:29 2020 +0800 [SPARK-31254][SQL] Use the current session time zone in `HiveResult.toHiveString` ### What changes were proposed in this pull request? In the PR, I propose to define `timestampFormatter`, `dateFormatter` and `zoneId` as methods of the `HiveResult` object. This should guarantee that the formatters pick the current session time zone in `toHiveString()` ### Why are the changes needed? Currently, date/timestamp formatters in `HiveResult.toHiveString` are initialized once on instantiation of the `HiveResult` object, and pick up the session time zone. If the sessions time zone is changed, the formatters still use the previous one. ### Does this PR introduce any user-facing change? Yes ### How was this patch tested? By existing test suites, in particular, by `HiveResultSuite` Closes #28024 from MaxGekk/hive-result-datetime-formatters. 
Authored-by: Maxim Gekk Signed-off-by: Wenchen Fan (cherry picked from commit 600319dcb977827028327f9395644ae3d492e0fe) Signed-off-by: Wenchen Fan --- .../src/main/scala/org/apache/spark/sql/execution/HiveResult.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala index 5a2f16d..ff820bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala @@ -59,9 +59,9 @@ object HiveResult { .map(_.mkString("\t")) } - private lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone) - private lazy val dateFormatter = DateFormatter(zoneId) - private lazy val timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId) + private def zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone) + private def dateFormatter = DateFormatter(zoneId) + private def timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId) /** Formats a datum (based on the given data type) and returns the string representation. */ def toHiveString(a: (Any, DataType), nested: Boolean = false): String = a match { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (44bd36a -> 3bd10ce)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 44bd36a [SPARK-31234][SQL] ResetCommand should reset config to sc.conf only add 3bd10ce [SPARK-31227][SQL] Non-nullable null type in complex types should not coerce to nullable type No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/analysis/TypeCoercion.scala | 2 +- .../spark/sql/catalyst/expressions/Cast.scala | 17 +++ .../sql/catalyst/analysis/TypeCoercionSuite.scala | 54 ++ .../spark/sql/catalyst/expressions/CastSuite.scala | 22 + .../apache/spark/sql/DataFrameFunctionsSuite.scala | 7 +++ 5 files changed, 73 insertions(+), 29 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31227][SQL] Non-nullable null type in complex types should not coerce to nullable type
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new bd94cf7 [SPARK-31227][SQL] Non-nullable null type in complex types should not coerce to nullable type bd94cf7 is described below commit bd94cf7bcccd305cac1f301d2cefb043afba35f6 Author: HyukjinKwon AuthorDate: Thu Mar 26 15:42:54 2020 +0800 [SPARK-31227][SQL] Non-nullable null type in complex types should not coerce to nullable type ### What changes were proposed in this pull request? This PR targets for non-nullable null type not to coerce to nullable type in complex types. Non-nullable fields in struct, elements in an array and entries in map can mean empty array, struct and map. They are empty so it does not need to force the nullability when we find common types. This PR also reverts and supersedes https://github.com/apache/spark/commit/d7b97a1d0daf65710317321490a833f696a46f21 ### Why are the changes needed? To make type coercion coherent and consistent. Currently, we correctly keep the nullability even between non-nullable fields: ```scala import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ spark.range(1).select(array(lit(1)).cast(ArrayType(IntegerType, false))).printSchema() spark.range(1).select(array(lit(1)).cast(ArrayType(DoubleType, false))).printSchema() ``` ```scala spark.range(1).selectExpr("concat(array(1), array(1)) as arr").printSchema() ``` ### Does this PR introduce any user-facing change? Yes. 
```scala import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ spark.range(1).select(array().cast(ArrayType(IntegerType, false))).printSchema() ``` ```scala spark.range(1).selectExpr("concat(array(), array(1)) as arr").printSchema() ``` **Before:** ``` org.apache.spark.sql.AnalysisException: cannot resolve 'array()' due to data type mismatch: cannot cast array to array;; 'Project [cast(array() as array) AS array()#68] +- Range (0, 1, step=1, splits=Some(12)) at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:149) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:140) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) ``` ``` root |-- arr: array (nullable = false) ||-- element: integer (containsNull = true) ``` **After:** ``` root |-- array(): array (nullable = false) ||-- element: integer (containsNull = false) ``` ``` root |-- arr: array (nullable = false) ||-- element: integer (containsNull = false) ``` ### How was this patch tested? Unittests were added and manually tested. Closes #27991 from HyukjinKwon/SPARK-31227. 
Authored-by: HyukjinKwon Signed-off-by: Wenchen Fan (cherry picked from commit 3bd10ce007832522e38583592b6f358e185cdb7d) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/analysis/TypeCoercion.scala | 2 +- .../spark/sql/catalyst/expressions/Cast.scala | 17 +++ .../sql/catalyst/analysis/TypeCoercionSuite.scala | 54 ++ .../spark/sql/catalyst/expressions/CastSuite.scala | 22 + .../apache/spark/sql/DataFrameFunctionsSuite.scala | 7 +++ 5 files changed, 73 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 5a5d7c6..eb9a4d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -160,7 +160,7 @@ object TypeCoercion { } case (MapType(kt1, vt1, valueContainsNull1), MapType(kt2, vt2, valueContainsNull2)) => findTypeFunc(kt1, kt2) -.filter { kt =>
[spark] branch master updated (3bd10ce -> 600319d)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 3bd10ce [SPARK-31227][SQL] Non-nullable null type in complex types should not coerce to nullable type add 600319d [SPARK-31254][SQL] Use the current session time zone in `HiveResult.toHiveString` No new revisions were added by this update. Summary of changes: .../src/main/scala/org/apache/spark/sql/execution/HiveResult.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
[spark] branch master updated (600319d -> 8b798c1)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 600319d [SPARK-31254][SQL] Use the current session time zone in `HiveResult.toHiveString` add 8b798c1 [SPARK-31242][SQL][TEST] mergeSparkConf in WithTestConf should also respect spark.sql.legacy.sessionInitWithConfigDefaults No new revisions were added by this update. Summary of changes: .../spark/sql/internal/BaseSessionStateBuilder.scala | 14 ++ .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala| 10 ++ 2 files changed, 20 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31242][SQL][TEST] mergeSparkConf in WithTestConf should also respect spark.sql.legacy.sessionInitWithConfigDefaults
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7b1fba2 [SPARK-31242][SQL][TEST] mergeSparkConf in WithTestConf should also respect spark.sql.legacy.sessionInitWithConfigDefaults 7b1fba2 is described below commit 7b1fba2978e6e1fb7eacf693be2a5ad8b9a859b2 Author: yi.wu AuthorDate: Thu Mar 26 18:52:56 2020 +0800 [SPARK-31242][SQL][TEST] mergeSparkConf in WithTestConf should also respect spark.sql.legacy.sessionInitWithConfigDefaults ### What changes were proposed in this pull request? Make `mergeSparkConf` in `WithTestConf` respects `spark.sql.legacy.sessionInitWithConfigDefaults`. ### Why are the changes needed? Without the fix, conf specified by `withSQLConf` can be reverted to original value in a cloned SparkSession. For example, you will fail test below without the fix: ``` withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") { val cloned = spark.cloneSession() SparkSession.setActiveSession(cloned) assert(SQLConf.get.getConf(SQLConf.CODEGEN_FALLBACK) === true) } ``` So we should fix it just as #24540 did before. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added tests. Closes #28014 from Ngone51/sparksession_clone. 
Authored-by: yi.wu Signed-off-by: Wenchen Fan (cherry picked from commit 8b798c1bc501f0d2b1c2c80ab64ffd764bc72987) Signed-off-by: Wenchen Fan --- .../spark/sql/internal/BaseSessionStateBuilder.scala | 14 ++ .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala| 10 ++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 9556d4d..3bbdbb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -347,8 +347,14 @@ private[sql] trait WithTestConf { self: BaseSessionStateBuilder => override protected lazy val conf: SQLConf = { val overrideConfigurations = overrideConfs -val conf = parentState.map(_.conf.clone()).getOrElse { - new SQLConf { +parentState.map { s => + val cloned = s.conf.clone() + if (session.sparkContext.conf.get(StaticSQLConf.SQL_LEGACY_SESSION_INIT_WITH_DEFAULTS)) { +mergeSparkConf(cloned, session.sparkContext.conf) + } + cloned +}.getOrElse { + val conf = new SQLConf { clear() override def clear(): Unit = { super.clear() @@ -356,8 +362,8 @@ private[sql] trait WithTestConf { self: BaseSessionStateBuilder => overrideConfigurations.foreach { case (key, value) => setConfString(key, value) } } } + mergeSparkConf(conf, session.sparkContext.conf) + conf } -mergeSparkConf(conf, session.sparkContext.conf) -conf } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index feb1450..42f0d96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3462,6 +3462,16 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark sql("(SELECT map()) 
UNION ALL (SELECT map(1, 2))"), Seq(Row(Map[Int, Int]()), Row(Map(1 -> 2)))) } + + test("SPARK-31242: clone SparkSession should respect sessionInitWithConfigDefaults") { +// Note, only the conf explicitly set in SparkConf(e.g. in SharedSparkSessionBase) would cause +// problem before the fix. +withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") { + val cloned = spark.cloneSession() + SparkSession.setActiveSession(cloned) + assert(SQLConf.get.getConf(SQLConf.CODEGEN_FALLBACK) === true) +} + } } case class Foo(bar: Option[String]) - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
[spark] branch master updated: [SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 35d286b [SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka 35d286b is described below commit 35d286bafb248a47a1125908c0208cd759dd0416 Author: beliefer AuthorDate: Thu Mar 26 20:11:15 2020 +0900 [SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka ### What changes were proposed in this pull request? Add version information to the configuration of Kafka. I sorted out some information show below. Item name | Since version | JIRA ID | Commit ID | Note -- | -- | -- | -- | -- spark.streaming.kafka.consumer.cache.enabled | 2.2.1 | SPARK-19185 | 02cf178bb2a7dc8b4c06eb040c44b6453e41ed15#diff-c465bbcc83b2ecc7530d1c0128e4432b | spark.streaming.kafka.consumer.poll.ms | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 | spark.streaming.kafka.consumer.cache.initialCapacity | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 | spark.streaming.kafka.consumer.cache.maxCapacity | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 | spark.streaming.kafka.consumer.cache.loadFactor | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 | spark.streaming.kafka.maxRatePerPartition | 1.3.0 | SPARK-4964 | a119cae48030520da9f26ee9a1270bed7f33031e#diff-26cb4369f86050dc2e75cd16291b2844 | spark.streaming.kafka.minRatePerPartition | 2.4.0 | SPARK-25233 | 135ff16a3510a4dfb3470904004dae9848005019#diff-815f6ec5caf9e4beb355f5f981171f1f | spark.streaming.kafka.allowNonConsecutiveOffsets | 2.3.1 | SPARK-24067 | 1d598b771de3b588a2f377ae7ccf8193156641f2#diff-4597d93a0e951f7199697dba7dd0dc32 | 
spark.kafka.producer.cache.timeout | 2.2.1 | SPARK-19968 | f6730a70cb47ebb3df7f42209df7b076aece1093#diff-ac8844e8d791a75aaee3d0d10bfc1f2a | spark.kafka.producer.cache.evictorThreadRunInterval | 3.0.0 | SPARK-21869 | 7bff2db9ed803e05a43c2d875c1dea819d81248a#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd | spark.kafka.consumer.cache.capacity | 3.0.0 | SPARK-27687 | efa303581ac61d6f517aacd08883da2d01530bd2#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd | spark.kafka.consumer.cache.jmx.enable | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd | spark.kafka.consumer.cache.timeout | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd | spark.kafka.consumer.cache.evictorThreadRunInterval | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd | spark.kafka.consumer.fetchedData.cache.timeout | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd | spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd | spark.kafka.clusters.${cluster}.auth.bootstrap.servers | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.security.protocol | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.sasl.kerberos.service.name | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.ssl.truststore.location | 3.0.0 | SPARK-27294 | 
2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.ssl.truststore.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.ssl.keystore.location | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.ssl.keystore.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.ssl.key.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 | spark.kafka.clusters.${cluster}.sasl.token.mechanism | 3.0.0 | SPARK-27294 |
[spark] branch branch-3.0 updated: [SPARK-31201][SQL] Add an individual config for skewed partition threshold
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8d0800a [SPARK-31201][SQL] Add an individual config for skewed partition threshold 8d0800a is described below commit 8d0800a0803d3c47938bddefa15328d654739bc5 Author: Wenchen Fan AuthorDate: Thu Mar 26 22:57:01 2020 +0900 [SPARK-31201][SQL] Add an individual config for skewed partition threshold Skew join handling comes with an overhead: we need to read some data repeatedly. We should treat a partition as skewed if it's large enough so that it's beneficial to do so. Currently the size threshold is the advisory partition size, which is 64 MB by default. This is not large enough for the skewed partition size threshold. This PR adds a new config for the threshold and set default value as 256 MB. Avoid skew join handling that may introduce a perf regression. no existing tests Closes #27967 from cloud-fan/aqe. Authored-by: Wenchen Fan Signed-off-by: HyukjinKwon (cherry picked from commit 05498af72e19b058b210815e1053f3fa9b0157d9) Signed-off-by: HyukjinKwon --- docs/sql-performance-tuning.md| 9 - .../main/scala/org/apache/spark/sql/internal/SQLConf.scala| 11 ++- .../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 2 +- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 +- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/docs/sql-performance-tuning.md b/docs/sql-performance-tuning.md index 489575d..9a1cc89 100644 --- a/docs/sql-performance-tuning.md +++ b/docs/sql-performance-tuning.md @@ -242,7 +242,14 @@ Data skew can severely downgrade the performance of join queries. 
This feature d spark.sql.adaptive.skewJoin.skewedPartitionFactor 10 - A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than spark.sql.adaptive.advisoryPartitionSizeInBytes. + A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than spark.sql.adaptive.skewedPartitionThresholdInBytes. + + + + spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes + 256MB + + A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplying the median partition size. Ideally this config should be set larger than spark.sql.adaptive.advisoryPartitionSizeInBytes. \ No newline at end of file diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1b00bed..c61a57e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -455,11 +455,20 @@ object SQLConf { buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor") .doc("A partition is considered as skewed if its size is larger than this factor " + "multiplying the median partition size and also larger than " + -s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'") +"'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes'") .intConf .checkValue(_ > 0, "The skew factor must be positive.") .createWithDefault(10) + val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD = +buildConf("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes") + .doc("A partition is considered as skewed if its size in bytes is larger than this " + +s"threshold and also larger than '${SKEW_JOIN_SKEWED_PARTITION_FACTOR.key}' " + +"multiplying the median partition size. 
Ideally this config should be set larger " + +s"than '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("256MB") + val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN = buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index e02b9af..b09e563 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -67,7 +67,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { */ private def
[spark] branch master updated (35d286b -> 05498af)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 35d286b [SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka add 05498af [SPARK-31201][SQL] Add an individual config for skewed partition threshold No new revisions were added by this update. Summary of changes: docs/sql-performance-tuning.md | 9 - .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 12 +++- .../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala| 2 +- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 +- 4 files changed, 21 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
[spark] branch master updated: [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 474b1bb [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling 474b1bb is described below commit 474b1bb5c2bce2f83c4dd8e19b9b7c5b3aebd6c4 Author: Thomas Graves AuthorDate: Thu Mar 26 09:46:36 2020 -0500 [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling ### What changes were proposed in this pull request? This is the core scheduler changes to support Stage level scheduling. The main changes here include modification to the DAGScheduler to look at the ResourceProfiles associated with an RDD and have those applied inside the scheduler. Currently if multiple RDD's in a stage have conflicting ResourceProfiles we throw an error. logic to allow this will happen in SPARK-29153. I added the interfaces to RDD to add and get the REsourceProfile so that I could add unit tests for the scheduler. These are marked as private for now until we finish the feature and will be exposed in SPARK-29150. If you think this is confusing I can remove those and remove the tests and add them back later. I modified the task scheduler to make sure to only schedule on executor that exactly match the resource profile. It will then check those executors to make sure the current resources meet the task needs before assigning it. In here I changed the way we do the custom resource assignment. Other changes here include having the cpus per task passed around so that we can properly account for them. Previously we just used the one global config, but now it can change based on the ResourceProfile. I removed the exceptions that require the cores to be the limiting resource. 
With this change all the places I found that used executor cores /task cpus as slots has been updated to use the ResourceProfile logic and look to see what resource is limiting. ### Why are the changes needed? Stage level sheduling feature ### Does this PR introduce any user-facing change? No ### How was this patch tested? unit tests and lots of manual testing Closes #27773 from tgravescs/SPARK-29154. Lead-authored-by: Thomas Graves Co-authored-by: Thomas Graves Signed-off-by: Thomas Graves --- .../main/scala/org/apache/spark/SparkContext.scala | 27 +-- .../org/apache/spark/internal/config/Tests.scala | 9 + core/src/main/scala/org/apache/spark/rdd/RDD.scala | 27 +++ .../apache/spark/resource/ResourceProfile.scala| 42 +++-- .../spark/resource/ResourceProfileManager.scala| 11 +- .../org/apache/spark/resource/ResourceUtils.scala | 13 +- .../spark/resource/TaskResourceRequests.scala | 2 +- .../org/apache/spark/scheduler/DAGScheduler.scala | 70 +--- .../apache/spark/scheduler/SchedulerBackend.scala | 8 +- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 178 ++- .../scala/org/apache/spark/scheduler/TaskSet.scala | 3 +- .../apache/spark/scheduler/TaskSetManager.scala| 32 ++-- .../org/apache/spark/scheduler/WorkerOffer.scala | 5 +- .../cluster/CoarseGrainedSchedulerBackend.scala| 24 ++- .../scheduler/local/LocalSchedulerBackend.scala| 9 +- .../deploy/StandaloneDynamicAllocationSuite.scala | 1 - .../CoarseGrainedExecutorBackendSuite.scala| 4 +- .../CoarseGrainedSchedulerBackendSuite.scala | 13 +- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 115 +++- .../scheduler/ExternalClusterManagerSuite.scala| 3 +- .../org/apache/spark/scheduler/FakeTask.scala | 31 +++- .../org/apache/spark/scheduler/PoolSuite.scala | 4 +- .../scheduler/SchedulerIntegrationSuite.scala | 5 +- .../spark/scheduler/TaskSchedulerImplSuite.scala | 192 - .../spark/scheduler/TaskSetManagerSuite.scala | 91 ++ .../mesos/MesosFineGrainedSchedulerBackend.scala | 3 +- 26 files changed, 704 insertions(+), 
218 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cdb98db..588e7dc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1597,13 +1597,17 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Get the max number of tasks that can be concurrent launched currently. + * Get the max number of tasks that can be concurrent launched based on the ResourceProfile + * being used. * Note that please don't cache the value returned by this method, because the number can change * due to add/remove executors. * + * @param rp ResourceProfile which to use to calculate max concurrent tasks.