This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 87d4eb609d7 [SPARK-39347][SS] Bug fix for time window calculation when event time < 0

87d4eb609d7 is described below

commit 87d4eb609d7370d79f42cbcc5985c4fede1781d6
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Mon Feb 6 11:47:27 2023 +0900

[SPARK-39347][SS] Bug fix for time window calculation when event time < 0

### What changes were proposed in this pull request?

I tried to understand what was introduced in https://github.com/apache/spark/pull/36737, made the code more readable, and added some tests. Many thanks to nyingping!

The change in https://github.com/apache/spark/pull/35362 introduced a bug when `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`: for some windows, Spark returns a wrong `windowStart` time. The root cause is how the modulo operator (`%`) behaves with negative numbers: on the JVM, the result takes the sign of the dividend. For example,

```
scala> 1 % 3
res0: Int = 1

scala> -1 % 3
res1: Int = -1 // Mathematically it should be 2 here
```

This leads to a wrong `windowStart`. For a concrete example:

```
* Example calculation:
* For simplicity assume windowDuration = slideDuration.
* | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
* |                         |----l1 ----|---- l2 -----|
*                           lastStart   timestamp     lastStartWrong
* Normally when timestamp > startTime (or equally remainder > 0), we get
* l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timestamp - remainder
* However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
* -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong.
* So we need to subtract a slideDuration.
```

### Why are the changes needed?

This is a bug fix. Example from the original PR https://github.com/apache/spark/pull/36737: here `df3` and `df4` have times before 1970, so `timestamp < 0`.
```
// Excerpt from DataFrameTimeWindowingSuite; checkAnswer, toDF and $ come from
// the suite's test harness (QueryTest with SharedSparkSession).
import java.time.LocalDateTime

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types.StringType

val df3 = Seq(
  ("1969-12-31 00:00:02", 1),
  ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
  (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
  (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
      Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
  )
}
```

Without the change, this test fails with:

```
== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
```

Notice that the result is shifted by one `slideDuration`: it should start with `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]`, but Spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, i.e. shifted right by one `slideDuration` (10 seconds).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Benchmark results

1. Burak's original implementation
```
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                        10             17          14        962.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info] Running case: burak version
[info] Stopped after 16 iterations, 10604 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                       646            663          19         15.5          64.6       1.0X
```

2. Current implementation (buggy)
```
[info] Running benchmark: tumbling windows
[info] Running case: current - buggy
[info] Stopped after 637 iterations, 10008 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                      10             16          12       1042.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info] Running case: current - buggy
[info] Stopped after 16 iterations, 10143 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                     617            634          10         16.2          61.7       1.0X
```

3. Proposed change in this PR
```
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                      10             16          11        981.2           1.0       1.0X
[info] Running benchmark: sliding windows
[info] Running case: purposed change
[info] Stopped after 18 iterations, 10122 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                     548            562          19         18.3          54.8       1.0X
```

Note that I ran these benchmarks separately: when they are run sequentially, the later one always shows a performance gain, which I suspect is due to some runtime optimization.

Closes #39843 from WweiL/SPARK-38069-time-window-fix.
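To make the `%` behaviour concrete, here is a minimal, hypothetical sketch (not Spark code; `LastStartSketch` and its method names are invented for illustration) that evaluates the old and new `lastStart` formulas on plain `Long` epoch seconds. It assumes the first `df3` timestamp is interpreted as UTC and uses the `df3` window parameters (10-second slide, 5-second start offset). `java.lang.Math.floorMod` is shown only as an equivalent alternative to the explicit `remainder < 0` adjustment; it is not what this commit uses.

```scala
import java.time.Instant

// Hypothetical illustration, not Spark code: all times are epoch seconds (UTC).
object LastStartSketch {
  // Formula before this fix. JVM `%` keeps the sign of the dividend, so the
  // result is wrong once (timestamp - startTime + slideDuration) is negative.
  def lastStartOld(timestamp: Long, startTime: Long, slideDuration: Long): Long =
    timestamp - (timestamp - startTime + slideDuration) % slideDuration

  // Formula after this fix: move a negative remainder into [0, slideDuration).
  def lastStartNew(timestamp: Long, startTime: Long, slideDuration: Long): Long = {
    val remainder = (timestamp - startTime) % slideDuration
    timestamp - (if (remainder < 0) remainder + slideDuration else remainder)
  }

  // Alternative with the same result for slideDuration > 0: floored modulo from the JDK.
  def lastStartFloorMod(timestamp: Long, startTime: Long, slideDuration: Long): Long =
    timestamp - java.lang.Math.floorMod(timestamp - startTime, slideDuration)

  def main(args: Array[String]): Unit = {
    // First row of df3, interpreted as UTC (epoch second -86398).
    val ts = Instant.parse("1969-12-31T00:00:02Z").getEpochSecond
    println(Instant.ofEpochSecond(lastStartOld(ts, 5L, 10L)))      // 1969-12-31T00:00:05Z (wrong)
    println(Instant.ofEpochSecond(lastStartNew(ts, 5L, 10L)))      // 1969-12-30T23:59:55Z
    println(Instant.ofEpochSecond(lastStartFloorMod(ts, 5L, 10L))) // 1969-12-30T23:59:55Z
  }
}
```

The old formula lands exactly one slide too late (00:00:05 instead of 23:59:55 the previous day), matching the shifted output in the failing test; for timestamps after the epoch both formulas agree.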
Lead-authored-by: Wei Liu <wei....@databricks.com>
Co-authored-by: nieyingping <nieyingp...@alphadata.com.cn>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/catalyst/analysis/ResolveTimeWindows.scala | 10 +++---
 .../spark/sql/DataFrameTimeWindowingSuite.scala    | 36 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index 6378f4eedd3..f3fc6c9e9db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
 import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME}
@@ -49,7 +49,8 @@ object TimeWindowing extends Rule[LogicalPlan] {
    * The windows are calculated as below:
    * maxNumOverlapping <- ceil(windowDuration / slideDuration)
    * for (i <- 0 until maxNumOverlapping)
-   *   lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
+   *   remainder <- (timestamp - startTime) % slideDuration
+   *   lastStart <- timestamp - ((remainder < 0) ? remainder + slideDuration : remainder)
    *   windowStart <- lastStart - i * slideDuration
    *   windowEnd <- windowStart + windowDuration
    *   return windowStart, windowEnd
@@ -103,8 +104,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
 
       def getWindow(i: Int, dataType: DataType): Expression = {
         val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
-        val lastStart = timestamp - (timestamp - window.startTime
-          + window.slideDuration) % window.slideDuration
+        val remainder = (timestamp - window.startTime) % window.slideDuration
+        val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
+          remainder + window.slideDuration)), Some(remainder))
         val windowStart = lastStart - i * window.slideDuration
         val windowEnd = windowStart + window.windowDuration
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 0bbb9460feb..367cdbe8447 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -314,6 +314,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
           Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
       )
     }
+
+    val df3 = Seq(
+      ("1969-12-31 00:00:02", 1),
+      ("1969-12-31 00:00:12", 2)).toDF("time", "value")
+    val df4 = Seq(
+      (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
+      (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")
+
+    Seq(df3, df4).foreach { df =>
+      checkAnswer(
+        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
+          .orderBy($"window.start".asc)
+          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+        Seq(
+          Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
+          Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
+      )
+    }
+
+    val df5 = Seq(
+      ("1968-12-31 00:00:02", 1),
+      ("1968-12-31 00:00:12", 2)).toDF("time", "value")
+    val df6 = Seq(
+      (LocalDateTime.parse("1968-12-31T00:00:02"), 1),
+      (LocalDateTime.parse("1968-12-31T00:00:12"), 2)).toDF("time", "value")
+
+    Seq(df5, df6).foreach { df =>
+      checkAnswer(
+        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
+          .orderBy($"window.start".asc)
+          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+        Seq(
+          Row("1968-12-30 23:59:55", "1968-12-31 00:00:05", 1),
+          Row("1968-12-31 00:00:05", "1968-12-31 00:00:15", 2))
+      )
+    }
   }
 
   test("multiple time windows in a single operator throws nice exception") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
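The window pseudocode in the updated `TimeWindowing` scaladoc can also be tried outside Spark. The following is a hypothetical, plain-Scala sketch (the `TimeWindowSketch` object, `Window` case class and `windowsFor` method are invented for this example; times are epoch seconds) that enumerates candidate windows for a single timestamp using the fixed `remainder` adjustment. The final filter that keeps only windows with `start <= timestamp < end` is an assumption of this sketch; the hunks above do not show how Spark discards candidates.

```scala
// Hypothetical sketch of the scaladoc pseudocode, not Spark code; times are epoch seconds.
object TimeWindowSketch {
  final case class Window(start: Long, end: Long)

  def windowsFor(timestamp: Long, windowDuration: Long, slideDuration: Long, startTime: Long): Seq[Window] = {
    require(windowDuration > 0 && slideDuration > 0, "durations must be positive")
    // maxNumOverlapping <- ceil(windowDuration / slideDuration), in integer arithmetic
    val maxNumOverlapping = ((windowDuration + slideDuration - 1) / slideDuration).toInt
    // remainder is negative when (timestamp - startTime) is negative and not a multiple of the slide
    val remainder = (timestamp - startTime) % slideDuration
    val lastStart = timestamp - (if (remainder < 0) remainder + slideDuration else remainder)
    (0 until maxNumOverlapping)
      .map { i =>
        val windowStart = lastStart - i * slideDuration
        Window(windowStart, windowStart + windowDuration)
      }
      // Sketch assumption: keep only windows that actually contain the timestamp.
      .filter(w => w.start <= timestamp && timestamp < w.end)
  }
}
```

For example, `TimeWindowSketch.windowsFor(-3L, 10L, 5L, 0L)` (a 10-second window sliding every 5 seconds, evaluated 3 seconds before the epoch) yields `Window(-5, 5)` and `Window(-10, 0)`, and the first `df3` row (`-86398` seconds, 10-second tumbling window, 5-second offset) yields the single window `Window(-86405, -86395)`, i.e. `[1969-12-30 23:59:55, 1969-12-31 00:00:05)`.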