GitHub user marmbrus opened a pull request:
https://github.com/apache/spark/pull/9673
[SPARK-11654][SQL] add reduce to GroupedDataset
This PR adds a new method, `reduce`, to `GroupedDataset`, which allows
similar operations to `reduceByKey` on a traditional `PairRDD`.
```scala
// assumes the SQL implicits (e.g. `import sqlContext.implicits._`) are in scope for `toDS()`
val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect() // not actually commutative :P
// res0: Array(3 -> "abcxyz", 5 -> "hello")
```
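For readers without a Spark shell at hand, the same group-then-reduce semantics can be sketched with plain Scala collections. This is only a local analogy, not the `GroupedDataset` implementation; `LocalReduceSketch` and `groupReduce` are hypothetical names introduced for illustration.

```scala
// Local analogy only: plain Scala collections, no Spark involved.
// `LocalReduceSketch` and `groupReduce` are hypothetical names.
object LocalReduceSketch {
  // Group values by a key function, then combine each group pairwise with `f`.
  def groupReduce[K, V](values: Seq[V])(key: V => K)(f: (V, V) => V): Map[K, V] =
    values.groupBy(key).map { case (k, vs) => k -> vs.reduce(f) }

  def main(args: Array[String]): Unit = {
    val result = groupReduce(Seq("abc", "xyz", "hello"))(_.length)(_ + _)
    // A local Seq keeps element order within each group, so key 3 maps to "abcxyz".
    assert(result == Map(3 -> "abcxyz", 5 -> "hello"))
  }
}
```

On a local `Seq` the order within each group is stable, so string concatenation gives a deterministic answer. In a distributed reduce the combination order is generally not guaranteed, which is why the function should be commutative and associative and why the example above is flagged as "not actually commutative".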
While implementing this method and its test cases, several more deficiencies
were found in our encoder handling. Specifically, in order to support
positional resolution, named resolution and tuple composition, it is important
to keep the unresolved encoder around and to use it when constructing new
`Datasets` with the same object type but different output attributes. We now
divide the encoder lifecycle into three phases (that mirror the lifecycle of
standard expressions) and have checks at various boundaries:
- Unresolved Encoders: all user-facing encoders (those constructed by
implicits, static methods, or tuple composition) are unresolved, meaning they
have only `UnresolvedAttributes` for named fields and `BoundReferences` for
fields accessed by ordinal.
- Resolved Encoders: internal to a `[Grouped]Dataset` the encoder is
resolved, meaning all input has been resolved to a specific
`AttributeReference`. Any encoders that are placed into a logical plan for use
in object construction should be resolved.
- Bound Encoders: constructed by physical plans, right before the actual
conversion from row -> object is performed.
It is left to future work to add explicit checks for resolution and provide
good error messages when it fails. We might also consider enforcing the above
constraints in the type system (i.e. `fromRow` only exists on a
`ResolvedEncoder`), but we should probably wait before spending too much time
on this.
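As a rough illustration of what enforcing the lifecycle in the type system could look like, here is a toy sketch. Every type and method name in it is hypothetical and none of it is Spark's actual encoder API. It also makes one arbitrary choice: `fromRow` lives on the final, bound phase, whereas the text above mentions `ResolvedEncoder` as a possible home for it.

```scala
// Toy model only: these are NOT Spark classes, and all names are hypothetical.
object EncoderPhases {
  type Row = Seq[Any]

  // Phase 1: fields are known by name only; no row conversion is possible yet.
  final case class UnresolvedEncoder(fieldNames: Seq[String]) {
    // Resolve the names against a concrete schema, reporting any that are missing.
    def resolve(schema: Seq[String]): Either[String, ResolvedEncoder] = {
      val missing = fieldNames.filterNot(n => schema.contains(n))
      if (missing.nonEmpty) Left(s"cannot resolve: ${missing.mkString(", ")}")
      else Right(ResolvedEncoder(fieldNames, schema))
    }
  }

  // Phase 2: every field is tied to a specific attribute of a known schema.
  final case class ResolvedEncoder(fieldNames: Seq[String], schema: Seq[String]) {
    // Binding replaces each attribute with its ordinal in the input row.
    def bind: BoundEncoder = BoundEncoder(fieldNames.map(n => schema.indexOf(n)))
  }

  // Phase 3: fields are ordinals; only this phase exposes `fromRow`.
  final case class BoundEncoder(ordinals: Seq[Int]) {
    def fromRow(row: Row): Seq[Any] = ordinals.map(i => row(i))
  }
}
```

With this shape, calling `fromRow` on an unresolved encoder is a compile error rather than a runtime failure, and a failed resolution surfaces as a `Left` carrying a message instead of an exception deep inside a physical plan.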
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/marmbrus/spark pr/9628
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/9673.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #9673
----
commit 7504c6790bf9bad143bce9f259e1ce98a5b40043
Author: Wenchen Fan <[email protected]>
Date: 2015-11-11T13:43:07Z
add reduce to GroupedDataset
commit 2f1fa786725c53b8892fcd0c754cc1a70132d283
Author: Michael Armbrust <[email protected]>
Date: 2015-11-11T20:24:38Z
Merge remote-tracking branch 'origin/master' into pr/9628
commit e58a0c91d90ee3780939a9a3e9fd37b11f7557a2
Author: Michael Armbrust <[email protected]>
Date: 2015-11-12T02:18:11Z
Working finally
commit ab2dbb9fcd1146e5fdb22a595a2d1722805291a4
Author: Michael Armbrust <[email protected]>
Date: 2015-11-12T08:51:36Z
some cleanup
----