This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 3ab2f88 [SPARK-31296][SQL][TESTS] Benchmark date-time rebasing in Parquet datasource 3ab2f88 is described below commit 3ab2f8877e4618031b3258cb90eb249145077e08 Author: Maxim Gekk <max.g...@gmail.com> AuthorDate: Mon Mar 30 16:46:31 2020 +0800 [SPARK-31296][SQL][TESTS] Benchmark date-time rebasing in Parquet datasource ### What changes were proposed in this pull request? In the PR, I propose to add a new benchmark `DateTimeRebaseBenchmark` which should measure the performance of rebasing of dates/timestamps from/to the hybrid calendar (Julian+Gregorian) to/from Proleptic Gregorian calendar: 1. In write, it separately saves dates and timestamps before and after the year 1582 w/ and w/o rebasing. 2. In read, it loads previously saved parquet files with the vectorized reader and with the regular reader. Here is the summary of benchmarking: - Saving timestamps is **~6 times slower** - Loading timestamps w/ vectorized **off** is **~4 times slower** - Loading timestamps w/ vectorized **on** is **~10 times slower** ### Why are the changes needed? To know the impact of date-time rebasing introduced by #27915, #27953, #27807. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Run the `DateTimeRebaseBenchmark` benchmark using Amazon EC2: | Item | Description | | ---- | ----| | Region | us-west-2 (Oregon) | | Instance | r3.xlarge | | AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) | | Java | OpenJDK8/11 | Closes #28057 from MaxGekk/rebase-bechmark. 
Lead-authored-by: Maxim Gekk <max.g...@gmail.com> Co-authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit a1dbcd13a3eeaee50cc1a46e909f9478d6d55177) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../DateTimeRebaseBenchmark-jdk11-results.txt | 53 +++++++ .../benchmarks/DateTimeRebaseBenchmark-results.txt | 53 +++++++ .../benchmark/DateTimeRebaseBenchmark.scala | 161 +++++++++++++++++++++ 3 files changed, 267 insertions(+) diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt new file mode 100644 index 0000000..52522f8 --- /dev/null +++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt @@ -0,0 +1,53 @@ +================================================================================================ +Rebasing dates/timestamps in Parquet datasource +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +after 1582, noop 9272 9272 0 10.8 92.7 1.0X +before 1582, noop 9142 9142 0 10.9 91.4 1.0X +after 1582, rebase off 21841 21841 0 4.6 218.4 0.4X +after 1582, rebase on 58245 58245 0 1.7 582.4 0.2X +before 1582, rebase off 19813 19813 0 5.0 198.1 0.5X +before 1582, rebase on 63737 63737 0 1.6 637.4 0.1X + +OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
+------------------------------------------------------------------------------------------------------------------------ +after 1582, vec off, rebase off 13004 13063 67 7.7 130.0 1.0X +after 1582, vec off, rebase on 36224 36253 26 2.8 362.2 0.4X +after 1582, vec on, rebase off 3596 3654 54 27.8 36.0 3.6X +after 1582, vec on, rebase on 26144 26253 112 3.8 261.4 0.5X +before 1582, vec off, rebase off 12872 12914 51 7.8 128.7 1.0X +before 1582, vec off, rebase on 37762 37904 153 2.6 377.6 0.3X +before 1582, vec on, rebase off 3522 3592 94 28.4 35.2 3.7X +before 1582, vec on, rebase on 27580 27615 59 3.6 275.8 0.5X + +OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +after 1582, noop 3113 3113 0 32.1 31.1 1.0X +before 1582, noop 3078 3078 0 32.5 30.8 1.0X +after 1582, rebase off 15749 15749 0 6.3 157.5 0.2X +after 1582, rebase on 69106 69106 0 1.4 691.1 0.0X +before 1582, rebase off 15967 15967 0 6.3 159.7 0.2X +before 1582, rebase on 76843 76843 0 1.3 768.4 0.0X + +OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +after 1582, vec off, rebase off 15070 15172 94 6.6 150.7 1.0X +after 1582, vec off, rebase on 43748 43867 157 2.3 437.5 0.3X +after 1582, vec on, rebase off 4805 4859 60 20.8 48.1 3.1X +after 1582, vec on, rebase on 33960 34027 61 2.9 339.6 0.4X +before 1582, vec off, rebase off 15037 15071 52 6.7 150.4 1.0X +before 1582, vec off, rebase on 44590 44749 156 2.2 
445.9 0.3X +before 1582, vec on, rebase off 4831 4852 30 20.7 48.3 3.1X +before 1582, vec on, rebase on 35460 35481 18 2.8 354.6 0.4X + + diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt new file mode 100644 index 0000000..c9320cf --- /dev/null +++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt @@ -0,0 +1,53 @@ +================================================================================================ +Rebasing dates/timestamps in Parquet datasource +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +after 1582, noop 9472 9472 0 10.6 94.7 1.0X +before 1582, noop 9226 9226 0 10.8 92.3 1.0X +after 1582, rebase off 21201 21201 0 4.7 212.0 0.4X +after 1582, rebase on 56471 56471 0 1.8 564.7 0.2X +before 1582, rebase off 20179 20179 0 5.0 201.8 0.5X +before 1582, rebase on 65717 65717 0 1.5 657.2 0.1X + +OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +after 1582, vec off, rebase off 12294 12434 205 8.1 122.9 1.0X +after 1582, vec off, rebase on 36959 36967 12 2.7 369.6 0.3X +after 1582, vec on, rebase off 3644 3691 49 27.4 36.4 3.4X +after 1582, vec on, rebase on 26764 26852 92 3.7 267.6 0.5X +before 1582, vec off, rebase off 12830 12917 85 7.8 128.3 1.0X +before 1582, vec off, rebase on 38897 
39053 229 2.6 389.0 0.3X +before 1582, vec on, rebase off 3638 3693 85 27.5 36.4 3.4X +before 1582, vec on, rebase on 28956 29007 44 3.5 289.6 0.4X + +OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +after 1582, noop 2952 2952 0 33.9 29.5 1.0X +before 1582, noop 2880 2880 0 34.7 28.8 1.0X +after 1582, rebase off 15928 15928 0 6.3 159.3 0.2X +after 1582, rebase on 82816 82816 0 1.2 828.2 0.0X +before 1582, rebase off 15988 15988 0 6.3 159.9 0.2X +before 1582, rebase on 92636 92636 0 1.1 926.4 0.0X + +OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +after 1582, vec off, rebase off 14863 14917 78 6.7 148.6 1.0X +after 1582, vec off, rebase on 54819 54939 140 1.8 548.2 0.3X +after 1582, vec on, rebase off 4905 4941 32 20.4 49.0 3.0X +after 1582, vec on, rebase on 44914 45008 124 2.2 449.1 0.3X +before 1582, vec off, rebase off 14928 14970 48 6.7 149.3 1.0X +before 1582, vec off, rebase on 59752 59996 245 1.7 597.5 0.2X +before 1582, vec on, rebase off 4892 4916 33 20.4 48.9 3.0X +before 1582, vec on, rebase on 46854 46977 198 2.1 468.5 0.3X + + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala new file mode 100644 index 0000000..983d9b4 --- /dev/null +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneOffset} + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.util.DateTimeConstants.SECONDS_PER_DAY +import org.apache.spark.sql.internal.SQLConf + +/** + * Synthetic benchmark for rebasing of date and timestamp in read/write. + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar> + * 2. build/sbt "sql/test:runMain <this class>" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" + * Results will be written to "benchmarks/DateTimeRebaseBenchmark-results.txt". 
+ * }}} + */ +object DateTimeRebaseBenchmark extends SqlBasedBenchmark { + import spark.implicits._ + + private def genTs(cardinality: Int, start: LocalDateTime, end: LocalDateTime): DataFrame = { + val startSec = start.toEpochSecond(ZoneOffset.UTC) + val endSec = end.toEpochSecond(ZoneOffset.UTC) + spark.range(0, cardinality, 1, 1) + .select((($"id" % (endSec - startSec)) + startSec).as("seconds")) + .select($"seconds".cast("timestamp").as("ts")) + } + + private def genTsAfter1582(cardinality: Int): DataFrame = { + val start = LocalDateTime.of(1582, 10, 15, 0, 0, 0) + val end = LocalDateTime.of(3000, 1, 1, 0, 0, 0) + genTs(cardinality, start, end) + } + + private def genTsBefore1582(cardinality: Int): DataFrame = { + val start = LocalDateTime.of(10, 1, 1, 0, 0, 0) + val end = LocalDateTime.of(1580, 1, 1, 0, 0, 0) + genTs(cardinality, start, end) + } + + private def genDate(cardinality: Int, start: LocalDate, end: LocalDate): DataFrame = { + val startSec = LocalDateTime.of(start, LocalTime.MIDNIGHT).toEpochSecond(ZoneOffset.UTC) + val endSec = LocalDateTime.of(end, LocalTime.MIDNIGHT).toEpochSecond(ZoneOffset.UTC) + spark.range(0, cardinality * SECONDS_PER_DAY, SECONDS_PER_DAY, 1) + .select((($"id" % (endSec - startSec)) + startSec).as("seconds")) + .select($"seconds".cast("timestamp").as("ts")) + .select($"ts".cast("date").as("date")) + } + + private def genDateAfter1582(cardinality: Int): DataFrame = { + val start = LocalDate.of(1582, 10, 15) + val end = LocalDate.of(3000, 1, 1) + genDate(cardinality, start, end) + } + + private def genDateBefore1582(cardinality: Int): DataFrame = { + val start = LocalDate.of(10, 1, 1) + val end = LocalDate.of(1580, 1, 1) + genDate(cardinality, start, end) + } + + private def genDF(cardinality: Int, dateTime: String, after1582: Boolean): DataFrame = { + (dateTime, after1582) match { + case ("date", true) => genDateAfter1582(cardinality) + case ("date", false) => genDateBefore1582(cardinality) + case ("timestamp", true) => 
genTsAfter1582(cardinality) + case ("timestamp", false) => genTsBefore1582(cardinality) + case _ => throw new IllegalArgumentException( + s"cardinality = $cardinality dateTime = $dateTime after1582 = $after1582") + } + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + withTempPath { path => + runBenchmark("Rebasing dates/timestamps in Parquet datasource") { + val rowsNum = 100000000 + Seq("date", "timestamp").foreach { dateTime => + val benchmark = new Benchmark(s"Save ${dateTime}s to parquet", rowsNum, output = output) + benchmark.addCase("after 1582, noop", 1) { _ => + genDF(rowsNum, dateTime, after1582 = true).noop() + } + benchmark.addCase("before 1582, noop", 1) { _ => + genDF(rowsNum, dateTime, after1582 = false).noop() + } + + def save(after1582: Boolean, rebase: Boolean): Unit = { + val period = if (after1582) "after" else "before" + val rebaseFlag = if (rebase) "on" else "off" + val caseName = s"$period 1582, rebase $rebaseFlag" + benchmark.addCase(caseName, 1) { _ => + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> rebase.toString) { + val df = genDF(rowsNum, dateTime, after1582) + val pathToWrite = path.getAbsolutePath + s"/${dateTime}_${period}_1582_$rebaseFlag" + df.write + .mode("overwrite") + .format("parquet") + .save(pathToWrite) + } + } + } + + Seq(true, false).foreach { after1582 => + Seq(false, true).foreach { rebase => + save(after1582, rebase) + } + } + benchmark.run() + + val benchmark2 = new Benchmark( + s"Load ${dateTime}s from parquet", rowsNum, output = output) + + def load(after1582: Boolean, vec: Boolean, rebase: Boolean): Unit = { + val period = if (after1582) "after" else "before" + val rebaseFlag = if (rebase) "on" else "off" + val vecFlag = if (vec) "on" else "off" + val caseName = s"$period 1582, vec $vecFlag, rebase $rebaseFlag" + benchmark2.addCase(caseName, 3) { _ => + withSQLConf( + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString, + SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> 
rebase.toString) { + val pathToRead = path.getAbsolutePath + s"/${dateTime}_${period}_1582_$rebaseFlag" + spark.read.format("parquet").load(pathToRead).noop() + } + } + } + + Seq(true, false).foreach { after1582 => + Seq(false, true).foreach { vec => + Seq(false, true).foreach { rebase => + load(after1582, vec, rebase) + } + } + } + + benchmark2.run() + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org