This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2a059e6  [SPARK-30788][SQL] Support `SimpleDateFormat` and 
`FastDateFormat` as legacy date/timestamp formatters
2a059e6 is described below

commit 2a059e65bae93ddb61f7154d81da3fa0c2dcb669
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Wed Feb 12 20:12:38 2020 +0800

    [SPARK-30788][SQL] Support `SimpleDateFormat` and `FastDateFormat` as 
legacy date/timestamp formatters
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to add legacy date/timestamp formatters based on 
`SimpleDateFormat` and `FastDateFormat`:
    - `LegacyFastTimestampFormatter` - uses `FastDateFormat` and supports 
parsing/formatting in microsecond precision. The code was borrowed from Spark 
2.4, see https://github.com/apache/spark/pull/26507 & 
https://github.com/apache/spark/pull/26582
    - `LegacySimpleTimestampFormatter` - uses `SimpleDateFormat` and supports the 
`lenient` mode. When the `lenient` parameter is set to `false`, the parser 
becomes much stricter in checking its input.
    
    ### Why are the changes needed?
    Spark 2.4.x uses the following parsers for parsing/formatting 
date/timestamp strings:
    - `FastDateFormat` - is used in the CSV/JSON datasources.
    - `SimpleDateFormat` - is used in the JDBC datasource and in partition parsing.
    - `SimpleDateFormat` in strict mode (`lenient = false`), see 
https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L124.
 It is used by the `date_format`, `from_unixtime`, `unix_timestamp` and 
`to_unix_timestamp` functions.
    
    The PR aims to make Spark 3.0 compatible with Spark 2.4.x in all those 
cases when `spark.sql.legacy.timeParser.enabled` is set to `true`.
    
    ### Does this PR introduce any user-facing change?
    This shouldn't change behavior with default settings. If 
`spark.sql.legacy.timeParser.enabled` is set to `true`, users should observe 
the behavior of Spark 2.4.
    
    ### How was this patch tested?
    - Modified tests in `DateExpressionsSuite` to check the legacy parser - 
`SimpleDateFormat`.
    - Added `CSVLegacyTimeParserSuite` and `JsonLegacyTimeParserSuite` to run 
`CSVSuite` and `JsonSuite` with the legacy parser - `FastDateFormat`.
    
    Closes #27524 from MaxGekk/timestamp-formatter-legacy-fallback.
    
    Authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit c1986204e59f1e8cc4b611d5a578cb248cb74c28)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    |   4 +-
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala |   4 +-
 .../sql/catalyst/csv/UnivocityGenerator.scala      |   7 +-
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |   7 +-
 .../catalyst/expressions/datetimeExpressions.scala |  52 +--
 .../spark/sql/catalyst/json/JSONOptions.scala      |   4 +-
 .../spark/sql/catalyst/json/JacksonGenerator.scala |   7 +-
 .../spark/sql/catalyst/json/JacksonParser.scala    |   7 +-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  |   4 +-
 .../spark/sql/catalyst/util/DateFormatter.scala    |  66 +++-
 .../sql/catalyst/util/TimestampFormatter.scala     | 132 ++++++-
 .../scala/org/apache/spark/sql/types/Decimal.scala |   2 +-
 .../expressions/DateExpressionsSuite.scala         | 390 +++++++++++----------
 .../scala/org/apache/spark/sql/functions.scala     |   7 +-
 .../test/resources/test-data/bad_after_good.csv    |   2 +-
 .../test/resources/test-data/value-malformed.csv   |   2 +-
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 346 +++++++++---------
 .../sql/execution/datasources/csv/CSVSuite.scala   |  23 +-
 .../sql/execution/datasources/json/JsonSuite.scala |   7 +
 19 files changed, 654 insertions(+), 419 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 03cc3cb..c6a0318 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -24,6 +24,7 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.types._
 
@@ -32,7 +33,8 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
   private val timestampParser = TimestampFormatter(
     options.timestampFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
 
   private val decimalParser = if (options.locale == Locale.US) {
     // Special handling the default locale for backward compatibility
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 5e40d74..8892037 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -146,10 +146,10 @@ class CSVOptions(
   // A language tag in IETF BCP 47 format
   val locale: Locale = 
parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
 
-  val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd")
+  val dateFormat: String = parameters.getOrElse("dateFormat", 
DateFormatter.defaultPattern)
 
   val timestampFormat: String =
-    parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX")
+    parameters.getOrElse("timestampFormat", 
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 05cb91d..00e3d49 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.types._
 
 class UnivocityGenerator(
@@ -44,11 +45,13 @@ class UnivocityGenerator(
   private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
   private val dateFormatter = DateFormatter(
     options.dateFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
 
   private def makeConverter(dataType: DataType): ValueConverter = dataType 
match {
     case DateType =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 5510953..cd69c21 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, 
GenericInternalRow}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -86,11 +87,13 @@ class UnivocityParser(
   private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
   private val dateFormatter = DateFormatter(
     options.dateFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
 
   private val csvFilters = new CSVFilters(filters, requiredSchema)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index aa2bd5a..1f4c8c0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -30,9 +30,10 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, LegacyDateFormats, 
TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.SIMPLE_DATE_FORMAT
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
@@ -622,13 +623,15 @@ case class DateFormatClass(left: Expression, right: 
Expression, timeZoneId: Opti
 
   @transient private lazy val formatter: Option[TimestampFormatter] = {
     if (right.foldable) {
-      Option(right.eval()).map(format => TimestampFormatter(format.toString, 
zoneId))
+      Option(right.eval()).map { format =>
+        TimestampFormatter(format.toString, zoneId, legacyFormat = 
SIMPLE_DATE_FORMAT)
+      }
     } else None
   }
 
   override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
     val tf = if (formatter.isEmpty) {
-      TimestampFormatter(format.toString, zoneId)
+      TimestampFormatter(format.toString, zoneId, legacyFormat = 
SIMPLE_DATE_FORMAT)
     } else {
       formatter.get
     }
@@ -643,10 +646,14 @@ case class DateFormatClass(left: Expression, right: 
Expression, timeZoneId: Opti
       })
     }.getOrElse {
       val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
+      val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$")
       val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
       defineCodeGen(ctx, ev, (timestamp, format) => {
-        s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), 
$zid)
-          .format($timestamp))"""
+        s"""|UTF8String.fromString($tf$$.MODULE$$.apply(
+            |  $format.toString(),
+            |  $zid,
+            |  $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT())
+            |.format($timestamp))""".stripMargin
       })
     }
   }
@@ -688,7 +695,7 @@ case class ToUnixTimestamp(
     copy(timeZoneId = Option(timeZoneId))
 
   def this(time: Expression) = {
-    this(time, Literal("uuuu-MM-dd HH:mm:ss"))
+    this(time, Literal(TimestampFormatter.defaultPattern))
   }
 
   override def prettyName: String = "to_unix_timestamp"
@@ -732,7 +739,7 @@ case class UnixTimestamp(timeExp: Expression, format: 
Expression, timeZoneId: Op
     copy(timeZoneId = Option(timeZoneId))
 
   def this(time: Expression) = {
-    this(time, Literal("uuuu-MM-dd HH:mm:ss"))
+    this(time, Literal(TimestampFormatter.defaultPattern))
   }
 
   def this() = {
@@ -758,7 +765,7 @@ abstract class ToTimestamp
   private lazy val constFormat: UTF8String = 
right.eval().asInstanceOf[UTF8String]
   private lazy val formatter: TimestampFormatter =
     try {
-      TimestampFormatter(constFormat.toString, zoneId)
+      TimestampFormatter(constFormat.toString, zoneId, legacyFormat = 
SIMPLE_DATE_FORMAT)
     } catch {
       case NonFatal(_) => null
     }
@@ -791,8 +798,8 @@ abstract class ToTimestamp
           } else {
             val formatString = f.asInstanceOf[UTF8String].toString
             try {
-              TimestampFormatter(formatString, zoneId).parse(
-                t.asInstanceOf[UTF8String].toString) / downScaleFactor
+              TimestampFormatter(formatString, zoneId, legacyFormat = 
SIMPLE_DATE_FORMAT)
+                .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor
             } catch {
               case NonFatal(_) => null
             }
@@ -831,13 +838,16 @@ abstract class ToTimestamp
         }
       case StringType =>
         val zid = ctx.addReferenceObj("zoneId", zoneId, 
classOf[ZoneId].getName)
-        val locale = ctx.addReferenceObj("locale", Locale.US)
         val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
+        val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$")
         nullSafeCodeGen(ctx, ev, (string, format) => {
           s"""
             try {
-              ${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $zid, 
$locale)
-                .parse($string.toString()) / $downScaleFactor;
+              ${ev.value} = $tf$$.MODULE$$.apply(
+                $format.toString(),
+                $zid,
+                $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT())
+              .parse($string.toString()) / $downScaleFactor;
             } catch (java.lang.IllegalArgumentException e) {
               ${ev.isNull} = true;
             } catch (java.text.ParseException e) {
@@ -908,7 +918,7 @@ case class FromUnixTime(sec: Expression, format: 
Expression, timeZoneId: Option[
   override def prettyName: String = "from_unixtime"
 
   def this(unix: Expression) = {
-    this(unix, Literal("uuuu-MM-dd HH:mm:ss"))
+    this(unix, Literal(TimestampFormatter.defaultPattern))
   }
 
   override def dataType: DataType = StringType
@@ -922,7 +932,7 @@ case class FromUnixTime(sec: Expression, format: 
Expression, timeZoneId: Option[
   private lazy val constFormat: UTF8String = 
right.eval().asInstanceOf[UTF8String]
   private lazy val formatter: TimestampFormatter =
     try {
-      TimestampFormatter(constFormat.toString, zoneId)
+      TimestampFormatter(constFormat.toString, zoneId, legacyFormat = 
SIMPLE_DATE_FORMAT)
     } catch {
       case NonFatal(_) => null
     }
@@ -948,8 +958,9 @@ case class FromUnixTime(sec: Expression, format: 
Expression, timeZoneId: Option[
           null
         } else {
           try {
-            UTF8String.fromString(TimestampFormatter(f.toString, zoneId)
-              .format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
+            UTF8String.fromString(
+              TimestampFormatter(f.toString, zoneId, legacyFormat = 
SIMPLE_DATE_FORMAT)
+                .format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
           } catch {
             case NonFatal(_) => null
           }
@@ -980,13 +991,14 @@ case class FromUnixTime(sec: Expression, format: 
Expression, timeZoneId: Option[
       }
     } else {
       val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
-      val locale = ctx.addReferenceObj("locale", Locale.US)
       val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
+      val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$")
       nullSafeCodeGen(ctx, ev, (seconds, f) => {
         s"""
         try {
-          ${ev.value} = 
UTF8String.fromString($tf$$.MODULE$$.apply($f.toString(), $zid, $locale).
-            format($seconds * 1000000L));
+          ${ev.value} = UTF8String.fromString(
+            $tf$$.MODULE$$.apply($f.toString(), $zid, 
$ldf$$.MODULE$$.SIMPLE_DATE_FORMAT())
+              .format($seconds * 1000000L));
         } catch (java.lang.IllegalArgumentException e) {
           ${ev.isNull} = true;
         }"""
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index cdf4b46..45c4edf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -88,10 +88,10 @@ private[sql] class JSONOptions(
   val zoneId: ZoneId = DateTimeUtils.getZoneId(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
-  val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd")
+  val dateFormat: String = parameters.getOrElse("dateFormat", 
DateFormatter.defaultPattern)
 
   val timestampFormat: String =
-    parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX")
+    parameters.getOrElse("timestampFormat", 
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 9c63593..141360f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.types._
 
 /**
@@ -80,11 +81,13 @@ private[sql] class JacksonGenerator(
   private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
   private val dateFormatter = DateFormatter(
     options.dateFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
 
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 76efa57..1e408cd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -58,11 +59,13 @@ class JacksonParser(
   private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
   private val dateFormatter = DateFormatter(
     options.dateFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
 
   /**
    * Create a converter which converts the JSON documents held by the 
`JsonParser`
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index f030955..82dd6d0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -40,7 +41,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) 
extends Serializable {
   private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.zoneId,
-    options.locale)
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT)
 
   /**
    * Infer the type of a collection of json records in three stages:
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index 28189b6..2cf82d1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.text.SimpleDateFormat
 import java.time.{LocalDate, ZoneId}
-import java.util.Locale
+import java.util.{Date, Locale}
 
 import org.apache.commons.lang3.time.FastDateFormat
 
@@ -51,41 +52,76 @@ class Iso8601DateFormatter(
   }
 }
 
-class LegacyDateFormatter(pattern: String, locale: Locale) extends 
DateFormatter {
-  @transient
-  private lazy val format = FastDateFormat.getInstance(pattern, locale)
+trait LegacyDateFormatter extends DateFormatter {
+  def parseToDate(s: String): Date
+  def formatDate(d: Date): String
 
   override def parse(s: String): Int = {
-    val milliseconds = format.parse(s).getTime
+    val milliseconds = parseToDate(s).getTime
     DateTimeUtils.millisToDays(milliseconds)
   }
 
   override def format(days: Int): String = {
     val date = DateTimeUtils.toJavaDate(days)
-    format.format(date)
+    formatDate(date)
   }
 }
 
+class LegacyFastDateFormatter(pattern: String, locale: Locale) extends 
LegacyDateFormatter {
+  @transient
+  private lazy val fdf = FastDateFormat.getInstance(pattern, locale)
+  override def parseToDate(s: String): Date = fdf.parse(s)
+  override def formatDate(d: Date): String = fdf.format(d)
+}
+
+class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends 
LegacyDateFormatter {
+  @transient
+  private lazy val sdf = new SimpleDateFormat(pattern, locale)
+  override def parseToDate(s: String): Date = sdf.parse(s)
+  override def formatDate(d: Date): String = sdf.format(d)
+}
+
 object DateFormatter {
+  import LegacyDateFormats._
+
   val defaultLocale: Locale = Locale.US
 
-  def apply(format: String, zoneId: ZoneId, locale: Locale): DateFormatter = {
+  def defaultPattern(): String = {
+    if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd"
+  }
+
+  private def getFormatter(
+    format: Option[String],
+    zoneId: ZoneId,
+    locale: Locale = defaultLocale,
+    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): 
DateFormatter = {
+
+    val pattern = format.getOrElse(defaultPattern)
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyDateFormatter(format, locale)
+      legacyFormat match {
+        case FAST_DATE_FORMAT =>
+          new LegacyFastDateFormatter(pattern, locale)
+        case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
+          new LegacySimpleDateFormatter(pattern, locale)
+      }
     } else {
-      new Iso8601DateFormatter(format, zoneId, locale)
+      new Iso8601DateFormatter(pattern, zoneId, locale)
     }
   }
 
+  def apply(
+    format: String,
+    zoneId: ZoneId,
+    locale: Locale,
+    legacyFormat: LegacyDateFormat): DateFormatter = {
+    getFormatter(Some(format), zoneId, locale, legacyFormat)
+  }
+
   def apply(format: String, zoneId: ZoneId): DateFormatter = {
-    apply(format, zoneId, defaultLocale)
+    getFormatter(Some(format), zoneId)
   }
 
   def apply(zoneId: ZoneId): DateFormatter = {
-    if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyDateFormatter("yyyy-MM-dd", defaultLocale)
-    } else {
-      new Iso8601DateFormatter("uuuu-MM-dd", zoneId, defaultLocale)
-    }
+    getFormatter(None, zoneId)
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index fe1a4fe..4893a7e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -17,19 +17,20 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import java.text.ParseException
+import java.text.{ParseException, ParsePosition, SimpleDateFormat}
 import java.time._
 import java.time.format.DateTimeParseException
 import java.time.temporal.ChronoField.MICRO_OF_SECOND
 import java.time.temporal.TemporalQueries
-import java.util.{Locale, TimeZone}
+import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit.SECONDS
 
 import org.apache.commons.lang3.time.FastDateFormat
 
-import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.convertSpecialTimestamp
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{ 
convertSpecialTimestamp, SQLTimestamp}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.Decimal
 
 sealed trait TimestampFormatter extends Serializable {
   /**
@@ -90,44 +91,139 @@ class FractionTimestampFormatter(zoneId: ZoneId)
   override protected lazy val formatter = 
DateTimeFormatterHelper.fractionFormatter
 }
 
-class LegacyTimestampFormatter(
+/**
+ * The custom sub-class of `GregorianCalendar` is needed to get access to
+ * protected `fields` immediately after parsing. We cannot use
+ * the `get()` method because it performs normalization of the fraction
+ * part. Accordingly, the `MILLISECOND` field doesn't contain original value.
+ *
+ * This class also allows setting a raw value in the `MILLISECOND` field
+ * directly before formatting.
+ */
+class MicrosCalendar(tz: TimeZone, digitsInFraction: Int)
+  extends GregorianCalendar(tz, Locale.US) {
+  // Converts parsed `MILLISECOND` field to seconds fraction in microsecond 
precision.
+  // For example, if the fraction pattern is `SSSS` then `digitsInFraction` = 
4, and
+  // if the `MILLISECOND` field was parsed to `1234`, the result is `123400`.
+  def getMicros(): SQLTimestamp = {
+    // Append 6 zeros to the field: 1234 -> 1234000000
+    val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
+    // Take the first 6 digits from `d`: 1234000000 -> 123400
+    // The rest contains exactly `digitsInFraction`: `0000` = 10 ^ 
digitsInFraction
+    // So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction)
+    d / Decimal.POW_10(digitsInFraction)
+  }
+
+  // Converts the seconds fraction in microsecond precision to a value
+  // that can be correctly formatted according to the specified fraction 
pattern.
+  // The method performs operations opposite to `getMicros()`.
+  def setMicros(micros: Long): Unit = {
+    val d = micros * Decimal.POW_10(digitsInFraction)
+    fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt
+  }
+}
+
+class LegacyFastTimestampFormatter(
     pattern: String,
     zoneId: ZoneId,
     locale: Locale) extends TimestampFormatter {
 
-  @transient private lazy val format =
+  @transient private lazy val fastDateFormat =
     FastDateFormat.getInstance(pattern, TimeZone.getTimeZone(zoneId), locale)
+  @transient private lazy val cal = new MicrosCalendar(
+    fastDateFormat.getTimeZone,
+    fastDateFormat.getPattern.count(_ == 'S'))
+
+  def parse(s: String): SQLTimestamp = {
+    cal.clear() // Clear the calendar because it can be re-used many times
+    if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
+      throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
+    }
+    val micros = cal.getMicros()
+    cal.set(Calendar.MILLISECOND, 0)
+    cal.getTimeInMillis * MICROS_PER_MILLIS + micros
+  }
+
+  def format(timestamp: SQLTimestamp): String = {
+    cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * 
MILLIS_PER_SECOND)
+    cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND))
+    fastDateFormat.format(cal)
+  }
+}
 
-  protected def toMillis(s: String): Long = format.parse(s).getTime
+class LegacySimpleTimestampFormatter(
+    pattern: String,
+    zoneId: ZoneId,
+    locale: Locale,
+    lenient: Boolean = true) extends TimestampFormatter {
+  @transient private lazy val sdf = {
+    val formatter = new SimpleDateFormat(pattern, locale)
+    formatter.setTimeZone(TimeZone.getTimeZone(zoneId))
+    formatter.setLenient(lenient)
+    formatter
+  }
 
-  override def parse(s: String): Long = toMillis(s) * MICROS_PER_MILLIS
+  override def parse(s: String): Long = {
+    sdf.parse(s).getTime * MICROS_PER_MILLIS
+  }
 
   override def format(us: Long): String = {
-    format.format(DateTimeUtils.toJavaTimestamp(us))
+    val timestamp = DateTimeUtils.toJavaTimestamp(us)
+    sdf.format(timestamp)
   }
 }
 
+object LegacyDateFormats extends Enumeration {
+  type LegacyDateFormat = Value
+  val FAST_DATE_FORMAT, SIMPLE_DATE_FORMAT, LENIENT_SIMPLE_DATE_FORMAT = Value
+}
+
 object TimestampFormatter {
+  import LegacyDateFormats._
+
   val defaultLocale: Locale = Locale.US
 
-  def apply(format: String, zoneId: ZoneId, locale: Locale): 
TimestampFormatter = {
+  def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss"
+
+  private def getFormatter(
+    format: Option[String],
+    zoneId: ZoneId,
+    locale: Locale = defaultLocale,
+    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): 
TimestampFormatter = {
+
+    val pattern = format.getOrElse(defaultPattern)
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyTimestampFormatter(format, zoneId, locale)
+      legacyFormat match {
+        case FAST_DATE_FORMAT =>
+          new LegacyFastTimestampFormatter(pattern, zoneId, locale)
+        case SIMPLE_DATE_FORMAT =>
+          new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient 
= false)
+        case LENIENT_SIMPLE_DATE_FORMAT =>
+          new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient 
= true)
+      }
     } else {
-      new Iso8601TimestampFormatter(format, zoneId, locale)
+      new Iso8601TimestampFormatter(pattern, zoneId, locale)
     }
   }
 
+  def apply(
+    format: String,
+    zoneId: ZoneId,
+    locale: Locale,
+    legacyFormat: LegacyDateFormat): TimestampFormatter = {
+    getFormatter(Some(format), zoneId, locale, legacyFormat)
+  }
+
+  def apply(format: String, zoneId: ZoneId, legacyFormat: LegacyDateFormat): 
TimestampFormatter = {
+    getFormatter(Some(format), zoneId, defaultLocale, legacyFormat)
+  }
+
   def apply(format: String, zoneId: ZoneId): TimestampFormatter = {
-    apply(format, zoneId, defaultLocale)
+    getFormatter(Some(format), zoneId)
   }
 
   def apply(zoneId: ZoneId): TimestampFormatter = {
-    if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyTimestampFormatter("yyyy-MM-dd HH:mm:ss", zoneId, 
defaultLocale)
-    } else {
-      new Iso8601TimestampFormatter("uuuu-MM-dd HH:mm:ss", zoneId, 
defaultLocale)
-    }
+    getFormatter(None, zoneId)
   }
 
   def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 9ce64b0..f32e48e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -541,7 +541,7 @@ object Decimal {
   /** Maximum number of decimal digits a Long can represent */
   val MAX_LONG_DIGITS = 18
 
-  private val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => 
math.pow(10, i).toLong)
+  val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, 
i).toLong)
 
   private val BIG_DEC_ZERO = BigDecimal(0)
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 274d0be..f04149a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{DateTimeUtils, 
IntervalUtils, Timesta
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
@@ -241,41 +242,45 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("DateFormat") {
-    checkEvaluation(
-      DateFormatClass(Literal.create(null, TimestampType), Literal("y"), 
gmtId),
-      null)
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
-      Literal.create(null, StringType), gmtId), null)
-
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
-      Literal("y"), gmtId), "2015")
-    checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), gmtId), "2013")
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
-      Literal("H"), gmtId), "0")
-    checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), gmtId), "13")
-
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId),
-      Literal("y"), pstId), "2015")
-    checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), pstId), "2013")
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId),
-      Literal("H"), pstId), "0")
-    checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), pstId), "5")
-
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId),
-      Literal("y"), jstId), "2015")
-    checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), jstId), "2013")
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId),
-      Literal("H"), jstId), "0")
-    checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), jstId), "22")
-
-    // SPARK-28072 The codegen path should work
-    checkEvaluation(
-      expression = DateFormatClass(
-        BoundReference(ordinal = 0, dataType = TimestampType, nullable = true),
-        BoundReference(ordinal = 1, dataType = StringType, nullable = true),
-        jstId),
-      expected = "22",
-      inputRow = InternalRow(DateTimeUtils.fromJavaTimestamp(ts), 
UTF8String.fromString("H")))
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        checkEvaluation(
+          DateFormatClass(Literal.create(null, TimestampType), Literal("y"), 
gmtId),
+          null)
+        checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
+          Literal.create(null, StringType), gmtId), null)
+
+        checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
+          Literal("y"), gmtId), "2015")
+        checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), gmtId), 
"2013")
+        checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
+          Literal("H"), gmtId), "0")
+        checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), gmtId), 
"13")
+
+        checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId),
+          Literal("y"), pstId), "2015")
+        checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), pstId), 
"2013")
+        checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId),
+          Literal("H"), pstId), "0")
+        checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), pstId), "5")
+
+        checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId),
+          Literal("y"), jstId), "2015")
+        checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), jstId), 
"2013")
+        checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId),
+          Literal("H"), jstId), "0")
+        checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), jstId), 
"22")
+
+        // SPARK-28072 The codegen path should work
+        checkEvaluation(
+          expression = DateFormatClass(
+            BoundReference(ordinal = 0, dataType = TimestampType, nullable = 
true),
+            BoundReference(ordinal = 1, dataType = StringType, nullable = 
true),
+            jstId),
+          expected = "22",
+          inputRow = InternalRow(DateTimeUtils.fromJavaTimestamp(ts), 
UTF8String.fromString("H")))
+      }
+    }
   }
 
   test("Hour") {
@@ -705,162 +710,189 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("from_unixtime") {
-    val fmt1 = "yyyy-MM-dd HH:mm:ss"
-    val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
-    val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
-    val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
-    for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
-      val timeZoneId = Option(tz.getID)
-      sdf1.setTimeZone(tz)
-      sdf2.setTimeZone(tz)
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        val fmt1 = "yyyy-MM-dd HH:mm:ss"
+        val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
+        val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
+        val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
+        for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+          val timeZoneId = Option(tz.getID)
+          sdf1.setTimeZone(tz)
+          sdf2.setTimeZone(tz)
 
-      checkEvaluation(
-        FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId),
-        sdf1.format(new Timestamp(0)))
-      checkEvaluation(FromUnixTime(
-        Literal(1000L), Literal(fmt1), timeZoneId),
-        sdf1.format(new Timestamp(1000000)))
-      checkEvaluation(
-        FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId),
-        sdf2.format(new Timestamp(-1000000)))
-      checkEvaluation(
-        FromUnixTime(Literal.create(null, LongType), Literal.create(null, 
StringType), timeZoneId),
-        null)
-      checkEvaluation(
-        FromUnixTime(Literal.create(null, LongType), Literal(fmt1), 
timeZoneId),
-        null)
-      checkEvaluation(
-        FromUnixTime(Literal(1000L), Literal.create(null, StringType), 
timeZoneId),
-        null)
-      checkEvaluation(
-        FromUnixTime(Literal(0L), Literal("not a valid format"), timeZoneId), 
null)
+          checkEvaluation(
+            FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId),
+            sdf1.format(new Timestamp(0)))
+          checkEvaluation(FromUnixTime(
+            Literal(1000L), Literal(fmt1), timeZoneId),
+            sdf1.format(new Timestamp(1000000)))
+          checkEvaluation(
+            FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId),
+            sdf2.format(new Timestamp(-1000000)))
+          checkEvaluation(
+            FromUnixTime(
+              Literal.create(null, LongType),
+              Literal.create(null, StringType), timeZoneId),
+            null)
+          checkEvaluation(
+            FromUnixTime(Literal.create(null, LongType), Literal(fmt1), 
timeZoneId),
+            null)
+          checkEvaluation(
+            FromUnixTime(Literal(1000L), Literal.create(null, StringType), 
timeZoneId),
+            null)
+          checkEvaluation(
+            FromUnixTime(Literal(0L), Literal("not a valid format"), 
timeZoneId), null)
 
-      // SPARK-28072 The codegen path for non-literal input should also work
-      checkEvaluation(
-        expression = FromUnixTime(
-          BoundReference(ordinal = 0, dataType = LongType, nullable = true),
-          BoundReference(ordinal = 1, dataType = StringType, nullable = true),
-          timeZoneId),
-        expected = UTF8String.fromString(sdf1.format(new Timestamp(0))),
-        inputRow = InternalRow(0L, UTF8String.fromString(fmt1)))
+          // SPARK-28072 The codegen path for non-literal input should also 
work
+          checkEvaluation(
+            expression = FromUnixTime(
+              BoundReference(ordinal = 0, dataType = LongType, nullable = 
true),
+              BoundReference(ordinal = 1, dataType = StringType, nullable = 
true),
+              timeZoneId),
+            expected = UTF8String.fromString(sdf1.format(new Timestamp(0))),
+            inputRow = InternalRow(0L, UTF8String.fromString(fmt1)))
+        }
+      }
     }
   }
 
   test("unix_timestamp") {
-    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
-    val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
-    val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
-    val fmt3 = "yy-MM-dd"
-    val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
-    sdf3.setTimeZone(TimeZoneGMT)
-
-    withDefaultTimeZone(TimeZoneGMT) {
-      for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
-        val timeZoneId = Option(tz.getID)
-        sdf1.setTimeZone(tz)
-        sdf2.setTimeZone(tz)
-
-        val date1 = Date.valueOf("2015-07-24")
-        checkEvaluation(UnixTimestamp(
-          Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd 
HH:mm:ss"), timeZoneId), 0L)
-        checkEvaluation(UnixTimestamp(
-          Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd 
HH:mm:ss"), timeZoneId),
-          1000L)
-        checkEvaluation(
-          UnixTimestamp(
-            Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss"), 
timeZoneId),
-          1000L)
-        checkEvaluation(
-          UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), 
timeZoneId),
-          
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1),
 tz)))
-        checkEvaluation(
-          UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), 
Literal(fmt2), timeZoneId),
-          -1000L)
-        checkEvaluation(UnixTimestamp(
-          Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), 
timeZoneId),
-          MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
-            DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
-        val t1 = UnixTimestamp(
-          CurrentTimestamp(), Literal("yyyy-MM-dd 
HH:mm:ss")).eval().asInstanceOf[Long]
-        val t2 = UnixTimestamp(
-          CurrentTimestamp(), Literal("yyyy-MM-dd 
HH:mm:ss")).eval().asInstanceOf[Long]
-        assert(t2 - t1 <= 1)
-        checkEvaluation(
-          UnixTimestamp(
-            Literal.create(null, DateType), Literal.create(null, StringType), 
timeZoneId),
-          null)
-        checkEvaluation(
-          UnixTimestamp(Literal.create(null, DateType), Literal("yyyy-MM-dd 
HH:mm:ss"), timeZoneId),
-          null)
-        checkEvaluation(
-          UnixTimestamp(Literal(date1), Literal.create(null, StringType), 
timeZoneId),
-          
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1),
 tz)))
-        checkEvaluation(
-          UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), 
timeZoneId), null)
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+        val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
+        val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
+        val fmt3 = "yy-MM-dd"
+        val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
+        sdf3.setTimeZone(TimeZoneGMT)
+
+        withDefaultTimeZone(TimeZoneGMT) {
+          for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+            val timeZoneId = Option(tz.getID)
+            sdf1.setTimeZone(tz)
+            sdf2.setTimeZone(tz)
+
+            val date1 = Date.valueOf("2015-07-24")
+            checkEvaluation(UnixTimestamp(
+              Literal(sdf1.format(new Timestamp(0))),
+              Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), 0L)
+            checkEvaluation(UnixTimestamp(
+              Literal(sdf1.format(new Timestamp(1000000))),
+              Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+              1000L)
+            checkEvaluation(
+              UnixTimestamp(
+                Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd 
HH:mm:ss"), timeZoneId),
+              1000L)
+            checkEvaluation(
+              UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), 
timeZoneId),
+              MILLISECONDS.toSeconds(
+                DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), 
tz)))
+            checkEvaluation(
+              UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))),
+                Literal(fmt2), timeZoneId),
+              -1000L)
+            checkEvaluation(UnixTimestamp(
+              Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), 
timeZoneId),
+              MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
+                DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
+            val t1 = UnixTimestamp(
+              CurrentTimestamp(), Literal("yyyy-MM-dd 
HH:mm:ss")).eval().asInstanceOf[Long]
+            val t2 = UnixTimestamp(
+              CurrentTimestamp(), Literal("yyyy-MM-dd 
HH:mm:ss")).eval().asInstanceOf[Long]
+            assert(t2 - t1 <= 1)
+            checkEvaluation(
+              UnixTimestamp(
+                Literal.create(null, DateType), Literal.create(null, 
StringType), timeZoneId),
+              null)
+            checkEvaluation(
+              UnixTimestamp(
+                Literal.create(null, DateType),
+                Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+              null)
+            checkEvaluation(
+              UnixTimestamp(Literal(date1), Literal.create(null, StringType), 
timeZoneId),
+              MILLISECONDS.toSeconds(
+                DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), 
tz)))
+            checkEvaluation(
+              UnixTimestamp(Literal("2015-07-24"), Literal("not a valid 
format"), timeZoneId), null)
+          }
+        }
       }
     }
   }
 
   test("to_unix_timestamp") {
-    val fmt1 = "yyyy-MM-dd HH:mm:ss"
-    val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
-    val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
-    val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
-    val fmt3 = "yy-MM-dd"
-    val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
-    sdf3.setTimeZone(TimeZoneGMT)
-
-    withDefaultTimeZone(TimeZoneGMT) {
-      for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
-        val timeZoneId = Option(tz.getID)
-        sdf1.setTimeZone(tz)
-        sdf2.setTimeZone(tz)
-
-        val date1 = Date.valueOf("2015-07-24")
-        checkEvaluation(ToUnixTimestamp(
-          Literal(sdf1.format(new Timestamp(0))), Literal(fmt1), timeZoneId), 
0L)
-        checkEvaluation(ToUnixTimestamp(
-          Literal(sdf1.format(new Timestamp(1000000))), Literal(fmt1), 
timeZoneId),
-          1000L)
-        checkEvaluation(ToUnixTimestamp(
-          Literal(new Timestamp(1000000)), Literal(fmt1)),
-          1000L)
-        checkEvaluation(
-          ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId),
-          
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1),
 tz)))
-        checkEvaluation(
-          ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), 
Literal(fmt2), timeZoneId),
-          -1000L)
-        checkEvaluation(ToUnixTimestamp(
-          Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), 
timeZoneId),
-          MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
-            DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
-        val t1 = ToUnixTimestamp(
-          CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long]
-        val t2 = ToUnixTimestamp(
-          CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long]
-        assert(t2 - t1 <= 1)
-        checkEvaluation(ToUnixTimestamp(
-          Literal.create(null, DateType), Literal.create(null, StringType), 
timeZoneId), null)
-        checkEvaluation(
-          ToUnixTimestamp(
-            Literal.create(null, DateType), Literal(fmt1), timeZoneId),
-          null)
-        checkEvaluation(ToUnixTimestamp(
-          Literal(date1), Literal.create(null, StringType), timeZoneId),
-          
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1),
 tz)))
-        checkEvaluation(
-          ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid 
format"), timeZoneId), null)
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        val fmt1 = "yyyy-MM-dd HH:mm:ss"
+        val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
+        val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
+        val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
+        val fmt3 = "yy-MM-dd"
+        val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
+        sdf3.setTimeZone(TimeZoneGMT)
+
+        withDefaultTimeZone(TimeZoneGMT) {
+          for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+            val timeZoneId = Option(tz.getID)
+            sdf1.setTimeZone(tz)
+            sdf2.setTimeZone(tz)
+
+            val date1 = Date.valueOf("2015-07-24")
+            checkEvaluation(ToUnixTimestamp(
+              Literal(sdf1.format(new Timestamp(0))), Literal(fmt1), 
timeZoneId), 0L)
+            checkEvaluation(ToUnixTimestamp(
+              Literal(sdf1.format(new Timestamp(1000000))), Literal(fmt1), 
timeZoneId),
+              1000L)
+            checkEvaluation(ToUnixTimestamp(
+              Literal(new Timestamp(1000000)), Literal(fmt1)),
+              1000L)
+            checkEvaluation(
+              ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId),
+              MILLISECONDS.toSeconds(
+                DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), 
tz)))
+            checkEvaluation(
+              ToUnixTimestamp(
+                Literal(sdf2.format(new Timestamp(-1000000))),
+                Literal(fmt2), timeZoneId),
+              -1000L)
+            checkEvaluation(ToUnixTimestamp(
+              Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), 
timeZoneId),
+              MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
+                DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
+            val t1 = ToUnixTimestamp(
+              CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long]
+            val t2 = ToUnixTimestamp(
+              CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long]
+            assert(t2 - t1 <= 1)
+            checkEvaluation(ToUnixTimestamp(
+              Literal.create(null, DateType), Literal.create(null, 
StringType), timeZoneId), null)
+            checkEvaluation(
+              ToUnixTimestamp(
+                Literal.create(null, DateType), Literal(fmt1), timeZoneId),
+              null)
+            checkEvaluation(ToUnixTimestamp(
+              Literal(date1), Literal.create(null, StringType), timeZoneId),
+              MILLISECONDS.toSeconds(
+                DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), 
tz)))
+            checkEvaluation(
+              ToUnixTimestamp(
+                Literal("2015-07-24"),
+                Literal("not a valid format"), timeZoneId), null)
 
-        // SPARK-28072 The codegen path for non-literal input should also work
-        checkEvaluation(
-          expression = ToUnixTimestamp(
-            BoundReference(ordinal = 0, dataType = StringType, nullable = 
true),
-            BoundReference(ordinal = 1, dataType = StringType, nullable = 
true),
-            timeZoneId),
-          expected = 0L,
-          inputRow = InternalRow(
-            UTF8String.fromString(sdf1.format(new Timestamp(0))), 
UTF8String.fromString(fmt1)))
+            // SPARK-28072 The codegen path for non-literal input should also 
work
+            checkEvaluation(
+              expression = ToUnixTimestamp(
+                BoundReference(ordinal = 0, dataType = StringType, nullable = 
true),
+                BoundReference(ordinal = 1, dataType = StringType, nullable = 
true),
+                timeZoneId),
+              expected = 0L,
+              inputRow = InternalRow(
+                UTF8String.fromString(sdf1.format(new Timestamp(0))), 
UTF8String.fromString(fmt1)))
+          }
+        }
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index d125581..2d5504a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -31,6 +31,7 @@ import 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, 
ResolvedHint}
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.{Aggregator, SparkUserDefinedFunction, 
UserDefinedAggregator, UserDefinedFunction}
 import org.apache.spark.sql.internal.SQLConf
@@ -2881,7 +2882,7 @@ object functions {
    * @since 1.5.0
    */
   def from_unixtime(ut: Column): Column = withExpr {
-    FromUnixTime(ut.expr, Literal("uuuu-MM-dd HH:mm:ss"))
+    FromUnixTime(ut.expr, Literal(TimestampFormatter.defaultPattern))
   }
 
   /**
@@ -2913,7 +2914,7 @@ object functions {
    * @since 1.5.0
    */
   def unix_timestamp(): Column = withExpr {
-    UnixTimestamp(CurrentTimestamp(), Literal("uuuu-MM-dd HH:mm:ss"))
+    UnixTimestamp(CurrentTimestamp(), 
Literal(TimestampFormatter.defaultPattern))
   }
 
   /**
@@ -2927,7 +2928,7 @@ object functions {
    * @since 1.5.0
    */
   def unix_timestamp(s: Column): Column = withExpr {
-    UnixTimestamp(s.expr, Literal("uuuu-MM-dd HH:mm:ss"))
+    UnixTimestamp(s.expr, Literal(TimestampFormatter.defaultPattern))
   }
 
   /**
diff --git a/sql/core/src/test/resources/test-data/bad_after_good.csv 
b/sql/core/src/test/resources/test-data/bad_after_good.csv
index 4621a7d..1a7c265 100644
--- a/sql/core/src/test/resources/test-data/bad_after_good.csv
+++ b/sql/core/src/test/resources/test-data/bad_after_good.csv
@@ -1,2 +1,2 @@
 "good record",1999-08-01
-"bad record",1999-088-01
+"bad record",1999-088_01
diff --git a/sql/core/src/test/resources/test-data/value-malformed.csv 
b/sql/core/src/test/resources/test-data/value-malformed.csv
index 8945ed7..6e6f08f 100644
--- a/sql/core/src/test/resources/test-data/value-malformed.csv
+++ b/sql/core/src/test/resources/test-data/value-malformed.csv
@@ -1,2 +1,2 @@
-0,2013-111-11 12:13:14
+0,2013-111_11 12:13:14
 1,1983-08-04
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index bb8cdf3..41d53c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -96,15 +96,19 @@ class DateFunctionsSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("date format") {
-    val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c")
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c")
 
-    checkAnswer(
-      df.select(date_format($"a", "y"), date_format($"b", "y"), 
date_format($"c", "y")),
-      Row("2015", "2015", "2013"))
+        checkAnswer(
+          df.select(date_format($"a", "y"), date_format($"b", "y"), 
date_format($"c", "y")),
+          Row("2015", "2015", "2013"))
 
-    checkAnswer(
-      df.selectExpr("date_format(a, 'y')", "date_format(b, 'y')", 
"date_format(c, 'y')"),
-      Row("2015", "2015", "2013"))
+        checkAnswer(
+          df.selectExpr("date_format(a, 'y')", "date_format(b, 'y')", 
"date_format(c, 'y')"),
+          Row("2015", "2015", "2013"))
+      }
+    }
   }
 
   test("year") {
@@ -525,170 +529,194 @@ class DateFunctionsSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("from_unixtime") {
-    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
-    val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
-    val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
-    val fmt3 = "yy-MM-dd HH-mm-ss"
-    val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
-    val df = Seq((1000, "yyyy-MM-dd HH:mm:ss.SSS"), (-1000, "yy-MM-dd 
HH-mm-ss")).toDF("a", "b")
-    checkAnswer(
-      df.select(from_unixtime(col("a"))),
-      Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new 
Timestamp(-1000000)))))
-    checkAnswer(
-      df.select(from_unixtime(col("a"), fmt2)),
-      Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new 
Timestamp(-1000000)))))
-    checkAnswer(
-      df.select(from_unixtime(col("a"), fmt3)),
-      Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new 
Timestamp(-1000000)))))
-    checkAnswer(
-      df.selectExpr("from_unixtime(a)"),
-      Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new 
Timestamp(-1000000)))))
-    checkAnswer(
-      df.selectExpr(s"from_unixtime(a, '$fmt2')"),
-      Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new 
Timestamp(-1000000)))))
-    checkAnswer(
-      df.selectExpr(s"from_unixtime(a, '$fmt3')"),
-      Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new 
Timestamp(-1000000)))))
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+        val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
+        val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
+        val fmt3 = "yy-MM-dd HH-mm-ss"
+        val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
+        val df = Seq((1000, "yyyy-MM-dd HH:mm:ss.SSS"), (-1000, "yy-MM-dd 
HH-mm-ss")).toDF("a", "b")
+        checkAnswer(
+          df.select(from_unixtime(col("a"))),
+          Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new 
Timestamp(-1000000)))))
+        checkAnswer(
+          df.select(from_unixtime(col("a"), fmt2)),
+          Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new 
Timestamp(-1000000)))))
+        checkAnswer(
+          df.select(from_unixtime(col("a"), fmt3)),
+          Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new 
Timestamp(-1000000)))))
+        checkAnswer(
+          df.selectExpr("from_unixtime(a)"),
+          Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new 
Timestamp(-1000000)))))
+        checkAnswer(
+          df.selectExpr(s"from_unixtime(a, '$fmt2')"),
+          Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new 
Timestamp(-1000000)))))
+        checkAnswer(
+          df.selectExpr(s"from_unixtime(a, '$fmt3')"),
+          Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new 
Timestamp(-1000000)))))
+      }
+    }
   }
 
   private def secs(millis: Long): Long = 
TimeUnit.MILLISECONDS.toSeconds(millis)
 
   test("unix_timestamp") {
-    val date1 = Date.valueOf("2015-07-24")
-    val date2 = Date.valueOf("2015-07-25")
-    val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
-    val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
-    val s1 = "2015/07/24 10:00:00.5"
-    val s2 = "2015/07/25 02:02:02.6"
-    val ss1 = "2015-07-24 10:00:00"
-    val ss2 = "2015-07-25 02:02:02"
-    val fmt = "yyyy/MM/dd HH:mm:ss.S"
-    val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", 
"s", "ss")
-    checkAnswer(df.select(unix_timestamp(col("ts"))), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-    checkAnswer(df.select(unix_timestamp(col("ss"))), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-    checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq(
-      Row(secs(date1.getTime)), Row(secs(date2.getTime))))
-    checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-    checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-    checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-    checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq(
-      Row(secs(date1.getTime)), Row(secs(date2.getTime))))
-    checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-
-    val x1 = "2015-07-24 10:00:00"
-    val x2 = "2015-25-07 02:02:02"
-    val x3 = "2015-07-24 25:02:02"
-    val x4 = "2015-24-07 26:02:02"
-    val ts3 = Timestamp.valueOf("2015-07-24 02:25:02")
-    val ts4 = Timestamp.valueOf("2015-07-24 00:10:00")
-
-    val df1 = Seq(x1, x2, x3, x4).toDF("x")
-    checkAnswer(df1.select(unix_timestamp(col("x"))), Seq(
-      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
-    checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq(
-      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
-    checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), 
Seq(
-      Row(null), Row(secs(ts2.getTime)), Row(null), Row(null)))
-    checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), 
Seq(
-      Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null)))
-
-    // invalid format
-    checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd aa:HH:ss')"), 
Seq(
-      Row(null), Row(null), Row(null), Row(null)))
-
-    // february
-    val y1 = "2016-02-29"
-    val y2 = "2017-02-29"
-    val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
-    val df2 = Seq(y1, y2).toDF("y")
-    checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
-      Row(secs(ts5.getTime)), Row(null)))
-
-    val now = sql("select unix_timestamp()").collect().head.getLong(0)
-    checkAnswer(
-      sql(s"select cast ($now as timestamp)"),
-      Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now))))
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        val date1 = Date.valueOf("2015-07-24")
+        val date2 = Date.valueOf("2015-07-25")
+        val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
+        val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
+        val s1 = "2015/07/24 10:00:00.5"
+        val s2 = "2015/07/25 02:02:02.6"
+        val ss1 = "2015-07-24 10:00:00"
+        val ss2 = "2015-07-25 02:02:02"
+        val fmt = "yyyy/MM/dd HH:mm:ss.S"
+        val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", 
"ts", "s", "ss")
+        checkAnswer(df.select(unix_timestamp(col("ts"))), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+        checkAnswer(df.select(unix_timestamp(col("ss"))), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+        checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq(
+          Row(secs(date1.getTime)), Row(secs(date2.getTime))))
+        checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+        checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+        checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+        checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq(
+          Row(secs(date1.getTime)), Row(secs(date2.getTime))))
+        checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+
+        val x1 = "2015-07-24 10:00:00"
+        val x2 = "2015-25-07 02:02:02"
+        val x3 = "2015-07-24 25:02:02"
+        val x4 = "2015-24-07 26:02:02"
+        val ts3 = Timestamp.valueOf("2015-07-24 02:25:02")
+        val ts4 = Timestamp.valueOf("2015-07-24 00:10:00")
+
+        val df1 = Seq(x1, x2, x3, x4).toDF("x")
+        checkAnswer(df1.select(unix_timestamp(col("x"))), Seq(
+          Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
+        checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq(
+          Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
+        checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM 
HH:mm:ss")), Seq(
+          Row(null), Row(secs(ts2.getTime)), Row(null), Row(null)))
+        checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd 
mm:HH:ss')"), Seq(
+          Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), 
Row(null)))
+
+        // invalid format
+        checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd 
aa:HH:ss')"), Seq(
+          Row(null), Row(null), Row(null), Row(null)))
+
+        // february
+        val y1 = "2016-02-29"
+        val y2 = "2017-02-29"
+        val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
+        val df2 = Seq(y1, y2).toDF("y")
+        checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
+          Row(secs(ts5.getTime)), Row(null)))
+
+        val now = sql("select unix_timestamp()").collect().head.getLong(0)
+        checkAnswer(
+          sql(s"select cast ($now as timestamp)"),
+          Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now))))
+      }
+    }
   }
 
   test("to_unix_timestamp") {
-    val date1 = Date.valueOf("2015-07-24")
-    val date2 = Date.valueOf("2015-07-25")
-    val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
-    val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
-    val s1 = "2015/07/24 10:00:00.5"
-    val s2 = "2015/07/25 02:02:02.6"
-    val ss1 = "2015-07-24 10:00:00"
-    val ss2 = "2015-07-25 02:02:02"
-    val fmt = "yyyy/MM/dd HH:mm:ss.S"
-    val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", 
"s", "ss")
-    checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-    checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-    checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq(
-      Row(secs(date1.getTime)), Row(secs(date2.getTime))))
-    checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq(
-      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
-
-    val x1 = "2015-07-24 10:00:00"
-    val x2 = "2015-25-07 02:02:02"
-    val x3 = "2015-07-24 25:02:02"
-    val x4 = "2015-24-07 26:02:02"
-    val ts3 = Timestamp.valueOf("2015-07-24 02:25:02")
-    val ts4 = Timestamp.valueOf("2015-07-24 00:10:00")
-
-    val df1 = Seq(x1, x2, x3, x4).toDF("x")
-    checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq(
-      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
-    checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd 
mm:HH:ss')"), Seq(
-      Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null)))
-
-    // february
-    val y1 = "2016-02-29"
-    val y2 = "2017-02-29"
-    val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
-    val df2 = Seq(y1, y2).toDF("y")
-    checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
-      Row(secs(ts5.getTime)), Row(null)))
-
-    // invalid format
-    checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd 
bb:HH:ss')"), Seq(
-      Row(null), Row(null), Row(null), Row(null)))
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        val date1 = Date.valueOf("2015-07-24")
+        val date2 = Date.valueOf("2015-07-25")
+        val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
+        val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
+        val s1 = "2015/07/24 10:00:00.5"
+        val s2 = "2015/07/25 02:02:02.6"
+        val ss1 = "2015-07-24 10:00:00"
+        val ss2 = "2015-07-25 02:02:02"
+        val fmt = "yyyy/MM/dd HH:mm:ss.S"
+        val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", 
"ts", "s", "ss")
+        checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+        checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+        checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq(
+          Row(secs(date1.getTime)), Row(secs(date2.getTime))))
+        checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq(
+          Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
+
+        val x1 = "2015-07-24 10:00:00"
+        val x2 = "2015-25-07 02:02:02"
+        val x3 = "2015-07-24 25:02:02"
+        val x4 = "2015-24-07 26:02:02"
+        val ts3 = Timestamp.valueOf("2015-07-24 02:25:02")
+        val ts4 = Timestamp.valueOf("2015-07-24 00:10:00")
+
+        val df1 = Seq(x1, x2, x3, x4).toDF("x")
+        checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq(
+          Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
+        checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd 
mm:HH:ss')"), Seq(
+          Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), 
Row(null)))
+
+        // february
+        val y1 = "2016-02-29"
+        val y2 = "2017-02-29"
+        val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
+        val df2 = Seq(y1, y2).toDF("y")
+        checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
+          Row(secs(ts5.getTime)), Row(null)))
+
+        // invalid format
+        checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd 
bb:HH:ss')"), Seq(
+          Row(null), Row(null), Row(null), Row(null)))
+      }
+    }
   }
 
 
   test("to_timestamp") {
-    val date1 = Date.valueOf("2015-07-24")
-    val date2 = Date.valueOf("2015-07-25")
-    val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00")
-    val ts_date2 = Timestamp.valueOf("2015-07-25 00:00:00")
-    val ts1 = Timestamp.valueOf("2015-07-24 10:00:00")
-    val ts2 = Timestamp.valueOf("2015-07-25 02:02:02")
-    val s1 = "2015/07/24 10:00:00.5"
-    val s2 = "2015/07/25 02:02:02.6"
-    val ts1m = Timestamp.valueOf("2015-07-24 10:00:00.5")
-    val ts2m = Timestamp.valueOf("2015-07-25 02:02:02.6")
-    val ss1 = "2015-07-24 10:00:00"
-    val ss2 = "2015-07-25 02:02:02"
-    val fmt = "yyyy/MM/dd HH:mm:ss.S"
-    val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", 
"s", "ss")
-
-    checkAnswer(df.select(to_timestamp(col("ss"))),
-      df.select(unix_timestamp(col("ss")).cast("timestamp")))
-    checkAnswer(df.select(to_timestamp(col("ss"))), Seq(
-      Row(ts1), Row(ts2)))
-    checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq(
-      Row(ts1m), Row(ts2m)))
-    checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq(
-      Row(ts1), Row(ts2)))
-    checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq(
-      Row(ts_date1), Row(ts_date2)))
+    Seq(false, true).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) {
+        val date1 = Date.valueOf("2015-07-24")
+        val date2 = Date.valueOf("2015-07-25")
+        val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00")
+        val ts_date2 = Timestamp.valueOf("2015-07-25 00:00:00")
+        val ts1 = Timestamp.valueOf("2015-07-24 10:00:00")
+        val ts2 = Timestamp.valueOf("2015-07-25 02:02:02")
+        val s1 = "2015/07/24 10:00:00.5"
+        val s2 = "2015/07/25 02:02:02.6"
+        val ts1m = Timestamp.valueOf("2015-07-24 10:00:00.5")
+        val ts2m = Timestamp.valueOf("2015-07-25 02:02:02.6")
+        val ss1 = "2015-07-24 10:00:00"
+        val ss2 = "2015-07-25 02:02:02"
+        val fmt = "yyyy/MM/dd HH:mm:ss.S"
+        val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", 
"ts", "s", "ss")
+
+        checkAnswer(df.select(to_timestamp(col("ss"))),
+          df.select(unix_timestamp(col("ss")).cast("timestamp")))
+        checkAnswer(df.select(to_timestamp(col("ss"))), Seq(
+          Row(ts1), Row(ts2)))
+        if (legacyParser) {
+          // In Spark 2.4 and earlier, to_timestamp() parses in seconds precision and cuts off
+          // the fractional part of seconds. The behavior was changed by SPARK-27438.
+          val legacyFmt = "yyyy/MM/dd HH:mm:ss"
+          checkAnswer(df.select(to_timestamp(col("s"), legacyFmt)), Seq(
+            Row(ts1), Row(ts2)))
+        } else {
+          checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq(
+            Row(ts1m), Row(ts2m)))
+        }
+        checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq(
+          Row(ts1), Row(ts2)))
+        checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq(
+          Row(ts_date1), Row(ts_date2)))
+      }
+    }
   }
 
   test("datediff") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 97dfbbd..b1105b4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1182,7 +1182,7 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
         .schema(schemaWithCorrField1)
         .csv(testFile(valueMalformedFile))
       checkAnswer(df2,
-        Row(0, null, "0,2013-111-11 12:13:14") ::
+        Row(0, null, "0,2013-111_11 12:13:14") ::
         Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
         Nil)
 
@@ -1199,7 +1199,7 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
         .schema(schemaWithCorrField2)
         .csv(testFile(valueMalformedFile))
       checkAnswer(df3,
-        Row(0, "0,2013-111-11 12:13:14", null) ::
+        Row(0, "0,2013-111_11 12:13:14", null) ::
         Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
         Nil)
 
@@ -1435,7 +1435,7 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
     assert(df.filter($"_corrupt_record".isNull).count() == 1)
     checkAnswer(
       df.select(columnNameOfCorruptRecord),
-      Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
+      Row("0,2013-111_11 12:13:14") :: Row(null) :: Nil
     )
   }
 
@@ -2093,7 +2093,7 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
     Seq("csv", "").foreach { reader =>
       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> reader) {
         withTempPath { path =>
-          val df = Seq(("0", "2013-111-11")).toDF("a", "b")
+          val df = Seq(("0", "2013-111_11")).toDF("a", "b")
           df.write
             .option("header", "true")
             .csv(path.getAbsolutePath)
@@ -2109,7 +2109,7 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
             .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
             .schema(schemaWithCorrField)
             .csv(path.getAbsoluteFile.toString)
-          checkAnswer(readDF, Row(0, null, "0,2013-111-11") :: Nil)
+          checkAnswer(readDF, Row(0, null, "0,2013-111_11") :: Nil)
         }
       }
     }
@@ -2216,7 +2216,7 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
                 val readback = spark.read
                   .option("mode", mode)
                   .option("header", true)
-                  .option("timestampFormat", "uuuu-MM-dd HH:mm:ss")
+                  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
                   .option("multiLine", multiLine)
                   .schema("c0 string, c1 integer, c2 timestamp")
                   .csv(path.getAbsolutePath)
@@ -2235,7 +2235,7 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
   }
 
   test("filters push down - malformed input in PERMISSIVE mode") {
-    val invalidTs = "2019-123-14 20:35:30"
+    val invalidTs = "2019-123_14 20:35:30"
     val invalidRow = s"0,$invalidTs,999"
     val validTs = "2019-12-14 20:35:30"
     Seq(true, false).foreach { filterPushdown =>
@@ -2252,7 +2252,7 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
               .option("mode", "PERMISSIVE")
               .option("columnNameOfCorruptRecord", "c3")
               .option("header", true)
-              .option("timestampFormat", "uuuu-MM-dd HH:mm:ss")
+              .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
               .schema("c0 integer, c1 timestamp, c2 integer, c3 string")
               .csv(path.getAbsolutePath)
               .where(condition)
@@ -2309,3 +2309,10 @@ class CSVv2Suite extends CSVSuite {
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "")
 }
+
+class CSVLegacyTimeParserSuite extends CSVSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index b20da22..7abe818 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2572,3 +2572,10 @@ class JsonV2Suite extends JsonSuite {
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "")
 }
+
+class JsonLegacyTimeParserSuite extends JsonSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to