I was trying to find a way to sessionize features in different events based
on the event timestamps using Spark, and I found an example in the Spark
repo that uses mapGroupsWithState to sessionize events by processing
timestamp:
https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
To quickly test whether this sessionization works with event timestamps, I
added withWatermark("timestamp", "10 seconds") (treating the processing time
as the event timestamp) and changed ProcessingTimeTimeout to EventTimeTimeout:
val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", value = true)
  .load()

// Split the lines into words, treat words as sessionId of events
val events = lines
  .withWatermark("timestamp", "10 seconds") // added
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }
val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
    ...
  }
// Start running the query that prints the session updates to the console
val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("console")
  .start()
query.awaitTermination()
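For reference, the gap-based sessionization logic I'm after (independent of Spark's state machinery) can be sketched in plain Scala; the helper name sessionize and the gapMs parameter are my own, not from the linked example:

```scala
// Group event timestamps (epoch millis) into sessions: a new session
// starts whenever the gap from the previous event exceeds gapMs.
// Hypothetical helper for illustration, not part of the Spark example.
def sessionize(timestamps: Seq[Long], gapMs: Long): Seq[Seq[Long]] =
  timestamps.sorted.foldLeft(Vector.empty[Vector[Long]]) {
    case (sessions, ts) =>
      sessions.lastOption match {
        // Event is within the gap of the current session: extend it
        case Some(cur) if ts - cur.last <= gapMs =>
          sessions.init :+ (cur :+ ts)
        // First event, or gap exceeded: start a new session
        case _ =>
          sessions :+ Vector(ts)
      }
  }

// Events at 0s, 5s, 30s with a 10s gap -> two sessions
// sessionize(Seq(0L, 5000L, 30000L), 10000L)
// => Vector(Vector(0, 5000), Vector(30000))
```

This is what the elided update function in mapGroupsWithState would compute incrementally per key, with the watermark deciding when a session can be timed out.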
However, when I ran it, Spark threw an org.apache.spark.sql.AnalysisException
saying: "Watermark must be specified in the query using
'[Dataset/DataFrame].withWatermark()' for using event-time timeout in a
[map|flatMap]GroupsWithState. Event-time timeout not supported without
watermark." This is confusing and doesn't seem to be true, because the
watermark on the 'timestamp' column is clearly present in the physical plan
printed right after that exception message:
...
+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
+- StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),...,
[value#2, timestamp#3]
Did I miss something or did something wrong?
Thanks in advance!
--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/