flatMapGroupsWithState not timing out (spark 2.2.1)
Hi, I’m attempting to leverage flatMapGroupsWithState to handle some arbitrary aggregations and am noticing that none of the timeout mechanisms fire:

- *ProcessingTimeTimeout* + *setTimeoutDuration*: timeout not honored
- *EventTimeTimeout* + watermark value: not honored
- *EventTimeTimeout* + *setTimeoutTimestamp*: not honored

I’ve come to this conclusion because a conditional check (with log output) on the *hasTimedOut* property is never hit. Each of these scenarios was tested in isolation, and all three exhibited the same behavior: the timeout event is never reached, and Spark introduced a huge delay between batches. The test read 2000 messages from a Kafka topic with two distinct groups (1000 messages per group).

To give an idea of what I’m attempting to do: aggregate all events into a single bucket until some timeout expires. It should also be noted that in this example I’m attempting to get the *final* value of the GroupState object as it times out. This is why I attempt a second pass on the timeout, but that doesn’t really matter, since I’m not even getting the timeout event.
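For reference, my understanding of the event-time timeout semantics behind the scenarios above, as a minimal plain-Scala sketch (`eventTimeTimedOut`, `timeoutTimestampMs`, and `watermarkMs` are hypothetical names standing in for values Spark tracks internally): a group only sees *hasTimedOut* once the watermark has moved past the timestamp set via *setTimeoutTimestamp*, and that check only runs when a batch is actually processed.

```scala
// Hypothetical model of the event-time timeout check: the state
// function is invoked with hasTimedOut = true only once the current
// watermark has advanced past the timeout timestamp set earlier.
def eventTimeTimedOut(timeoutTimestampMs: Long, watermarkMs: Long): Boolean =
  timeoutTimestampMs < watermarkMs

// If no new data arrives, the watermark never advances, so the timeout
// never fires, no matter how much wall-clock time passes.
assert(!eventTimeTimedOut(timeoutTimestampMs = 1000L, watermarkMs = 1000L))
assert(eventTimeTimedOut(timeoutTimestampMs = 1000L, watermarkMs = 1001L))
```

If that model is right, it would explain why a timestamp derived from wall-clock time never looks "expired" relative to a watermark that is driven purely by event time in the data.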
My code is here:

val stream = reader
  .load()
  .selectExpr(
    "CAST(key AS STRING)",
    "topic",
    "CAST(value AS BINARY)",
    "timestamp"
  )
  .as[KafkaLoadType]
  .map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
  .withWatermark("when", "10 seconds")
  .groupByKey(f => (f.name, f.when))
  .flatMapGroupsWithState[SessionInfo, Result](OutputMode.Append, GroupStateTimeout.EventTimeTimeout()) {
    case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) => {
      state.setTimeoutTimestamp(DateTime.now.plusMinutes(1).getMillis)
      info("Starting flatMapGroupsWithState func")
      val asList = events.toList
      info(s"${name} iterator size: ${asList.size}")
      if (state.exists) {
        info(s"State exists: ${state.get}")
      }
      var session = state.getOption.getOrElse(SessionInfo.zero(when, name))
      asList.foreach(e => { session = session.add(e.value) })
      info(s"Updating value to ${session}")
      state.update(session)
      val result = if (state.hasTimedOut && !state.get.finalized) {
        info("State has timedout ... finalizing")
        state.update(state.get.copy(finalized = true))
        Iterator(Option(state.get).map(r => Result(r.when, r.name, r.value)).get)
      } else if (state.hasTimedOut && state.get.finalized) {
        info("State has timedout AND is finalized")
        val r = state.get
        state.remove()
        Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
      } else {
        val result = state.get
        info(s"Returning ${result}")
        // state.remove()
        Iterator(Option(result).map(r => Result(r.when, r.name, r.value)).get)
      }
      info("Exiting flatMapGroupsWithState func")
      result
    }
  }
  .writeStream
  .trigger(Trigger.ProcessingTime(500))
  .format("console")
  .option("truncate", false)
  .outputMode(OutputMode.Append)
  .start()

Thanks for any help. dan
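For what it's worth, the usual shape I've seen for an event-time timeout handler branches on *hasTimedOut* first (the timeout invocation has an empty iterator), emits and removes the state there, and otherwise updates the state and re-arms the timeout relative to the current watermark rather than wall-clock time. A minimal plain-Scala sketch of that shape; the GroupState trait below is a stub standing in for Spark's org.apache.spark.sql.streaming.GroupState, and SessionInfo/Result are simplified stand-ins for the types above:

```scala
// Stub mirroring just the slice of Spark's GroupState API used here.
trait GroupState[S] {
  def hasTimedOut: Boolean
  def getOption: Option[S]
  def update(s: S): Unit
  def remove(): Unit
  def getCurrentWatermarkMs(): Long
  def setTimeoutTimestamp(ms: Long): Unit
}

case class SessionInfo(name: String, value: Long)
case class Result(name: String, value: Long)

def handleGroup(name: String,
                events: Iterator[Long],
                state: GroupState[SessionInfo]): Iterator[Result] = {
  if (state.hasTimedOut) {
    // Timeout invocation: events is empty; emit the final value and clear state.
    val finalState = state.getOption
    state.remove()
    finalState.map(s => Result(s.name, s.value)).iterator
  } else {
    // Data invocation: fold new events into state, then re-arm the timeout
    // relative to the watermark (event time), not DateTime.now.
    val session = events.foldLeft(state.getOption.getOrElse(SessionInfo(name, 0L))) {
      (s, v) => s.copy(value = s.value + v)
    }
    state.update(session)
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 60000L) // 1 minute past watermark
    Iterator.empty
  }
}
```

One other thing I'd look at: grouping by (f.name, f.when) keys the state by the event timestamp itself, so every distinct timestamp starts a fresh group; if the intent is one evolving bucket per name, grouping by name alone seems closer to it.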
Spark 2.2 structured streaming with mapGroupsWithState + window functions
Hi all, I've been looking heavily into Spark 2.2 to solve a problem I have, specifically using mapGroupsWithState. What I've discovered is that a *groupBy(window(..))* does not work when followed by a *mapGroupsWithState*, and produces an AnalysisException of:

*"mapGroupsWithState is not supported with aggregation on a streaming DataFrame/Dataset;;"*

I have HTTP logs that have been rolled up via a previous job's window function, in the form:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"}, "account": "A", "verb": "GET", "statusCode": 500, "eventCount": 10}
{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"}, "account": "A", "verb": "GET", "statusCode": 200, "eventCount": 89}

In this data the *when* sub-object covers one-minute blocks. I'd like to use a *window* function to aggregate that into 10-minute windows, summing the eventCount and grouping on account, verb, and statusCode. From there I'd like to *mapGroupsWithState* for each *account* and *verb* to produce buckets for some configurable window, say 10 minutes for example's sake, of the form:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"}, "account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}

*mapGroupsWithState* is perfect for this but, as stated, I've not found a way to apply a window function *and* use mapGroupsWithState.
Example:

ds.withColumn("bucket", $"when.from")
  .withWatermark("bucket", "1 minutes")
  // buckets and sums smaller windowed events into a rolled-up larger window event with a summed eventCount
  .groupBy(window($"bucket", "10 minutes"), $"account", $"verb", $"statusCode")
  .agg(sum($"eventCount"))
  .map(r => Log())
  .groupByKey(l => (l.when, l.account, l.verb))
  // will calculate totalErrors / totalRequests per bucket
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout()) {
    case ((when: Window, account: String, verb: String), events: Iterator[Log], state: GroupState[SessionInfo]) => {
      ..
    }
  }

Any suggestions would be greatly appreciated. I've also noticed that *groupByKey().reduceGroups()* does not work with *mapGroupsWithState*, which is another strategy that I've tried. Thanks. dan
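One workaround I'd consider, since a streaming aggregation can't precede the stateful operator: skip *groupBy(window(...))* entirely and compute the 10-minute bucket yourself, either as a derived column folded into the *groupByKey* key or inside the state function, summing eventCount in the state instead of in a preceding *agg*. A minimal sketch of the bucketing arithmetic (floorToWindow is a hypothetical helper; timestamps in epoch millis; this mirrors the arithmetic of a tumbling window with no slide or offset):

```scala
// Truncate an epoch-millis timestamp to the start of its enclosing
// fixed (tumbling) window, e.g. a 10-minute bucket.
def floorToWindow(timestampMs: Long, windowMs: Long): Long =
  timestampMs - (timestampMs % windowMs)

val tenMinutesMs = 10L * 60 * 1000

// Events at minute 1 and minute 9 land in the same 10-minute bucket...
assert(floorToWindow(60000L, tenMinutesMs) == floorToWindow(540000L, tenMinutesMs))
// ...while an event at minute 10 starts a new bucket.
assert(floorToWindow(600000L, tenMinutesMs) == 600000L)
```

The key would then be something like (floorToWindow(l.when, tenMinutesMs), l.account, l.verb), and the per-statusCode sums would be accumulated into SessionInfo inside mapGroupsWithState rather than by an upstream aggregation.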