This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2a059e6 [SPARK-30788][SQL] Support `SimpleDateFormat` and `FastDateFormat` as legacy date/timestamp formatters 2a059e6 is described below commit 2a059e65bae93ddb61f7154d81da3fa0c2dcb669 Author: Maxim Gekk <max.g...@gmail.com> AuthorDate: Wed Feb 12 20:12:38 2020 +0800 [SPARK-30788][SQL] Support `SimpleDateFormat` and `FastDateFormat` as legacy date/timestamp formatters ### What changes were proposed in this pull request? In the PR, I propose to add legacy date/timestamp formatters based on `SimpleDateFormat` and `FastDateFormat`: - `LegacyFastTimestampFormatter` - uses `FastDateFormat` and supports parsing/formatting in microsecond precision. The code was borrowed from Spark 2.4, see https://github.com/apache/spark/pull/26507 & https://github.com/apache/spark/pull/26582 - `LegacySimpleTimestampFormatter` uses `SimpleDateFormat`, and support the `lenient` mode. When the `lenient` parameter is set to `false`, the parser become much stronger in checking its input. ### Why are the changes needed? Spark 2.4.x uses the following parsers for parsing/formatting date/timestamp strings: - `DateTimeFormat` in CSV/JSON datasource - `SimpleDateFormat` - is used in JDBC datasource, in partitions parsing. - `SimpleDateFormat` in strong mode (`lenient = false`), see https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L124. It is used by the `date_format`, `from_unixtime`, `unix_timestamp` and `to_unix_timestamp` functions. The PR aims to make Spark 3.0 compatible with Spark 2.4.x in all those cases when `spark.sql.legacy.timeParser.enabled` is set to `true`. ### Does this PR introduce any user-facing change? This shouldn't change behavior with default settings. If `spark.sql.legacy.timeParser.enabled` is set to `true`, users should observe behavior of Spark 2.4. ### How was this patch tested? - Modified tests in `DateExpressionsSuite` to check the legacy parser - `SimpleDateFormat`. - Added `CSVLegacyTimeParserSuite` and `JsonLegacyTimeParserSuite` to run `CSVSuite` and `JsonSuite` with the legacy parser - `FastDateFormat`. Closes #27524 from MaxGekk/timestamp-formatter-legacy-fallback. Authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit c1986204e59f1e8cc4b611d5a578cb248cb74c28) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 4 +- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 4 +- .../sql/catalyst/csv/UnivocityGenerator.scala | 7 +- .../spark/sql/catalyst/csv/UnivocityParser.scala | 7 +- .../catalyst/expressions/datetimeExpressions.scala | 52 +-- .../spark/sql/catalyst/json/JSONOptions.scala | 4 +- .../spark/sql/catalyst/json/JacksonGenerator.scala | 7 +- .../spark/sql/catalyst/json/JacksonParser.scala | 7 +- .../spark/sql/catalyst/json/JsonInferSchema.scala | 4 +- .../spark/sql/catalyst/util/DateFormatter.scala | 66 +++- .../sql/catalyst/util/TimestampFormatter.scala | 132 ++++++- .../scala/org/apache/spark/sql/types/Decimal.scala | 2 +- .../expressions/DateExpressionsSuite.scala | 390 +++++++++++---------- .../scala/org/apache/spark/sql/functions.scala | 7 +- .../test/resources/test-data/bad_after_good.csv | 2 +- .../test/resources/test-data/value-malformed.csv | 2 +- .../org/apache/spark/sql/DateFunctionsSuite.scala | 346 +++++++++--------- .../sql/execution/datasources/csv/CSVSuite.scala | 23 +- .../sql/execution/datasources/json/JsonSuite.scala | 7 + 19 files changed, 654 insertions(+), 419 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 03cc3cb..c6a0318 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -24,6 +24,7 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.types._ @@ -32,7 +33,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private val timestampParser = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val decimalParser = if (options.locale == Locale.US) { // Special handling the default locale for backward compatibility diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 5e40d74..8892037 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -146,10 +146,10 @@ class CSVOptions( // A language tag in IETF BCP 47 format val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) - val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd") + val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern) val timestampFormat: String = - parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX") + parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 05cb91d..00e3d49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.types._ class UnivocityGenerator( @@ -44,11 +45,13 @@ class UnivocityGenerator( private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 5510953..cd69c21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow} import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -86,11 +87,13 @@ class UnivocityParser( private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val csvFilters = new CSVFilters(filters, requiredSchema) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index aa2bd5a..1f4c8c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -30,9 +30,10 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, LegacyDateFormats, TimestampFormatter} import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.SIMPLE_DATE_FORMAT import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -622,13 +623,15 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti @transient private lazy val formatter: Option[TimestampFormatter] = { if (right.foldable) { - Option(right.eval()).map(format => TimestampFormatter(format.toString, zoneId)) + Option(right.eval()).map { format => + TimestampFormatter(format.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) + } } else None } override protected def nullSafeEval(timestamp: Any, format: Any): Any = { val tf = if (formatter.isEmpty) { - TimestampFormatter(format.toString, zoneId) + TimestampFormatter(format.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) } else { formatter.get } @@ -643,10 +646,14 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti }) }.getOrElse { val tf = TimestampFormatter.getClass.getName.stripSuffix("$") + val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$") val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) defineCodeGen(ctx, ev, (timestamp, format) => { - s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $zid) - .format($timestamp))""" + s"""|UTF8String.fromString($tf$$.MODULE$$.apply( + | $format.toString(), + | $zid, + | $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT()) + |.format($timestamp))""".stripMargin }) } } @@ -688,7 +695,7 @@ case class ToUnixTimestamp( copy(timeZoneId = Option(timeZoneId)) def this(time: Expression) = { - this(time, Literal("uuuu-MM-dd HH:mm:ss")) + this(time, Literal(TimestampFormatter.defaultPattern)) } override def prettyName: String = "to_unix_timestamp" @@ -732,7 +739,7 @@ case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Op copy(timeZoneId = Option(timeZoneId)) def this(time: Expression) = { - this(time, Literal("uuuu-MM-dd HH:mm:ss")) + this(time, Literal(TimestampFormatter.defaultPattern)) } def this() = { @@ -758,7 +765,7 @@ abstract class ToTimestamp private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String] private lazy val formatter: TimestampFormatter = try { - TimestampFormatter(constFormat.toString, zoneId) + TimestampFormatter(constFormat.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) } catch { case NonFatal(_) => null } @@ -791,8 +798,8 @@ abstract class ToTimestamp } else { val formatString = f.asInstanceOf[UTF8String].toString try { - TimestampFormatter(formatString, zoneId).parse( - t.asInstanceOf[UTF8String].toString) / downScaleFactor + TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) + .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { case NonFatal(_) => null } @@ -831,13 +838,16 @@ abstract class ToTimestamp } case StringType => val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) - val locale = ctx.addReferenceObj("locale", Locale.US) val tf = TimestampFormatter.getClass.getName.stripSuffix("$") + val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$") nullSafeCodeGen(ctx, ev, (string, format) => { s""" try { - ${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $zid, $locale) - .parse($string.toString()) / $downScaleFactor; + ${ev.value} = $tf$$.MODULE$$.apply( + $format.toString(), + $zid, + $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT()) + .parse($string.toString()) / $downScaleFactor; } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; } catch (java.text.ParseException e) { @@ -908,7 +918,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ override def prettyName: String = "from_unixtime" def this(unix: Expression) = { - this(unix, Literal("uuuu-MM-dd HH:mm:ss")) + this(unix, Literal(TimestampFormatter.defaultPattern)) } override def dataType: DataType = StringType @@ -922,7 +932,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String] private lazy val formatter: TimestampFormatter = try { - TimestampFormatter(constFormat.toString, zoneId) + TimestampFormatter(constFormat.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) } catch { case NonFatal(_) => null } @@ -948,8 +958,9 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ null } else { try { - UTF8String.fromString(TimestampFormatter(f.toString, zoneId) - .format(time.asInstanceOf[Long] * MICROS_PER_SECOND)) + UTF8String.fromString( + TimestampFormatter(f.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) + .format(time.asInstanceOf[Long] * MICROS_PER_SECOND)) } catch { case NonFatal(_) => null } @@ -980,13 +991,14 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ } } else { val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) - val locale = ctx.addReferenceObj("locale", Locale.US) val tf = TimestampFormatter.getClass.getName.stripSuffix("$") + val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$") nullSafeCodeGen(ctx, ev, (seconds, f) => { s""" try { - ${ev.value} = UTF8String.fromString($tf$$.MODULE$$.apply($f.toString(), $zid, $locale). - format($seconds * 1000000L)); + ${ev.value} = UTF8String.fromString( + $tf$$.MODULE$$.apply($f.toString(), $zid, $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT()) + .format($seconds * 1000000L)); } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; }""" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index cdf4b46..45c4edf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -88,10 +88,10 @@ private[sql] class JSONOptions( val zoneId: ZoneId = DateTimeUtils.getZoneId( parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)) - val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd") + val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern) val timestampFormat: String = - parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX") + parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 9c63593..141360f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.types._ /** @@ -80,11 +81,13 @@ private[sql] class JacksonGenerator( private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private def makeWriter(dataType: DataType): ValueWriter = dataType match { case NullType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 76efa57..1e408cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -58,11 +59,13 @@ class JacksonParser( private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) /** * Create a converter which converts the JSON documents held by the `JsonParser` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index f030955..82dd6d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -40,7 +41,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) /** * Infer the type of a collection of json records in three stages: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 28189b6..2cf82d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.catalyst.util +import java.text.SimpleDateFormat import java.time.{LocalDate, ZoneId} -import java.util.Locale +import java.util.{Date, Locale} import org.apache.commons.lang3.time.FastDateFormat @@ -51,41 +52,76 @@ class Iso8601DateFormatter( } } -class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter { - @transient - private lazy val format = FastDateFormat.getInstance(pattern, locale) +trait LegacyDateFormatter extends DateFormatter { + def parseToDate(s: String): Date + def formatDate(d: Date): String override def parse(s: String): Int = { - val milliseconds = format.parse(s).getTime + val milliseconds = parseToDate(s).getTime DateTimeUtils.millisToDays(milliseconds) } override def format(days: Int): String = { val date = DateTimeUtils.toJavaDate(days) - format.format(date) + formatDate(date) } } +class LegacyFastDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter { + @transient + private lazy val fdf = FastDateFormat.getInstance(pattern, locale) + override def parseToDate(s: String): Date = fdf.parse(s) + override def formatDate(d: Date): String = fdf.format(d) +} + +class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter { + @transient + private lazy val sdf = new SimpleDateFormat(pattern, locale) + override def parseToDate(s: String): Date = sdf.parse(s) + override def formatDate(d: Date): String = sdf.format(d) +} + object DateFormatter { + import LegacyDateFormats._ + val defaultLocale: Locale = Locale.US - def apply(format: String, zoneId: ZoneId, locale: Locale): DateFormatter = { + def defaultPattern(): String = { + if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd" + } + + private def getFormatter( + format: Option[String], + zoneId: ZoneId, + locale: Locale = defaultLocale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { + + val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyDateFormatter(format, locale) + legacyFormat match { + case FAST_DATE_FORMAT => + new LegacyFastDateFormatter(pattern, locale) + case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT => + new LegacySimpleDateFormatter(pattern, locale) + } } else { - new Iso8601DateFormatter(format, zoneId, locale) + new Iso8601DateFormatter(pattern, zoneId, locale) } } + def apply( + format: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): DateFormatter = { + getFormatter(Some(format), zoneId, locale, legacyFormat) + } + def apply(format: String, zoneId: ZoneId): DateFormatter = { - apply(format, zoneId, defaultLocale) + getFormatter(Some(format), zoneId) } def apply(zoneId: ZoneId): DateFormatter = { - if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyDateFormatter("yyyy-MM-dd", defaultLocale) - } else { - new Iso8601DateFormatter("uuuu-MM-dd", zoneId, defaultLocale) - } + getFormatter(None, zoneId) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index fe1a4fe..4893a7e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -17,19 +17,20 @@ package org.apache.spark.sql.catalyst.util -import java.text.ParseException +import java.text.{ParseException, ParsePosition, SimpleDateFormat} import java.time._ import java.time.format.DateTimeParseException import java.time.temporal.ChronoField.MICRO_OF_SECOND import java.time.temporal.TemporalQueries -import java.util.{Locale, TimeZone} +import java.util.{Calendar, GregorianCalendar, Locale, TimeZone} import java.util.concurrent.TimeUnit.SECONDS import org.apache.commons.lang3.time.FastDateFormat -import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS -import org.apache.spark.sql.catalyst.util.DateTimeUtils.convertSpecialTimestamp +import org.apache.spark.sql.catalyst.util.DateTimeConstants._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{ convertSpecialTimestamp, SQLTimestamp} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.Decimal sealed trait TimestampFormatter extends Serializable { /** @@ -90,44 +91,139 @@ class FractionTimestampFormatter(zoneId: ZoneId) override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter } -class LegacyTimestampFormatter( +/** + * The custom sub-class of `GregorianCalendar` is needed to get access to + * protected `fields` immediately after parsing. We cannot use + * the `get()` method because it performs normalization of the fraction + * part. Accordingly, the `MILLISECOND` field doesn't contain original value. + * + * Also this class allows to set raw value to the `MILLISECOND` field + * directly before formatting. + */ +class MicrosCalendar(tz: TimeZone, digitsInFraction: Int) + extends GregorianCalendar(tz, Locale.US) { + // Converts parsed `MILLISECOND` field to seconds fraction in microsecond precision. + // For example if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and + // if the `MILLISECOND` field was parsed to `1234`. + def getMicros(): SQLTimestamp = { + // Append 6 zeros to the field: 1234 -> 1234000000 + val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND + // Take the first 6 digits from `d`: 1234000000 -> 123400 + // The rest contains exactly `digitsInFraction`: `0000` = 10 ^ digitsInFraction + // So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction) + d / Decimal.POW_10(digitsInFraction) + } + + // Converts the seconds fraction in microsecond precision to a value + // that can be correctly formatted according to the specified fraction pattern. + // The method performs operations opposite to `getMicros()`. + def setMicros(micros: Long): Unit = { + val d = micros * Decimal.POW_10(digitsInFraction) + fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt + } +} + +class LegacyFastTimestampFormatter( pattern: String, zoneId: ZoneId, locale: Locale) extends TimestampFormatter { - @transient private lazy val format = + @transient private lazy val fastDateFormat = FastDateFormat.getInstance(pattern, TimeZone.getTimeZone(zoneId), locale) + @transient private lazy val cal = new MicrosCalendar( + fastDateFormat.getTimeZone, + fastDateFormat.getPattern.count(_ == 'S')) + + def parse(s: String): SQLTimestamp = { + cal.clear() // Clear the calendar because it can be re-used many times + if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) { + throw new IllegalArgumentException(s"'$s' is an invalid timestamp") + } + val micros = cal.getMicros() + cal.set(Calendar.MILLISECOND, 0) + cal.getTimeInMillis * MICROS_PER_MILLIS + micros + } + + def format(timestamp: SQLTimestamp): String = { + cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND) + cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND)) + fastDateFormat.format(cal) + } +} - protected def toMillis(s: String): Long = format.parse(s).getTime +class LegacySimpleTimestampFormatter( + pattern: String, + zoneId: ZoneId, + locale: Locale, + lenient: Boolean = true) extends TimestampFormatter { + @transient private lazy val sdf = { + val formatter = new SimpleDateFormat(pattern, locale) + formatter.setTimeZone(TimeZone.getTimeZone(zoneId)) + formatter.setLenient(lenient) + formatter + } - override def parse(s: String): Long = toMillis(s) * MICROS_PER_MILLIS + override def parse(s: String): Long = { + sdf.parse(s).getTime * MICROS_PER_MILLIS + } override def format(us: Long): String = { - format.format(DateTimeUtils.toJavaTimestamp(us)) + val timestamp = DateTimeUtils.toJavaTimestamp(us) + sdf.format(timestamp) } } +object LegacyDateFormats extends Enumeration { + type LegacyDateFormat = Value + val FAST_DATE_FORMAT, SIMPLE_DATE_FORMAT, LENIENT_SIMPLE_DATE_FORMAT = Value +} + object TimestampFormatter { + import LegacyDateFormats._ + val defaultLocale: Locale = Locale.US - def apply(format: String, zoneId: ZoneId, locale: Locale): TimestampFormatter = { + def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss" + + private def getFormatter( + format: Option[String], + zoneId: ZoneId, + locale: Locale = defaultLocale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = { + + val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyTimestampFormatter(format, zoneId, locale) + legacyFormat match { + case FAST_DATE_FORMAT => + new LegacyFastTimestampFormatter(pattern, zoneId, locale) + case SIMPLE_DATE_FORMAT => + new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false) + case LENIENT_SIMPLE_DATE_FORMAT => + new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true) + } } else { - new Iso8601TimestampFormatter(format, zoneId, locale) + new Iso8601TimestampFormatter(pattern, zoneId, locale) } } + def apply( + format: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): TimestampFormatter = { + getFormatter(Some(format), zoneId, locale, legacyFormat) + } + + def apply(format: String, zoneId: ZoneId, legacyFormat: LegacyDateFormat): TimestampFormatter = { + getFormatter(Some(format), zoneId, defaultLocale, legacyFormat) + } + def apply(format: String, zoneId: ZoneId): TimestampFormatter = { - apply(format, zoneId, defaultLocale) + getFormatter(Some(format), zoneId) } def apply(zoneId: ZoneId): TimestampFormatter = { - if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyTimestampFormatter("yyyy-MM-dd HH:mm:ss", zoneId, defaultLocale) - } else { - new Iso8601TimestampFormatter("uuuu-MM-dd HH:mm:ss", zoneId, defaultLocale) - } + getFormatter(None, zoneId) } def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index 9ce64b0..f32e48e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -541,7 +541,7 @@ object Decimal { /** Maximum number of decimal digits a Long can represent */ val MAX_LONG_DIGITS = 18 - private val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong) + val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong) private val BIG_DEC_ZERO = BigDecimal(0) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index 274d0be..f04149a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, Timesta import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -241,41 +242,45 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("DateFormat") { - checkEvaluation( - DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId), - null) - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), - Literal.create(null, StringType), gmtId), null) - - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), - Literal("y"), gmtId), "2015") - checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), gmtId), "2013") - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), - Literal("H"), gmtId), "0") - checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), gmtId), "13") - - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId), - Literal("y"), pstId), "2015") - checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), pstId), "2013") - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId), - Literal("H"), pstId), "0") - checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), pstId), "5") - - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId), - Literal("y"), jstId), "2015") - checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), jstId), "2013") - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId), - Literal("H"), jstId), "0") - checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), jstId), "22") - - // SPARK-28072 The codegen path should work - checkEvaluation( - expression = DateFormatClass( - BoundReference(ordinal = 0, dataType = TimestampType, nullable = true), - BoundReference(ordinal = 1, dataType = StringType, nullable = true), - jstId), - expected = "22", - inputRow = InternalRow(DateTimeUtils.fromJavaTimestamp(ts), UTF8String.fromString("H"))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + checkEvaluation( + DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId), + null) + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), + Literal.create(null, StringType), gmtId), null) + + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), + Literal("y"), gmtId), "2015") + checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), gmtId), "2013") + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), + Literal("H"), gmtId), "0") + checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), gmtId), "13") + + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId), + Literal("y"), pstId), "2015") + checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), pstId), "2013") + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId), + Literal("H"), pstId), "0") + checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), pstId), "5") + + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId), + Literal("y"), jstId), "2015") + checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), jstId), "2013") + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId), + Literal("H"), jstId), "0") + checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), jstId), "22") + + // SPARK-28072 The codegen path should work + checkEvaluation( + expression = DateFormatClass( + BoundReference(ordinal = 0, dataType = TimestampType, nullable = true), + BoundReference(ordinal = 1, dataType = StringType, nullable = true), + jstId), + expected = "22", + inputRow = InternalRow(DateTimeUtils.fromJavaTimestamp(ts), UTF8String.fromString("H"))) + } + } } test("Hour") { @@ -705,162 +710,189 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("from_unixtime") { - val fmt1 = "yyyy-MM-dd HH:mm:ss" - val sdf1 = new SimpleDateFormat(fmt1, Locale.US) - val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" - val sdf2 = new SimpleDateFormat(fmt2, Locale.US) - for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { - val timeZoneId = Option(tz.getID) - sdf1.setTimeZone(tz) - sdf2.setTimeZone(tz) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val fmt1 = "yyyy-MM-dd HH:mm:ss" + val sdf1 = new SimpleDateFormat(fmt1, Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { + val timeZoneId = Option(tz.getID) + sdf1.setTimeZone(tz) + sdf2.setTimeZone(tz) - checkEvaluation( - FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId), - sdf1.format(new Timestamp(0))) - checkEvaluation(FromUnixTime( - Literal(1000L), Literal(fmt1), timeZoneId), - sdf1.format(new Timestamp(1000000))) - checkEvaluation( - FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId), - sdf2.format(new Timestamp(-1000000))) - checkEvaluation( - FromUnixTime(Literal.create(null, LongType), Literal.create(null, StringType), timeZoneId), - null) - checkEvaluation( - FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId), - null) - checkEvaluation( - FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId), - null) - checkEvaluation( - FromUnixTime(Literal(0L), Literal("not a valid format"), timeZoneId), null) + checkEvaluation( + FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId), + sdf1.format(new Timestamp(0))) + checkEvaluation(FromUnixTime( + Literal(1000L), Literal(fmt1), timeZoneId), + sdf1.format(new Timestamp(1000000))) + checkEvaluation( + FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId), + sdf2.format(new Timestamp(-1000000))) + checkEvaluation( + FromUnixTime( + Literal.create(null, LongType), + Literal.create(null, StringType), timeZoneId), + null) + checkEvaluation( + FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId), + null) + checkEvaluation( + FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId), + null) + checkEvaluation( + FromUnixTime(Literal(0L), Literal("not a valid format"), timeZoneId), null) - // SPARK-28072 The codegen path for non-literal input should also work - checkEvaluation( - expression = FromUnixTime( - BoundReference(ordinal = 0, dataType = LongType, nullable = true), - BoundReference(ordinal = 1, dataType = StringType, nullable = true), - timeZoneId), - expected = UTF8String.fromString(sdf1.format(new Timestamp(0))), - inputRow = InternalRow(0L, UTF8String.fromString(fmt1))) + // SPARK-28072 The codegen path for non-literal input should also work + checkEvaluation( + expression = FromUnixTime( + BoundReference(ordinal = 0, dataType = LongType, nullable = true), + BoundReference(ordinal = 1, dataType = StringType, nullable = true), + timeZoneId), + expected = UTF8String.fromString(sdf1.format(new Timestamp(0))), + inputRow = InternalRow(0L, UTF8String.fromString(fmt1))) + } + } } } test("unix_timestamp") { - val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) - val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" - val sdf2 = new SimpleDateFormat(fmt2, Locale.US) - val fmt3 = "yy-MM-dd" - val sdf3 = new SimpleDateFormat(fmt3, Locale.US) - sdf3.setTimeZone(TimeZoneGMT) - - withDefaultTimeZone(TimeZoneGMT) { - for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { - val timeZoneId = Option(tz.getID) - sdf1.setTimeZone(tz) - sdf2.setTimeZone(tz) - - val date1 = Date.valueOf("2015-07-24") - checkEvaluation(UnixTimestamp( - Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), 0L) - checkEvaluation(UnixTimestamp( - Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - 1000L) - checkEvaluation( - UnixTimestamp( - Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - 1000L) - checkEvaluation( - UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) - checkEvaluation( - UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId), - -1000L) - checkEvaluation(UnixTimestamp( - Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( - DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) - val t1 = UnixTimestamp( - CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] - val t2 = UnixTimestamp( - CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] - assert(t2 - t1 <= 1) - checkEvaluation( - UnixTimestamp( - Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), - null) - checkEvaluation( - UnixTimestamp(Literal.create(null, DateType), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - null) - checkEvaluation( - UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) - checkEvaluation( - UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + val fmt3 = "yy-MM-dd" + val sdf3 = new SimpleDateFormat(fmt3, Locale.US) + sdf3.setTimeZone(TimeZoneGMT) + + withDefaultTimeZone(TimeZoneGMT) { + for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { + val timeZoneId = Option(tz.getID) + sdf1.setTimeZone(tz) + sdf2.setTimeZone(tz) + + val date1 = Date.valueOf("2015-07-24") + checkEvaluation(UnixTimestamp( + Literal(sdf1.format(new Timestamp(0))), + Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), 0L) + checkEvaluation(UnixTimestamp( + Literal(sdf1.format(new Timestamp(1000000))), + Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), + 1000L) + checkEvaluation( + UnixTimestamp( + Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), + 1000L) + checkEvaluation( + UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), + MILLISECONDS.toSeconds( + DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) + checkEvaluation( + UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), + Literal(fmt2), timeZoneId), + -1000L) + checkEvaluation(UnixTimestamp( + Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( + DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) + val t1 = UnixTimestamp( + CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] + val t2 = UnixTimestamp( + CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] + assert(t2 - t1 <= 1) + checkEvaluation( + UnixTimestamp( + Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), + null) + checkEvaluation( + UnixTimestamp( + Literal.create(null, DateType), + Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), + null) + checkEvaluation( + UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId), + MILLISECONDS.toSeconds( + DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) + checkEvaluation( + UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null) + } + } } } } test("to_unix_timestamp") { - val fmt1 = "yyyy-MM-dd HH:mm:ss" - val sdf1 = new SimpleDateFormat(fmt1, Locale.US) - val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" - val sdf2 = new SimpleDateFormat(fmt2, Locale.US) - val fmt3 = "yy-MM-dd" - val sdf3 = new SimpleDateFormat(fmt3, Locale.US) - sdf3.setTimeZone(TimeZoneGMT) - - withDefaultTimeZone(TimeZoneGMT) { - for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { - val timeZoneId = Option(tz.getID) - sdf1.setTimeZone(tz) - sdf2.setTimeZone(tz) - - val date1 = Date.valueOf("2015-07-24") - checkEvaluation(ToUnixTimestamp( - Literal(sdf1.format(new Timestamp(0))), Literal(fmt1), timeZoneId), 0L) - checkEvaluation(ToUnixTimestamp( - Literal(sdf1.format(new Timestamp(1000000))), Literal(fmt1), timeZoneId), - 1000L) - checkEvaluation(ToUnixTimestamp( - Literal(new Timestamp(1000000)), Literal(fmt1)), - 1000L) - checkEvaluation( - ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) - checkEvaluation( - ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId), - -1000L) - checkEvaluation(ToUnixTimestamp( - Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( - DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) - val t1 = ToUnixTimestamp( - CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long] - val t2 = ToUnixTimestamp( - CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long] - assert(t2 - t1 <= 1) - checkEvaluation(ToUnixTimestamp( - Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), null) - checkEvaluation( - ToUnixTimestamp( - Literal.create(null, DateType), Literal(fmt1), timeZoneId), - null) - checkEvaluation(ToUnixTimestamp( - Literal(date1), Literal.create(null, StringType), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) - checkEvaluation( - ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val fmt1 = "yyyy-MM-dd HH:mm:ss" + val sdf1 = new SimpleDateFormat(fmt1, Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + val fmt3 = "yy-MM-dd" + val sdf3 = new SimpleDateFormat(fmt3, Locale.US) + sdf3.setTimeZone(TimeZoneGMT) + + withDefaultTimeZone(TimeZoneGMT) { + for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { + val timeZoneId = Option(tz.getID) + sdf1.setTimeZone(tz) + sdf2.setTimeZone(tz) + + val date1 = Date.valueOf("2015-07-24") + checkEvaluation(ToUnixTimestamp( + Literal(sdf1.format(new Timestamp(0))), Literal(fmt1), timeZoneId), 0L) + checkEvaluation(ToUnixTimestamp( + Literal(sdf1.format(new Timestamp(1000000))), Literal(fmt1), timeZoneId), + 1000L) + checkEvaluation(ToUnixTimestamp( + Literal(new Timestamp(1000000)), Literal(fmt1)), + 1000L) + checkEvaluation( + ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId), + MILLISECONDS.toSeconds( + DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) + checkEvaluation( + ToUnixTimestamp( + Literal(sdf2.format(new Timestamp(-1000000))), + Literal(fmt2), timeZoneId), + -1000L) + checkEvaluation(ToUnixTimestamp( + Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( + DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) + val t1 = ToUnixTimestamp( + CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long] + val t2 = ToUnixTimestamp( + CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long] + assert(t2 - t1 <= 1) + checkEvaluation(ToUnixTimestamp( + Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), null) + checkEvaluation( + ToUnixTimestamp( + Literal.create(null, DateType), Literal(fmt1), timeZoneId), + null) + checkEvaluation(ToUnixTimestamp( + Literal(date1), Literal.create(null, StringType), timeZoneId), + MILLISECONDS.toSeconds( + DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) + checkEvaluation( + ToUnixTimestamp( + Literal("2015-07-24"), + Literal("not a valid format"), timeZoneId), null) - // SPARK-28072 The codegen path for non-literal input should also work - checkEvaluation( - expression = ToUnixTimestamp( - BoundReference(ordinal = 0, dataType = StringType, nullable = true), - BoundReference(ordinal = 1, dataType = StringType, nullable = true), - timeZoneId), - expected = 0L, - inputRow = InternalRow( - UTF8String.fromString(sdf1.format(new Timestamp(0))), UTF8String.fromString(fmt1))) + // SPARK-28072 The codegen path for non-literal input should also work + checkEvaluation( + expression = ToUnixTimestamp( + BoundReference(ordinal = 0, dataType = StringType, nullable = true), + BoundReference(ordinal = 1, dataType = StringType, nullable = true), + timeZoneId), + expected = 0L, + inputRow = InternalRow( + UTF8String.fromString(sdf1.format(new Timestamp(0))), UTF8String.fromString(fmt1))) + } + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index d125581..2d5504a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, ResolvedHint} +import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.expressions.{Aggregator, SparkUserDefinedFunction, UserDefinedAggregator, UserDefinedFunction} import org.apache.spark.sql.internal.SQLConf @@ -2881,7 +2882,7 @@ object functions { * @since 1.5.0 */ def from_unixtime(ut: Column): Column = withExpr { - FromUnixTime(ut.expr, Literal("uuuu-MM-dd HH:mm:ss")) + FromUnixTime(ut.expr, Literal(TimestampFormatter.defaultPattern)) } /** @@ -2913,7 +2914,7 @@ object functions { * @since 1.5.0 */ def unix_timestamp(): Column = withExpr { - UnixTimestamp(CurrentTimestamp(), Literal("uuuu-MM-dd HH:mm:ss")) + UnixTimestamp(CurrentTimestamp(), Literal(TimestampFormatter.defaultPattern)) } /** @@ -2927,7 +2928,7 @@ object functions { * @since 1.5.0 */ def unix_timestamp(s: Column): Column = withExpr { - UnixTimestamp(s.expr, Literal("uuuu-MM-dd HH:mm:ss")) + UnixTimestamp(s.expr, Literal(TimestampFormatter.defaultPattern)) } /** diff --git a/sql/core/src/test/resources/test-data/bad_after_good.csv b/sql/core/src/test/resources/test-data/bad_after_good.csv index 4621a7d..1a7c265 100644 --- a/sql/core/src/test/resources/test-data/bad_after_good.csv +++ b/sql/core/src/test/resources/test-data/bad_after_good.csv @@ -1,2 +1,2 @@ "good record",1999-08-01 -"bad record",1999-088-01 +"bad record",1999-088_01 diff --git a/sql/core/src/test/resources/test-data/value-malformed.csv b/sql/core/src/test/resources/test-data/value-malformed.csv index 8945ed7..6e6f08f 100644 --- a/sql/core/src/test/resources/test-data/value-malformed.csv +++ b/sql/core/src/test/resources/test-data/value-malformed.csv @@ -1,2 +1,2 @@ -0,2013-111-11 12:13:14 +0,2013-111_11 12:13:14 1,1983-08-04 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index bb8cdf3..41d53c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -96,15 +96,19 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("date format") { - val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c") + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c") - checkAnswer( - df.select(date_format($"a", "y"), date_format($"b", "y"), date_format($"c", "y")), - Row("2015", "2015", "2013")) + checkAnswer( + df.select(date_format($"a", "y"), date_format($"b", "y"), date_format($"c", "y")), + Row("2015", "2015", "2013")) - checkAnswer( - df.selectExpr("date_format(a, 'y')", "date_format(b, 'y')", "date_format(c, 'y')"), - Row("2015", "2015", "2013")) + checkAnswer( + df.selectExpr("date_format(a, 'y')", "date_format(b, 'y')", "date_format(c, 'y')"), + Row("2015", "2015", "2013")) + } + } } test("year") { @@ -525,170 +529,194 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("from_unixtime") { - val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) - val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" - val sdf2 = new SimpleDateFormat(fmt2, Locale.US) - val fmt3 = "yy-MM-dd HH-mm-ss" - val sdf3 = new SimpleDateFormat(fmt3, Locale.US) - val df = Seq((1000, "yyyy-MM-dd HH:mm:ss.SSS"), (-1000, "yy-MM-dd HH-mm-ss")).toDF("a", "b") - checkAnswer( - df.select(from_unixtime(col("a"))), - Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new Timestamp(-1000000))))) - checkAnswer( - df.select(from_unixtime(col("a"), fmt2)), - Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new Timestamp(-1000000))))) - checkAnswer( - df.select(from_unixtime(col("a"), fmt3)), - Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) - checkAnswer( - df.selectExpr("from_unixtime(a)"), - Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new Timestamp(-1000000))))) - checkAnswer( - df.selectExpr(s"from_unixtime(a, '$fmt2')"), - Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new Timestamp(-1000000))))) - checkAnswer( - df.selectExpr(s"from_unixtime(a, '$fmt3')"), - Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + val fmt3 = "yy-MM-dd HH-mm-ss" + val sdf3 = new SimpleDateFormat(fmt3, Locale.US) + val df = Seq((1000, "yyyy-MM-dd HH:mm:ss.SSS"), (-1000, "yy-MM-dd HH-mm-ss")).toDF("a", "b") + checkAnswer( + df.select(from_unixtime(col("a"))), + Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new Timestamp(-1000000))))) + checkAnswer( + df.select(from_unixtime(col("a"), fmt2)), + Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new Timestamp(-1000000))))) + checkAnswer( + df.select(from_unixtime(col("a"), fmt3)), + Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) + checkAnswer( + df.selectExpr("from_unixtime(a)"), + Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new Timestamp(-1000000))))) + checkAnswer( + df.selectExpr(s"from_unixtime(a, '$fmt2')"), + Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new Timestamp(-1000000))))) + checkAnswer( + df.selectExpr(s"from_unixtime(a, '$fmt3')"), + Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) + } + } } private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis) test("unix_timestamp") { - val date1 = Date.valueOf("2015-07-24") - val date2 = Date.valueOf("2015-07-25") - val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") - val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2") - val s1 = "2015/07/24 10:00:00.5" - val s2 = "2015/07/25 02:02:02.6" - val ss1 = "2015-07-24 10:00:00" - val ss2 = "2015-07-25 02:02:02" - val fmt = "yyyy/MM/dd HH:mm:ss.S" - val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") - checkAnswer(df.select(unix_timestamp(col("ts"))), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.select(unix_timestamp(col("ss"))), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq( - Row(secs(date1.getTime)), Row(secs(date2.getTime)))) - checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq( - Row(secs(date1.getTime)), Row(secs(date2.getTime)))) - checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - - val x1 = "2015-07-24 10:00:00" - val x2 = "2015-25-07 02:02:02" - val x3 = "2015-07-24 25:02:02" - val x4 = "2015-24-07 26:02:02" - val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") - val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") - - val df1 = Seq(x1, x2, x3, x4).toDF("x") - checkAnswer(df1.select(unix_timestamp(col("x"))), Seq( - Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) - checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq( - Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) - checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), Seq( - Row(null), Row(secs(ts2.getTime)), Row(null), Row(null))) - checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( - Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) - - // invalid format - checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd aa:HH:ss')"), Seq( - Row(null), Row(null), Row(null), Row(null))) - - // february - val y1 = "2016-02-29" - val y2 = "2017-02-29" - val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") - val df2 = Seq(y1, y2).toDF("y") - checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq( - Row(secs(ts5.getTime)), Row(null))) - - val now = sql("select unix_timestamp()").collect().head.getLong(0) - checkAnswer( - sql(s"select cast ($now as timestamp)"), - Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now)))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val date1 = Date.valueOf("2015-07-24") + val date2 = Date.valueOf("2015-07-25") + val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") + val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2") + val s1 = "2015/07/24 10:00:00.5" + val s2 = "2015/07/25 02:02:02.6" + val ss1 = "2015-07-24 10:00:00" + val ss2 = "2015-07-25 02:02:02" + val fmt = "yyyy/MM/dd HH:mm:ss.S" + val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") + checkAnswer(df.select(unix_timestamp(col("ts"))), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.select(unix_timestamp(col("ss"))), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq( + Row(secs(date1.getTime)), Row(secs(date2.getTime)))) + checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq( + Row(secs(date1.getTime)), Row(secs(date2.getTime)))) + checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + + val x1 = "2015-07-24 10:00:00" + val x2 = "2015-25-07 02:02:02" + val x3 = "2015-07-24 25:02:02" + val x4 = "2015-24-07 26:02:02" + val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") + val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") + + val df1 = Seq(x1, x2, x3, x4).toDF("x") + checkAnswer(df1.select(unix_timestamp(col("x"))), Seq( + Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) + checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq( + Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) + checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), Seq( + Row(null), Row(secs(ts2.getTime)), Row(null), Row(null))) + checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( + Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) + + // invalid format + checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd aa:HH:ss')"), Seq( + Row(null), Row(null), Row(null), Row(null))) + + // february + val y1 = "2016-02-29" + val y2 = "2017-02-29" + val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") + val df2 = Seq(y1, y2).toDF("y") + checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq( + Row(secs(ts5.getTime)), Row(null))) + + val now = sql("select unix_timestamp()").collect().head.getLong(0) + checkAnswer( + sql(s"select cast ($now as timestamp)"), + Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now)))) + } + } } test("to_unix_timestamp") { - val date1 = Date.valueOf("2015-07-24") - val date2 = Date.valueOf("2015-07-25") - val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") - val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2") - val s1 = "2015/07/24 10:00:00.5" - val s2 = "2015/07/25 02:02:02.6" - val ss1 = "2015-07-24 10:00:00" - val ss2 = "2015-07-25 02:02:02" - val fmt = "yyyy/MM/dd HH:mm:ss.S" - val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") - checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq( - Row(secs(date1.getTime)), Row(secs(date2.getTime)))) - checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - - val x1 = "2015-07-24 10:00:00" - val x2 = "2015-25-07 02:02:02" - val x3 = "2015-07-24 25:02:02" - val x4 = "2015-24-07 26:02:02" - val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") - val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") - - val df1 = Seq(x1, x2, x3, x4).toDF("x") - checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq( - Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) - checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( - Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) - - // february - val y1 = "2016-02-29" - val y2 = "2017-02-29" - val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") - val df2 = Seq(y1, y2).toDF("y") - checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq( - Row(secs(ts5.getTime)), Row(null))) - - // invalid format - checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')"), Seq( - Row(null), Row(null), Row(null), Row(null))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val date1 = Date.valueOf("2015-07-24") + val date2 = Date.valueOf("2015-07-25") + val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") + val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2") + val s1 = "2015/07/24 10:00:00.5" + val s2 = "2015/07/25 02:02:02.6" + val ss1 = "2015-07-24 10:00:00" + val ss2 = "2015-07-25 02:02:02" + val fmt = "yyyy/MM/dd HH:mm:ss.S" + val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") + checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq( + Row(secs(date1.getTime)), Row(secs(date2.getTime)))) + checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + + val x1 = "2015-07-24 10:00:00" + val x2 = "2015-25-07 02:02:02" + val x3 = "2015-07-24 25:02:02" + val x4 = "2015-24-07 26:02:02" + val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") + val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") + + val df1 = Seq(x1, x2, x3, x4).toDF("x") + checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq( + Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) + checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( + Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) + + // february + val y1 = "2016-02-29" + val y2 = "2017-02-29" + val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") + val df2 = Seq(y1, y2).toDF("y") + checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq( + Row(secs(ts5.getTime)), Row(null))) + + // invalid format + checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')"), Seq( + Row(null), Row(null), Row(null), Row(null))) + } + } } test("to_timestamp") { - val date1 = Date.valueOf("2015-07-24") - val date2 = Date.valueOf("2015-07-25") - val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00") - val ts_date2 = Timestamp.valueOf("2015-07-25 00:00:00") - val ts1 = Timestamp.valueOf("2015-07-24 10:00:00") - val ts2 = Timestamp.valueOf("2015-07-25 02:02:02") - val s1 = "2015/07/24 10:00:00.5" - val s2 = "2015/07/25 02:02:02.6" - val ts1m = Timestamp.valueOf("2015-07-24 10:00:00.5") - val ts2m = Timestamp.valueOf("2015-07-25 02:02:02.6") - val ss1 = "2015-07-24 10:00:00" - val ss2 = "2015-07-25 02:02:02" - val fmt = "yyyy/MM/dd HH:mm:ss.S" - val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") - - checkAnswer(df.select(to_timestamp(col("ss"))), - df.select(unix_timestamp(col("ss")).cast("timestamp"))) - checkAnswer(df.select(to_timestamp(col("ss"))), Seq( - Row(ts1), Row(ts2))) - checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq( - Row(ts1m), Row(ts2m))) - checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq( - Row(ts1), Row(ts2))) - checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq( - Row(ts_date1), Row(ts_date2))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val date1 = Date.valueOf("2015-07-24") + val date2 = Date.valueOf("2015-07-25") + val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00") + val ts_date2 = Timestamp.valueOf("2015-07-25 00:00:00") + val ts1 = Timestamp.valueOf("2015-07-24 10:00:00") + val ts2 = Timestamp.valueOf("2015-07-25 02:02:02") + val s1 = "2015/07/24 10:00:00.5" + val s2 = "2015/07/25 02:02:02.6" + val ts1m = Timestamp.valueOf("2015-07-24 10:00:00.5") + val ts2m = Timestamp.valueOf("2015-07-25 02:02:02.6") + val ss1 = "2015-07-24 10:00:00" + val ss2 = "2015-07-25 02:02:02" + val fmt = "yyyy/MM/dd HH:mm:ss.S" + val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") + + checkAnswer(df.select(to_timestamp(col("ss"))), + df.select(unix_timestamp(col("ss")).cast("timestamp"))) + checkAnswer(df.select(to_timestamp(col("ss"))), Seq( + Row(ts1), Row(ts2))) + if (legacyParser) { + // In Spark 2.4 and earlier, to_timestamp() parses in seconds precision and cuts off + // the fractional part of seconds. The behavior was changed by SPARK-27438. + val legacyFmt = "yyyy/MM/dd HH:mm:ss" + checkAnswer(df.select(to_timestamp(col("s"), legacyFmt)), Seq( + Row(ts1), Row(ts2))) + } else { + checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq( + Row(ts1m), Row(ts2m))) + } + checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq( + Row(ts1), Row(ts2))) + checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq( + Row(ts_date1), Row(ts_date2))) + } + } } test("datediff") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 97dfbbd..b1105b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1182,7 +1182,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa .schema(schemaWithCorrField1) .csv(testFile(valueMalformedFile)) checkAnswer(df2, - Row(0, null, "0,2013-111-11 12:13:14") :: + Row(0, null, "0,2013-111_11 12:13:14") :: Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: Nil) @@ -1199,7 +1199,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa .schema(schemaWithCorrField2) .csv(testFile(valueMalformedFile)) checkAnswer(df3, - Row(0, "0,2013-111-11 12:13:14", null) :: + Row(0, "0,2013-111_11 12:13:14", null) :: Row(1, null, java.sql.Date.valueOf("1983-08-04")) :: Nil) @@ -1435,7 +1435,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa assert(df.filter($"_corrupt_record".isNull).count() == 1) checkAnswer( df.select(columnNameOfCorruptRecord), - Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil + Row("0,2013-111_11 12:13:14") :: Row(null) :: Nil ) } @@ -2093,7 +2093,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa Seq("csv", "").foreach { reader => withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> reader) { withTempPath { path => - val df = Seq(("0", "2013-111-11")).toDF("a", "b") + val df = Seq(("0", "2013-111_11")).toDF("a", "b") df.write .option("header", "true") .csv(path.getAbsolutePath) @@ -2109,7 +2109,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) .schema(schemaWithCorrField) .csv(path.getAbsoluteFile.toString) - checkAnswer(readDF, Row(0, null, "0,2013-111-11") :: Nil) + checkAnswer(readDF, Row(0, null, "0,2013-111_11") :: Nil) } } } @@ -2216,7 +2216,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa val readback = spark.read .option("mode", mode) .option("header", true) - .option("timestampFormat", "uuuu-MM-dd HH:mm:ss") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") .option("multiLine", multiLine) .schema("c0 string, c1 integer, c2 timestamp") .csv(path.getAbsolutePath) @@ -2235,7 +2235,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa } test("filters push down - malformed input in PERMISSIVE mode") { - val invalidTs = "2019-123-14 20:35:30" + val invalidTs = "2019-123_14 20:35:30" val invalidRow = s"0,$invalidTs,999" val validTs = "2019-12-14 20:35:30" Seq(true, false).foreach { filterPushdown => @@ -2252,7 +2252,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa .option("mode", "PERMISSIVE") .option("columnNameOfCorruptRecord", "c3") .option("header", true) - .option("timestampFormat", "uuuu-MM-dd HH:mm:ss") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") .schema("c0 integer, c1 timestamp, c2 integer, c3 string") .csv(path.getAbsolutePath) .where(condition) @@ -2309,3 +2309,10 @@ class CSVv2Suite extends CSVSuite { .sparkConf .set(SQLConf.USE_V1_SOURCE_LIST, "") } + +class CSVLegacyTimeParserSuite extends CSVSuite { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index b20da22..7abe818 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2572,3 +2572,10 @@ class JsonV2Suite extends JsonSuite { .sparkConf .set(SQLConf.USE_V1_SOURCE_LIST, "") } + +class JsonLegacyTimeParserSuite extends JsonSuite { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org