flatMapGroupsWithState not timing out (Spark 2.2.1)

2018-01-12 Thread daniel williams
Hi,

I’m attempting to use flatMapGroupsWithState to handle some arbitrary
aggregations and am noticing a few things:

   - *ProcessingTimeTimeout* + *setTimeoutDuration*: the timeout is not
   honored
   - *EventTimeTimeout* + watermark: the watermark value is not honored
   - *EventTimeTimeout* + *setTimeoutTimestamp*: the timeout timestamp is
   not honored

I’ve come to this conclusion because a conditional check (with log output)
on the *hasTimedOut* property is never hit. Each of these scenarios was
tested in isolation, and all three exhibited the same behavior: no timeout
event is ever reached, and Spark introduces huge delays between batches.
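
For reference, here is a minimal sketch of how I wired up the
*ProcessingTimeTimeout* case (the full *EventTimeTimeout* version is
below). MyEvent, MyState, MyResult, and the "30 seconds" duration are
placeholders standing in for my real types and config:

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Placeholder event/state/result types for the sketch.
case class MyEvent(key: String, value: Long)
case class MyState(key: String, total: Long)
case class MyResult(key: String, total: Long)

def withProcessingTimeTimeout(spark: SparkSession,
    ds: Dataset[MyEvent]): Dataset[MyResult] = {
  import spark.implicits._
  ds.groupByKey(_.key)
    .flatMapGroupsWithState[MyState, MyResult](
      OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout()) {
      case (key, events, state: GroupState[MyState]) =>
        if (state.hasTimedOut) {
          // Should fire ~30s after the group last received data; never reached.
          val s = state.get
          state.remove()
          Iterator(MyResult(s.key, s.total))
        } else {
          // Fold the new events into the running state.
          val updated =
            events.foldLeft(state.getOption.getOrElse(MyState(key, 0L))) {
              (s, e) => s.copy(total = s.total + e.value)
            }
          state.update(updated)
          state.setTimeoutDuration("30 seconds")
          Iterator.empty
        }
    }
}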

The test read 2000 messages from a Kafka topic containing two distinct
groups (1000 messages per group).

To give an idea of what I’m attempting to do: aggregate all events for a
group into a single bucket until some timeout expires.

Also, it should be noted that in this example I’m attempting to get the
*final* value of the GroupState object as it times out. This is why I
attempt a second pass on the timeout, but that doesn’t really matter since
I’m never even getting the timeout event.
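
For reference, *SessionInfo* and *Result* in the code below are shaped
roughly like this (a simplification; the field types are inferred from how
they are used, so treat them as assumptions):

import java.sql.Timestamp

// Rough shape of the state and result types referenced in the code below.
case class SessionInfo(when: Timestamp, name: String, value: Long,
    finalized: Boolean = false) {
  def add(v: Long): SessionInfo = copy(value = value + v)
}

object SessionInfo {
  def zero(when: Timestamp, name: String): SessionInfo =
    SessionInfo(when, name, 0L)
}

case class Result(when: Timestamp, name: String, value: Long)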

My code is here:

val stream = reader
  .load()
  .selectExpr(
    "CAST(key AS STRING)",
    "topic",
    "CAST(value AS BINARY)",
    "timestamp"
  )
  .as[KafkaLoadType]
  .map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
  .withWatermark("when", "10 seconds")
  .groupByKey(f => (f.name, f.when))
  .flatMapGroupsWithState[SessionInfo, Result](
    OutputMode.Append, GroupStateTimeout.EventTimeTimeout()) {
    case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) => {

      state.setTimeoutTimestamp(DateTime.now.plusMinutes(1).getMillis)

      info("Starting flatMapGroupsWithState func")

      val asList = events.toList

      info(s"${name} iterator size: ${asList.size}")

      if (state.exists) {
        info(s"State exists: ${state.get}")
      }

      var session = state.getOption.getOrElse(SessionInfo.zero(when, name))

      asList.foreach(e => {
        session = session.add(e.value)
      })

      info(s"Updating value to ${session}")

      state.update(session)

      val result = if (state.hasTimedOut && !state.get.finalized) {
        info("State has timedout ... finalizing")

        state.update(state.get.copy(finalized = true))

        Iterator(Option(state.get).map(r => Result(r.when, r.name, r.value)).get)
      } else if (state.hasTimedOut && state.get.finalized) {
        info("State has timedout AND is finalized")

        val r = state.get

        state.remove()

        Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
      } else {
        val result = state.get

        info(s"Returning ${result}")

        // state.remove()

        Iterator(Option(result).map(r => Result(r.when, r.name, r.value)).get)
      }

      info("Exiting flatMapGroupsWithState func")

      result
    }
  }
  .writeStream
  .trigger(Trigger.ProcessingTime(500))
  .format("console")
  .option("truncate", false)
  .outputMode(OutputMode.Append)
  .start()

Thanks for any help.

dan


Spark 2.2 structured streaming with mapGroupsWithState + window functions

2017-08-28 Thread daniel williams
Hi all,

I've been looking heavily into Spark 2.2 to solve a problem I have,
specifically by using mapGroupsWithState.  What I've discovered is that a
*groupBy(window(..))* does not work when followed by a subsequent
*mapGroupsWithState*, producing an AnalysisException of:

*"mapGroupsWithState is not supported with aggregation on a streaming
DataFrame/Dataset;;"*

I have HTTP logs that have been rolled up via a previous job's window
function, in the form of:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
"account": "A", "verb": "GET","statusCode": 500, "eventCount": 10}
{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
"account": "A", "verb": "GET","statusCode": 200, "eventCount": 89}

In this data the *when* sub-object covers one-minute blocks.  I'd like to
use a *window* function to aggregate that to 10-minute windows and sum the
eventCount by grouping on account, verb, and statusCode.  From there I'd
like to *mapGroupsWithState* for each *account* and *verb* to produce
buckets for some configurable window, say 10 minutes for example's sake, of
the form:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"},
"account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}

*mapGroupsWithState* is perfect for this but, as stated, I've not found a
way to apply a window function *and* use mapGroupsWithState.

Example:

ds.withColumn("bucket", $"when.from")
  .withWatermark("bucket", "1 minutes")
  // buckets and sums smaller windowed events into a rolled-up larger
  // window event with a summed eventCount
  .groupBy(
    window($"bucket", "10 minutes"),
    $"account",
    $"verb",
    $"statusCode")
  .agg(sum($"eventCount"))
  .map(r => Log())
  .groupByKey(l => (l.when, l.account, l.verb)) // maps
  // will calculate totalErrors / totalRequests per bucket
  .mapGroupsWithState[SessionInfo, SessionUpdate](
    GroupStateTimeout.EventTimeTimeout()) {
    case ((when: Window, account: String, verb: String),
          events: Iterator[Log],
          state: GroupState[SessionInfo]) => {
      ..
    }
  }


Any suggestions would be greatly appreciated.

I've also noticed that *groupByKey().reduceGroups()* does not work with
*mapGroupsWithState*, which is another strategy that I've tried (sketched
below).
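
That attempt looked roughly like this (a sketch; the combiner is a
placeholder, and reduceGroups is itself a streaming aggregation, which is
why the subsequent mapGroupsWithState fails analysis the same way):

ds.groupByKey(l => (l.account, l.verb))
  // reduceGroups is an aggregation on the streaming Dataset ...
  .reduceGroups((a, b) => a.copy(eventCount = a.eventCount + b.eventCount))
  .map { case (_, log) => log }
  .groupByKey(l => (l.when, l.account, l.verb))
  // ... so this mapGroupsWithState is rejected at analysis time
  .mapGroupsWithState[SessionInfo, SessionUpdate](
    GroupStateTimeout.EventTimeTimeout()) {
    case ((when, account, verb), events: Iterator[Log],
          state: GroupState[SessionInfo]) =>
      ??? // same body shape as the example above
  }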

Thanks.

dan