This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 888bf1b2ef4 [SPARK-39193][SQL] Fasten Timestamp type inference of 
JSON/CSV data sources
888bf1b2ef4 is described below

commit 888bf1b2ef44a27c3d4be716a72175bbaa8c6453
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Wed May 18 10:59:52 2022 +0800

    [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
    
    ### What changes were proposed in this pull request?
    
    When reading JSON/CSV files with inferring timestamp types 
(`.option("inferTimestamp", true)`), the Timestamp conversion will throw and 
catch exceptions.
    As we put detailed error messages in the exception:
    ```
      def cannotCastToDateTimeError(
          value: Any, from: DataType, to: DataType, errorContext: String): 
Throwable = {
        val valueString = toSQLValue(value, from)
        new SparkDateTimeException("INVALID_SYNTAX_FOR_CAST",
          Array(toSQLType(to), valueString, SQLConf.ANSI_ENABLED.key, 
errorContext))
      }
    ```
    Throwing and catching the timestamp parsing exceptions is actually not 
cheap. It consumes more than 90% of the type inference time.
    
    This PR improves the default timestamp parsing by returning optional 
results instead of throwing/catching the exceptions. With this PR, the schema 
inference time is reduced by 90% in a local benchmark.
    
    Note this PR is for the default timestamp parser. It doesn't cover the 
scenarios where
    * users provide a customized timestamp format via an option
    * users enable the legacy timestamp formatter
    We can have follow-ups for these.
    
    ### Why are the changes needed?
    
    Performance improvement
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing UT
    Also manually tested the runtime of inferring a 624MB JSON file with 
timestamp inference enabled:
    ```
    spark.read.option("inferTimestamp", true).json(file)
    ```
    
    Before the change, it takes 166 seconds.
    After the change, it only takes 16 seconds.
    
    Closes #36562 from gengliangwang/improveInferTS.
    
    Lead-authored-by: Gengliang Wang <gengli...@apache.org>
    Co-authored-by: Gengliang Wang <ltn...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    |  4 +-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  |  4 +-
 .../sql/catalyst/util/TimestampFormatter.scala     | 51 ++++++++++++++++++++++
 .../catalyst/util/TimestampFormatterSuite.scala    | 15 +++++++
 4 files changed, 70 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index f30fa8a0b5f..8b0c6c49b85 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -178,7 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
     // We can only parse the value as TimestampNTZType if it does not have 
zone-offset or
     // time-zone component and can be parsed with the timestamp formatter.
     // Otherwise, it is likely to be a timestamp with timezone.
-    if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, 
false)).isDefined) {
+    if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
       SQLConf.get.timestampType
     } else {
       tryParseTimestamp(field)
@@ -187,7 +187,7 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
 
   private def tryParseTimestamp(field: String): DataType = {
     // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt timestampParser.parse(field)).isDefined) {
+    if (timestampParser.parseOptional(field).isDefined) {
       TimestampType
     } else {
       tryParseBoolean(field)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index d08773d8469..f6064bd7195 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -151,10 +151,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) 
extends Serializable {
         if (options.prefersDecimal && decimalTry.isDefined) {
           decimalTry.get
         } else if (options.inferTimestamp &&
-            (allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, 
false)).isDefined) {
+            timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
           SQLConf.get.timestampType
         } else if (options.inferTimestamp &&
-            (allCatch opt timestampFormatter.parse(field)).isDefined) {
+            timestampFormatter.parseOptional(field).isDefined) {
           TimestampType
         } else {
           StringType
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 7502e0a463b..8ebe77978b5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -52,6 +52,25 @@ sealed trait TimestampFormatter extends Serializable {
   @throws(classOf[DateTimeException])
   def parse(s: String): Long
 
+  /**
+   * Parses a timestamp in a string and converts it to an optional number of 
microseconds.
+   *
+   * @param s - string with timestamp to parse
+   * @return An optional number of microseconds since epoch. The result is 
None on invalid input.
+   * @throws ParseException can be thrown by legacy parser
+   * @throws DateTimeParseException can be thrown by new parser
+   * @throws DateTimeException unable to obtain local date or time
+   */
+  @throws(classOf[ParseException])
+  @throws(classOf[DateTimeParseException])
+  @throws(classOf[DateTimeException])
+  def parseOptional(s: String): Option[Long] =
+    try {
+      Some(parse(s))
+    } catch {
+      case _: Exception => None
+    }
+
   /**
    * Parses a timestamp in a string and converts it to microseconds since Unix 
Epoch in local time.
    *
@@ -73,6 +92,30 @@ sealed trait TimestampFormatter extends Serializable {
       s"The method `parseWithoutTimeZone(s: String, allowTimeZone: Boolean)` 
should be " +
         "implemented in the formatter of timestamp without time zone")
 
+  /**
+   * Parses a timestamp in a string and converts it to an optional number of 
microseconds since
+   * Unix Epoch in local time.
+   *
+   * @param s - string with timestamp to parse
+   * @param allowTimeZone - indicates strict parsing of timezone
+   * @return An optional number of microseconds since epoch. The result is 
None on invalid input.
+   * @throws ParseException can be thrown by legacy parser
+   * @throws DateTimeParseException can be thrown by new parser
+   * @throws DateTimeException unable to obtain local date or time
+   * @throws IllegalStateException The formatter for timestamp without time 
zone should always
+   *                               implement this method. The exception should 
never be hit.
+   */
+  @throws(classOf[ParseException])
+  @throws(classOf[DateTimeParseException])
+  @throws(classOf[DateTimeException])
+  @throws(classOf[IllegalStateException])
+  def parseWithoutTimeZoneOptional(s: String, allowTimeZone: Boolean): 
Option[Long] =
+    try {
+      Some(parseWithoutTimeZone(s, allowTimeZone))
+    } catch {
+      case _: Exception => None
+    }
+
   /**
    * Parses a timestamp in a string and converts it to microseconds since Unix 
Epoch in local time.
    * Zone-id and zone-offset components are ignored.
@@ -204,6 +247,9 @@ class DefaultTimestampFormatter(
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }
 
+  override def parseOptional(s: String): Option[Long] =
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(s), zoneId)
+
   override def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long = 
{
     try {
       val utf8Value = UTF8String.fromString(s)
@@ -213,6 +259,11 @@ class DefaultTimestampFormatter(
       }
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }
+
+  override def parseWithoutTimeZoneOptional(s: String, allowTimeZone: 
Boolean): Option[Long] = {
+    val utf8Value = UTF8String.fromString(s)
+    DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, allowTimeZone)
+  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
index c812f8b9b73..e3d7c972baf 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
@@ -456,4 +456,19 @@ class TimestampFormatterSuite extends 
DatetimeFormatterSuite {
       assert(errMsg.contains("""Invalid input syntax for type "TIMESTAMP": 
'x123'"""))
     }
   }
+
+  test("SPARK-39193: support returning optional parse results in the default 
formatter") {
+    val formatter = new DefaultTimestampFormatter(
+      DateTimeTestUtils.LA,
+      locale = DateFormatter.defaultLocale,
+      legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
+      isParsing = true)
+    
assert(formatter.parseOptional("2021-01-01T00:00:00").contains(1609488000000000L))
+    assert(
+      formatter.parseWithoutTimeZoneOptional("2021-01-01T00:00:00", false)
+        .contains(1609459200000000L))
+    assert(formatter.parseOptional("abc").isEmpty)
+    assert(
+      formatter.parseWithoutTimeZoneOptional("abc", false).isEmpty)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to