GitHub user marmbrus opened a pull request:
https://github.com/apache/spark/pull/12048
[SPARK-14255] [SQL] Streaming Aggregation
This PR adds the ability to perform aggregations inside of a
`ContinuousQuery`. To implement this feature, the planning of
aggregation has been augmented with a new `StatefulAggregationStrategy`. Unlike
batch aggregation, stateful aggregation uses the `StateStore` (introduced in
#11645) to persist the results of partial aggregation across different
invocations. The resulting physical plan performs the aggregation using the
following progression:
- Partial Aggregation
- Shuffle
- Partial Merge (now there is at most 1 tuple per group)
- StateStoreRestore (now there is 1 tuple from this batch + optionally
one from the previous)
- Partial Merge (now there is at most 1 tuple per group)
- StateStoreSave (saves the tuple for the next batch)
- Complete (output the current result of the aggregation)
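
The progression above can be illustrated with a toy model. This is only a sketch, not code from this PR: it uses plain Scala collections, a running count as the aggregate, and an immutable `Map` standing in for the `StateStore`. The names `StatefulAggSketch`, `partialMerge`, and `runBatch` are hypothetical, and the choice to output only the groups touched by the current batch is an assumption of the sketch.

```scala
// Toy model of the stateful aggregation progression, using a count per key.
// Plain Scala collections only; none of these names are Spark APIs.
object StatefulAggSketch {
  type Key = String
  type Partial = Long // partial aggregation buffer: a running count

  // Merge partial buffers so that at most one tuple per group remains.
  def partialMerge(rows: Seq[(Key, Partial)]): Seq[(Key, Partial)] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }.toSeq

  // Run one batch. `partitions` models the input partitions of this batch;
  // `state` models the partial results persisted by earlier batches.
  // Returns (state for the next batch, output of this batch).
  def runBatch(
      partitions: Seq[Seq[Key]],
      state: Map[Key, Partial]): (Map[Key, Partial], Map[Key, Long]) = {
    // Partial Aggregation: pre-aggregate within each input partition.
    val partials = partitions.map(p => partialMerge(p.map(k => k -> 1L)))
    // Shuffle: co-locate all partials of a group (modelled by flattening).
    val shuffled = partials.flatten
    // Partial Merge: at most 1 tuple per group.
    val merged = partialMerge(shuffled)
    // StateStoreRestore: 1 tuple from this batch + optionally one from before.
    val previous = merged.collect { case (k, _) if state.contains(k) => k -> state(k) }
    val restored = merged ++ previous
    // Partial Merge: again at most 1 tuple per group.
    val updated = partialMerge(restored)
    // StateStoreSave: persist the tuple for the next batch.
    val newState = state ++ updated
    // Complete: output the current result (here, only groups seen in this batch).
    (newState, updated.toMap)
  }

  def main(args: Array[String]): Unit = {
    val (s1, out1) = runBatch(Seq(Seq("a", "b"), Seq("a")), Map.empty)
    println(out1) // counts for "a" and "b" after the first batch
    val (_, out2) = runBatch(Seq(Seq("a", "c")), s1)
    println(out2) // "a" now includes the count carried over from the first batch
  }
}
```

In this model the second batch only needs the single saved tuple per group from the first batch, which is the point of persisting partial results rather than raw input.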
The following refactoring was also performed to allow us to plug into
existing code:
- The logic for breaking down and de-duping the physical execution of
aggregation has been moved into a new pattern, `PhysicalAggregation`.
- The `AttributeReference` used to identify the result of an
`AggregateFunction` has been moved into the `AggregateExpression` container.
This change puts the reference in the same object as the other intermediate
references used in aggregation and eliminates the need to pass around a
`Map[(AggregateFunction, Boolean), Attribute]`. Further cleanup (using a
different aggregation container for logical/physical plans) is deferred to a
follow-up.
- Some planning logic is moved from the `SessionState` into the
`QueryExecution` to make it easier to override in the streaming case.
- The ability to write a `StreamTest` that checks only the output of the
last batch has been added to simulate the future addition of output modes.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/marmbrus/spark statefulAgg
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/12048.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #12048
----
commit 00f059d385ca48989a003809a12dab88fdae3371
Author: Michael Armbrust <[email protected]>
Date: 2016-03-25T00:36:43Z
Basic Stateful Agg
commit 17a829bf82ef929d5363561ef8645bfe54a3dec8
Author: Michael Armbrust <[email protected]>
Date: 2016-03-29T01:25:11Z
add testing for delta queries
commit b4fed4ae1516b2831b05ab44a8e60ae49a760f97
Author: Michael Armbrust <[email protected]>
Date: 2016-03-29T01:25:22Z
aggregation refactoring
commit d6f7941172c3ac6adb9b2aa767504388e1186826
Author: Michael Armbrust <[email protected]>
Date: 2016-03-29T17:13:54Z
plug state managment into the query execution
commit 7a1969237145137fefa54869131a14013410bae8
Author: Michael Armbrust <[email protected]>
Date: 2016-03-29T18:22:59Z
Add get/put to StateStore
commit f7bb8d2475fd5f12c65f1dd37315e7ff8d47e7bb
Author: Michael Armbrust <[email protected]>
Date: 2016-03-29T18:23:08Z
cleanup and refactoring
commit b46c325fb5418356154abaa692d79e8002492cb7
Author: Michael Armbrust <[email protected]>
Date: 2016-03-29T21:59:32Z
cleanup
commit 1ef82c6ec4620ee3b13dab974d0f8325b3a8e545
Author: Michael Armbrust <[email protected]>
Date: 2016-03-29T22:43:18Z
Merge remote-tracking branch 'apache/master' into statefulAgg
Conflicts:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
commit 7a5e0ae1a8da4b589cf768dda826480095a1f3fb
Author: Michael Armbrust <[email protected]>
Date: 2016-03-29T22:45:50Z
style
----