Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread frankdede
You are exactly right! A few hours ago, after trying many things, I finally got
the example working by applying the watermark to the event timestamp column
before groupByKey, just as you suggested, but I wasn't able to figure out the
reasoning behind the fix.

val sessionUpdates = events
  .withWatermark("timestamp", "10 seconds")
  .groupByKey(event => event.sessionId)

It turns out that the planner simply cannot trace the watermarked column back
to its source once flatMap has been applied.
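A toy Scala model of that explanation, as a sketch only. It assumes a simplified picture of Spark's internals: a watermark is recorded as metadata on one output column, a typed flatMap (whose lambda the planner cannot see into) emits brand-new columns with no metadata, and groupByKey keeps its input columns and appends a key column. `Attr`, `ToyPlan`, and `Ids` are hypothetical names invented for illustration, not Spark classes.

```scala
// Toy model only: Attr, ToyPlan and Ids are hypothetical, not Spark classes.
object Ids {
  private var n = 0
  def fresh(): Int = { n += 1; n }
}

// An output column: name, unique id, and optional watermark metadata.
final case class Attr(name: String, id: Int, watermarkDelayMs: Option[Long] = None)

final case class ToyPlan(output: Seq[Attr]) {
  // Like withWatermark: tag an existing column with a delay.
  def withWatermark(column: String, delayMs: Long): ToyPlan =
    ToyPlan(output.map(a => if (a.name == column) a.copy(watermarkDelayMs = Some(delayMs)) else a))

  // Like a typed flatMap: the lambda is opaque, so its results are
  // serialized into brand-new columns that carry no metadata.
  def typedFlatMap(resultFields: Seq[String]): ToyPlan =
    ToyPlan(resultFields.map(f => Attr(f, Ids.fresh())))

  // Like groupByKey: a key column is appended; input columns are kept as-is.
  def groupByKey(keyField: String): ToyPlan =
    ToyPlan(output :+ Attr(keyField, Ids.fresh()))

  // Stand-in for the event-time timeout check: is any input column watermarked?
  def hasWatermark: Boolean = output.exists(_.watermarkDelayMs.isDefined)
}

val source = ToyPlan(Seq(Attr("value", Ids.fresh()), Attr("timestamp", Ids.fresh())))
val eventFields = Seq("sessionId", "timestamp")

// Watermark first, then flatMap: the tagged column does not survive.
val broken = source.withWatermark("timestamp", 10000L).typedFlatMap(eventFields).groupByKey("key")
// flatMap first, then watermark: the tag sits on a column that groupByKey keeps.
val fixed = source.typedFlatMap(eventFields).withWatermark("timestamp", 10000L).groupByKey("key")

println(broken.hasWatermark) // false
println(fixed.hasWatermark)  // true
```

In this model the only thing that matters is whether the watermark tag is on a column that still exists when the timeout check runs, which is why moving withWatermark after flatMap fixes it.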

Thanks Tathagata!

Sent from:

To unsubscribe e-mail:

Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread frankdede
I was trying to find a way to resessionize features across different events
based on their event timestamps using Spark, and I found a code example in the
Spark repo that uses mapGroupsWithState to sessionize events using processing
timestamps.

To quickly test whether this sessionization approach works with event
timestamps, I added withWatermark("timestamp", "10 seconds") (treating the
processing time as the event timestamp) and changed ProcessingTimeTimeout to
EventTimeTimeout.
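To make the switch from ProcessingTimeTimeout to EventTimeTimeout concrete, here is a plain-Scala simulation of the semantics, with no Spark dependency. It is a sketch under simplifying assumptions: the watermark is the maximum event time seen in earlier batches minus the delay, each session that receives events pushes its timeout to "last event time + gap" (analogous to calling GroupState.setTimeoutTimestamp), and an idle session times out once the watermark passes that timestamp. `EventTimeSessionSim`, `Session`, this `Event`, and the 10-second gap are hypothetical names and values for illustration; dropping late data is not modeled.

```scala
import scala.collection.mutable

// Hypothetical types for this simulation only (not Spark classes).
final case class Event(sessionId: String, timestampMs: Long)
final case class Session(startMs: Long, endMs: Long, timeoutMs: Long)

final class EventTimeSessionSim(watermarkDelayMs: Long, sessionGapMs: Long) {
  private var maxEventTimeMs: Option[Long] = None
  private val sessions = mutable.Map.empty[String, Session]

  // Watermark = max event time seen so far minus the allowed delay.
  def watermarkMs: Option[Long] = maxEventTimeMs.map(_ - watermarkDelayMs)

  def openSessions: Set[String] = sessions.keySet.toSet

  // Runs one micro-batch and returns the ids of sessions that timed out in it.
  def runBatch(batch: Seq[Event]): Seq[String] = {
    val wm = watermarkMs // fixed for the whole batch, computed from earlier batches
    val touched = batch.map(_.sessionId).toSet

    // Sessions with new events: extend them and push the timeout forward
    // (the role setTimeoutTimestamp plays in a real mapGroupsWithState function).
    batch.groupBy(_.sessionId).foreach { case (id, evs) =>
      val ts = evs.map(_.timestampMs)
      val start = sessions.get(id).fold(ts.min)(_.startMs min ts.min)
      val end = sessions.get(id).fold(ts.max)(_.endMs max ts.max)
      sessions(id) = Session(start, end, end + sessionGapMs)
    }

    // Idle sessions time out once the watermark has passed their timeout.
    val expired = wm.toList.flatMap { w =>
      sessions.collect { case (id, s) if !touched(id) && s.timeoutMs < w => id }
    }.sorted
    sessions --= expired

    // Advance the watermark for the next batch.
    if (batch.nonEmpty) {
      val batchMax = batch.map(_.timestampMs).max
      maxEventTimeMs = Some(maxEventTimeMs.fold(batchMax)(_ max batchMax))
    }
    expired
  }
}

val sim = new EventTimeSessionSim(watermarkDelayMs = 10000L, sessionGapMs = 10000L)
sim.runBatch(Seq(Event("a", 1000L), Event("b", 2000L))) // watermark becomes 2000 - 10000
sim.runBatch(Seq(Event("a", 30000L)))                   // watermark becomes 20000
println(sim.runBatch(Seq.empty))                        // List(b): 2000 + 10000 < 20000
```

In a real query the same roles are played by calling `state.setTimeoutTimestamp(...)` inside the mapGroupsWithState function and by checking `state.hasTimedOut` when Spark invokes the function for an expired group; none of that can work unless the grouped input actually carries a watermark.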

  val lines = spark.readStream
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", value = true)
    .load()

  // Split the lines into words, treat words as sessionId of events
  val events = lines
    .withWatermark("timestamp", "10 seconds") // added
    .as[(String, Timestamp)]
    .flatMap { case (line, timestamp) =>
      line.split(" ").map(word => Event(sessionId = word, timestamp))
    }

  val sessionUpdates = events
    .groupByKey(event => event.sessionId)
    .mapGroupsWithState[..., SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
      // ... state-update function from the example ...
    }

  // Start running the query that prints the session updates to the console
  val query = sessionUpdates
    // ... writeStream / console sink setup from the example ...

However, when I ran it, Spark threw an org.apache.spark.sql.AnalysisException
saying "Watermark must be specified in the query using
'[Dataset/DataFrame].withWatermark()' for using event-time timeout in a
[map|flatMap]GroupsWithState. Event-time timeout not supported without
watermark." That seemed wrong and confusing, because the 'timestamp' column
clearly appears with its watermark in the plan printed after that exception
message:

+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
   +- StreamingRelation ..., [value#2, timestamp#3]

Did I miss something, or did I do something wrong?

Thanks in advance!
