This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 888bf1b2ef4 [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources 888bf1b2ef4 is described below commit 888bf1b2ef44a27c3d4be716a72175bbaa8c6453 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Wed May 18 10:59:52 2022 +0800 [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources ### What changes were proposed in this pull request? When reading JSON/CSV files with inferring timestamp types (`.option("inferTimestamp", true)`), the Timestamp conversion will throw and catch exceptions. As we are putting decent error messages in the exception: ``` def cannotCastToDateTimeError( value: Any, from: DataType, to: DataType, errorContext: String): Throwable = { val valueString = toSQLValue(value, from) new SparkDateTimeException("INVALID_SYNTAX_FOR_CAST", Array(toSQLType(to), valueString, SQLConf.ANSI_ENABLED.key, errorContext)) } ``` Throwing and catching the timestamp parsing exceptions is actually not cheap. It consumes more than 90% of the type inference time. This PR improves the default timestamp parsing by returning optional results instead of throwing/catching the exceptions. With this PR, the schema inference time is reduced by 90% in a local benchmark. Note this PR is for the default timestamp parser. It doesn't cover the scenarios of * users provide a customized timestamp format via option * users enable legacy timestamp formatter We can have follow-ups for it. ### Why are the changes needed? Performance improvement ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UT Also manually tested the runtime of inferring a JSON file of 624MB with inferring timestamp enabled: ``` spark.read.option("inferTimestamp", true).json(file) ``` Before the change, it takes 166 seconds After the change, it only takes 16 seconds. Closes #36562 from gengliangwang/improveInferTS. 
Lead-authored-by: Gengliang Wang <gengli...@apache.org> Co-authored-by: Gengliang Wang <ltn...@gmail.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 4 +- .../spark/sql/catalyst/json/JsonInferSchema.scala | 4 +- .../sql/catalyst/util/TimestampFormatter.scala | 51 ++++++++++++++++++++++ .../catalyst/util/TimestampFormatterSuite.scala | 15 +++++++ 4 files changed, 70 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index f30fa8a0b5f..8b0c6c49b85 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -178,7 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // We can only parse the value as TimestampNTZType if it does not have zone-offset or // time-zone component and can be parsed with the timestamp formatter. // Otherwise, it is likely to be a timestamp with timezone. - if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) { + if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { SQLConf.get.timestampType } else { tryParseTimestamp(field) @@ -187,7 +187,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private def tryParseTimestamp(field: String): DataType = { // This case infers a custom `dataFormat` is set. 
- if ((allCatch opt timestampParser.parse(field)).isDefined) { + if (timestampParser.parseOptional(field).isDefined) { TimestampType } else { tryParseBoolean(field) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index d08773d8469..f6064bd7195 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -151,10 +151,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { if (options.prefersDecimal && decimalTry.isDefined) { decimalTry.get } else if (options.inferTimestamp && - (allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) { + timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { SQLConf.get.timestampType } else if (options.inferTimestamp && - (allCatch opt timestampFormatter.parse(field)).isDefined) { + timestampFormatter.parseOptional(field).isDefined) { TimestampType } else { StringType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 7502e0a463b..8ebe77978b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -52,6 +52,25 @@ sealed trait TimestampFormatter extends Serializable { @throws(classOf[DateTimeException]) def parse(s: String): Long + /** + * Parses a timestamp in a string and converts it to an optional number of microseconds. + * + * @param s - string with timestamp to parse + * @return An optional number of microseconds since epoch. The result is None on invalid input. 
+ * @throws ParseException can be thrown by legacy parser + * @throws DateTimeParseException can be thrown by new parser + * @throws DateTimeException unable to obtain local date or time + */ + @throws(classOf[ParseException]) + @throws(classOf[DateTimeParseException]) + @throws(classOf[DateTimeException]) + def parseOptional(s: String): Option[Long] = + try { + Some(parse(s)) + } catch { + case _: Exception => None + } + /** * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time. * @@ -73,6 +92,30 @@ sealed trait TimestampFormatter extends Serializable { s"The method `parseWithoutTimeZone(s: String, allowTimeZone: Boolean)` should be " + "implemented in the formatter of timestamp without time zone") + /** + * Parses a timestamp in a string and converts it to an optional number of microseconds since + * Unix Epoch in local time. + * + * @param s - string with timestamp to parse + * @param allowTimeZone - indicates strict parsing of timezone + * @return An optional number of microseconds since epoch. The result is None on invalid input. + * @throws ParseException can be thrown by legacy parser + * @throws DateTimeParseException can be thrown by new parser + * @throws DateTimeException unable to obtain local date or time + * @throws IllegalStateException The formatter for timestamp without time zone should always + * implement this method. The exception should never be hit. + */ + @throws(classOf[ParseException]) + @throws(classOf[DateTimeParseException]) + @throws(classOf[DateTimeException]) + @throws(classOf[IllegalStateException]) + def parseWithoutTimeZoneOptional(s: String, allowTimeZone: Boolean): Option[Long] = + try { + Some(parseWithoutTimeZone(s, allowTimeZone)) + } catch { + case _: Exception => None + } + /** * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time. * Zone-id and zone-offset components are ignored. 
@@ -204,6 +247,9 @@ class DefaultTimestampFormatter( } catch checkParsedDiff(s, legacyFormatter.parse) } + override def parseOptional(s: String): Option[Long] = + DateTimeUtils.stringToTimestamp(UTF8String.fromString(s), zoneId) + override def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long = { try { val utf8Value = UTF8String.fromString(s) @@ -213,6 +259,11 @@ class DefaultTimestampFormatter( } } catch checkParsedDiff(s, legacyFormatter.parse) } + + override def parseWithoutTimeZoneOptional(s: String, allowTimeZone: Boolean): Option[Long] = { + val utf8Value = UTF8String.fromString(s) + DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, allowTimeZone) + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala index c812f8b9b73..e3d7c972baf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala @@ -456,4 +456,19 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite { assert(errMsg.contains("""Invalid input syntax for type "TIMESTAMP": 'x123'""")) } } + + test("SPARK-39193: support returning optional parse results in the default formatter") { + val formatter = new DefaultTimestampFormatter( + DateTimeTestUtils.LA, + locale = DateFormatter.defaultLocale, + legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT, + isParsing = true) + assert(formatter.parseOptional("2021-01-01T00:00:00").contains(1609488000000000L)) + assert( + formatter.parseWithoutTimeZoneOptional("2021-01-01T00:00:00", false) + .contains(1609459200000000L)) + assert(formatter.parseOptional("abc").isEmpty) + assert( + formatter.parseWithoutTimeZoneOptional("abc", false).isEmpty) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org