HeartSaVioR commented on a change in pull request #33689:
URL: https://github.com/apache/spark/pull/33689#discussion_r685670238
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
##########
@@ -75,21 +76,7 @@ class StreamingSessionWindowSuite extends StreamTest
     // always Unix timestamp 0
     val inputData = MemoryStream[(String, Long)]
-
-    // Split the lines into words, treat words as sessionId of events
-    val events = inputData.toDF()
-      .select($"_1".as("value"), $"_2".as("timestamp"))
-      .withColumn("eventTime", $"timestamp".cast("timestamp"))
-      .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
-
-    val sessionUpdates = events
-      .groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
-      .agg(count("*").as("numEvents"))
-      .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
-        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
-        "numEvents")
-
-    sessionUpdates.explain()
+    val sessionUpdates = sessionWindowQuery(inputData)
Review comment:
Most of the changes in this suite are about deduplicating the queries. We can
simply use two queries (keyed window vs. global window) regardless of output
mode.
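
For context, a minimal sketch of what the two deduplicated query builders might
look like, reconstructed from the removed code in the diff above. The
sessionWindowQuery name comes from the diff; the global-window variant's name
(sessionWindowQueryOnGlobalKey) is purely illustrative, and the actual helpers
in the suite may differ (e.g. in watermark handling or parameters):

    // Sketch only: helpers assumed to live inside StreamingSessionWindowSuite
    // (which extends StreamTest), so testImplicits is in scope.
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.functions.{count, session_window}

    // Keyed window: sessions are tracked per sessionId.
    def sessionWindowQuery(input: MemoryStream[(String, Long)]): DataFrame = {
      import testImplicits._

      // Split the lines into words, treat words as sessionId of events.
      val events = input.toDF()
        .select($"_1".as("value"), $"_2".as("timestamp"))
        .withColumn("eventTime", $"timestamp".cast("timestamp"))
        .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")

      events
        .groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
        .agg(count("*").as("numEvents"))
        .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
          "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
          "numEvents")
    }

    // Global window (hypothetical name): no grouping key besides the session
    // window itself, i.e. all events share one session timeline.
    def sessionWindowQueryOnGlobalKey(input: MemoryStream[(String, Long)]): DataFrame = {
      import testImplicits._

      input.toDF()
        .select($"_1".as("value"), $"_2".as("timestamp"))
        .withColumn("eventTime", $"timestamp".cast("timestamp"))
        .groupBy(session_window($"eventTime", "10 seconds") as 'session)
        .agg(count("*").as("numEvents"))
        .selectExpr("CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
          "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
          "numEvents")
    }

With builders like these, each test only chooses a builder and an output mode
and then asserts on the results, which is the deduplication described above.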