Re: Spark 2.2 structured streaming with mapGroupsWithState + window functions
Hi Daniel,

I am thinking you could use groupByKey & mapGroupsWithState to send whatever updates ("updated state") you want, and then use .groupBy(window). Will that work as expected?

Thanks,
Kant

On Mon, Aug 28, 2017 at 7:06 AM, daniel williams wrote:
> Hi all,
>
> I've been looking heavily into Spark 2.2 to solve a problem I have,
> specifically using mapGroupsWithState. What I've discovered is that a
> *groupBy(window(..))* does not work when followed by a subsequent
> *mapGroupsWithState*, and produces an AnalysisException:
>
> *"mapGroupsWithState is not supported with aggregation on a streaming
> DataFrame/Dataset;;"*
>
> I have HTTP logs that have been rolled up via a previous job's window
> function, in the form of:
>
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
> "account": "A", "verb": "GET", "statusCode": 500, "eventCount": 10}
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
> "account": "A", "verb": "GET", "statusCode": 200, "eventCount": 89}
>
> In this data the *when* sub-object covers one-minute blocks. I'd like to
> use a *window* function to aggregate that into 10-minute windows and sum
> the eventCount, grouping on account, verb, and statusCode. From there
> I'd like to *mapGroupsWithState* for each *account* and *verb* to produce
> buckets for some configurable window, say 10 minutes for example's sake, of
> the form:
>
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"},
> "account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}
>
> *mapGroupsWithState* is perfect for this but, as stated, I've not found a
> way to apply a window function *and* use mapGroupsWithState.
> Example:
>
> ds.withColumn("bucket", $"when.from")
>   .withWatermark("bucket", "1 minutes")
>   // buckets and sums smaller windowed events into a rolled-up larger
>   // window event with a summed eventCount
>   .groupBy(window($"bucket", "10 minutes"),
>     $"account",
>     $"verb",
>     $"statusCode")
>   .agg(sum($"eventCount"))
>   .map(r => Log())
>   .groupByKey(l => (l.when, l.account, l.verb))
>   // will calculate totalErrors / totalRequests per bucket
>   .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout()) {
>     case ((when: Window, account: String, verb: String),
>           events: Iterator[Log],
>           state: GroupState[SessionInfo]) => {
>       ..
>     }
>   }
>
> Any suggestions would be greatly appreciated.
>
> I've also noticed that *groupByKey().reduceGroups()* does not work with
> *mapGroupsWithState*, which is another strategy that I've tried.
>
> Thanks.
>
> dan
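One way around the AnalysisException above is to drop the groupBy(window) stage entirely and fold the one-minute records into 10-minute buckets inside the mapGroupsWithState function itself, so the query contains only a single stateful operator. The sketch below assumes simplified versions of the Log and bucket types from the thread (the field names and the >= 500 error predicate are illustrative, not from the original); state eviction via timeouts is omitted for brevity:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Hypothetical flattened form of the rolled-up one-minute records.
case class Log(from: Timestamp, account: String, verb: String,
               statusCode: Int, eventCount: Long)
// Hypothetical running totals for one 10-minute bucket.
case class Bucket(totalRequests: Long, totalErrors: Long)

object StatefulRollup {
  val bucketMs: Long = 10 * 60 * 1000L  // 10-minute bucket width

  // Update function for mapGroupsWithState: state maps a bucket id
  // (epoch millis / bucketMs) to its running totals for this key.
  def updateBuckets(
      key: (String, String),                      // (account, verb)
      events: Iterator[Log],
      state: GroupState[Map[Long, Bucket]]): Map[Long, Bucket] = {
    val current = state.getOption.getOrElse(Map.empty[Long, Bucket])
    val updated = events.foldLeft(current) { (acc, e) =>
      val b = e.from.getTime / bucketMs           // which 10-minute bucket
      val prev = acc.getOrElse(b, Bucket(0L, 0L))
      // Illustrative error predicate: count 5xx statuses as errors.
      val errs = if (e.statusCode >= 500) e.eventCount else 0L
      acc.updated(b, Bucket(prev.totalRequests + e.eventCount,
                            prev.totalErrors + errs))
    }
    state.update(updated)
    updated
  }
}
```

The pipeline then becomes `ds.groupByKey(l => (l.account, l.verb)).mapGroupsWithState(GroupStateTimeout.NoTimeout)(StatefulRollup.updateBuckets _)` with no preceding aggregation, which is the shape Spark 2.2's analyzer accepts. A real job would also expire old buckets from the state map.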
Re: Spark 2.2 structured streaming with mapGroupsWithState + window functions
+1. Is this ticket related: https://issues.apache.org/jira/browse/SPARK-21641 ?

On Mon, Aug 28, 2017 at 7:06 AM, daniel williams wrote:
> [quoted message elided; see above]