This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 24f3d4d1791 [SPARK-42743][SQL] Support analyze TimestampNTZ columns 24f3d4d1791 is described below commit 24f3d4d17913ec90a48ecf9dd23b4db7c19d10c2 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Fri Mar 10 14:33:45 2023 +0300 [SPARK-42743][SQL] Support analyze TimestampNTZ columns ### What changes were proposed in this pull request? Support analyze TimestampNTZ columns ``` ANALYZE TABLE table_name [ PARTITION clause ] COMPUTE STATISTICS [ NOSCAN | FOR COLUMNS col1 [, ...] | FOR ALL COLUMNS ] ``` ### Why are the changes needed? Support computing statistics of TimestampNTZ columns, which can be used for optimizations. ### Does this PR introduce _any_ user-facing change? No, the TimestampNTZ type is not released yet. ### How was this patch tested? Update existing UT Closes #40362 from gengliangwang/analyzeColumn. Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../spark/sql/catalyst/catalog/interface.scala | 9 +++++-- .../sql/catalyst/util/TimestampFormatter.scala | 8 ++++++ .../execution/command/AnalyzeColumnCommand.scala | 5 ++-- .../spark/sql/execution/command/CommandUtils.scala | 6 ++--- .../spark/sql/StatisticsCollectionTestBase.scala | 29 +++++++++++++++++----- .../apache/spark/sql/hive/StatisticsSuite.scala | 3 ++- 6 files changed, 44 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 829c121c583..6f4c4f27efc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -661,11 +661,13 @@ object CatalogColumnStat extends Logging { def getTimestampFormatter( isParsing: Boolean, format: String = "yyyy-MM-dd HH:mm:ss.SSSSSS", - zoneId: ZoneId 
= ZoneOffset.UTC): TimestampFormatter = { + zoneId: ZoneId = ZoneOffset.UTC, + forTimestampNTZ: Boolean = false): TimestampFormatter = { TimestampFormatter( format = format, zoneId = zoneId, - isParsing = isParsing) + isParsing = isParsing, + forTimestampNTZ = forTimestampNTZ) } /** @@ -702,6 +704,9 @@ object CatalogColumnStat extends Logging { val externalValue = dataType match { case DateType => DateFormatter().format(v.asInstanceOf[Int]) case TimestampType => getTimestampFormatter(isParsing = false).format(v.asInstanceOf[Long]) + case TimestampNTZType => + getTimestampFormatter(isParsing = false, forTimestampNTZ = true) + .format(v.asInstanceOf[Long]) case BooleanType | _: IntegralType | FloatType | DoubleType => v case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal // This version of Spark does not use min/max for binary/string types so we ignore it. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 8ebe77978b5..392e8ebdc6c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -525,6 +525,14 @@ object TimestampFormatter { getFormatter(Some(format), zoneId, isParsing = isParsing) } + def apply( + format: String, + zoneId: ZoneId, + isParsing: Boolean, + forTimestampNTZ: Boolean): TimestampFormatter = { + getFormatter(Some(format), zoneId, isParsing = isParsing, forTimestampNTZ = forTimestampNTZ) + } + def apply(zoneId: ZoneId): TimestampFormatter = { getFormatter(None, zoneId, isParsing = false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index d821b127e06..299f41eb55e 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableTyp import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{DatetimeType, _} /** @@ -139,8 +139,7 @@ case class AnalyzeColumnCommand( case _: DecimalType => true case DoubleType | FloatType => true case BooleanType => true - case DateType => true - case TimestampType => true + case _: DatetimeType => true case BinaryType | StringType => true case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index d847868c0ce..c656bdbafa0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -329,8 +329,7 @@ object CommandUtils extends Logging { case _: IntegralType => true case _: DecimalType => true case DoubleType | FloatType => true - case DateType => true - case TimestampType => true + case _: DatetimeType => true case _ => false } @@ -379,8 +378,7 @@ object CommandUtils extends Logging { case _: DecimalType => fixedLenTypeStruct case DoubleType | FloatType => fixedLenTypeStruct case BooleanType => fixedLenTypeStruct - case DateType => fixedLenTypeStruct - case TimestampType => fixedLenTypeStruct + case _: DatetimeType => fixedLenTypeStruct case BinaryType | StringType => // For string and binary type, we don't compute min, max or histogram val nullLit = Literal(null, col.dataType) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala index 6c6ef1a118f..04e47ac4a11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.{lang => jl} import java.io.File import java.sql.{Date, Timestamp} +import java.time.LocalDateTime import scala.collection.mutable import scala.util.Random @@ -59,10 +60,12 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils private val t1Internal = date(2016, 5, 8, 0, 0, 1, 123456) private val t1 = new Timestamp(DateTimeUtils.microsToMillis(t1Internal)) t1.setNanos(123456000) + private val tsNTZ1 = LocalDateTime.parse(t1Str.replace(" ", "T")) private val t2Str = "2016-05-09 00:00:02.987654" private val t2Internal = date(2016, 5, 9, 0, 0, 2, 987654) private val t2 = new Timestamp(DateTimeUtils.microsToMillis(t2Internal)) t2.setNanos(987654000) + private val tsNTZ2 = LocalDateTime.parse(t2Str.replace(" ", "T")) private val double1 = 1.123456789 private val double2 = 6.987654321 @@ -74,14 +77,14 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils protected val data = Seq[ (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long, jl.Double, jl.Float, java.math.BigDecimal, - String, Array[Byte], Date, Timestamp, + String, Array[Byte], Date, Timestamp, LocalDateTime, Seq[Int])]( // scalastyle:off nonascii (false, 1.toByte, 1.toShort, 1, 1L, double1, 1.12345f, - dec1, "string escrito en español", "b1".getBytes, d1, t1, null), + dec1, "string escrito en español", "b1".getBytes, d1, t1, tsNTZ1, null), (true, 2.toByte, 30000.toShort, 40000000, 5536453629L, double2, 7.54321f, - dec2, "日本語で書かれたstring", "a string full of bytes".getBytes, d2, t2, null), - (null, null, null, null, null, null, null, 
null, null, null, null, null, null) + dec2, "日本語で書かれたstring", "a string full of bytes".getBytes, d2, t2, tsNTZ2, null), + (null, null, null, null, null, null, null, null, null, null, null, null, null, null) // scalastyle:on nonascii ) @@ -103,6 +106,8 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils "cdate" -> CatalogColumnStat(Some(2), Some(d1Str), Some(d2Str), Some(1), Some(4), Some(4)), "ctimestamp" -> CatalogColumnStat(Some(2), Some(t1Str), + Some(t2Str), Some(1), Some(8), Some(8)), + "ctimestamp_ntz" -> CatalogColumnStat(Some(2), Some(t1Str), Some(t2Str), Some(1), Some(8), Some(8)) ) @@ -136,6 +141,9 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils colStats.update("ctimestamp", stats("ctimestamp").copy(histogram = Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1), HistogramBin(t1Internal, t2Internal, 1)))))) + colStats.update("ctimestamp_ntz", stats("ctimestamp_ntz").copy(histogram = + Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1), + HistogramBin(t1Internal, t2Internal, 1)))))) colStats } @@ -220,7 +228,14 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8", "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.123456", "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1", - "spark.sql.statistics.colStats.ctimestamp.version" -> strVersion + "spark.sql.statistics.colStats.ctimestamp.version" -> strVersion, + "spark.sql.statistics.colStats.ctimestamp_ntz.avgLen" -> "8", + "spark.sql.statistics.colStats.ctimestamp_ntz.distinctCount" -> "2", + "spark.sql.statistics.colStats.ctimestamp_ntz.max" -> "2016-05-09 00:00:02.987654", + "spark.sql.statistics.colStats.ctimestamp_ntz.maxLen" -> "8", + "spark.sql.statistics.colStats.ctimestamp_ntz.min" -> "2016-05-08 00:00:01.123456", + "spark.sql.statistics.colStats.ctimestamp_ntz.nullCount" -> "1", + 
"spark.sql.statistics.colStats.ctimestamp_ntz.version" -> strVersion ) val expectedSerializedHistograms = Map( @@ -241,7 +256,9 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils "spark.sql.statistics.colStats.cdate.histogram" -> HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get), "spark.sql.statistics.colStats.ctimestamp.histogram" -> - HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get) + HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get), + "spark.sql.statistics.colStats.ctimestamp_ntz.histogram" -> + HistogramSerializer.serialize(statsWithHgms("ctimestamp_ntz").histogram.get) ) private val randomName = new Random(31) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 65fd2f72727..507c482525c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1154,7 +1154,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val tableName = "column_stats_test_de" // (data.head.productArity - 1) because the last column does not support stats collection. assert(stats.size == data.head.productArity - 1) - val df = data.toDF(stats.keys.toSeq :+ "carray" : _*) + // Hive can't parse data type "timestamp_ntz" + val df = data.toDF(stats.keys.toSeq :+ "carray" : _*).drop("ctimestamp_ntz") withTable(tableName) { df.write.saveAsTable(tableName) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org