GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/9673

    [SPARK-11654][SQL] add reduce to GroupedDataset

    This PR adds a new method, `reduce`, to `GroupedDataset`, which supports operations similar to `reduceByKey` on a traditional `PairRDD`.
    
    ```scala
    val ds = Seq("abc", "xyz", "hello").toDS()
    ds.groupBy(_.length).reduce(_ + _).collect()  // not actually commutative :P
    
    res0: Array[(Int, String)] = Array((3,abcxyz), (5,hello))
    ```
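
    For readers without a Spark shell handy, the same semantics can be sketched over a plain Scala collection (a standalone analogue of the new method, not the Spark implementation itself):

    ```scala
    // Group an in-memory Seq by key and reduce each group's values,
    // mirroring ds.groupBy(_.length).reduce(_ + _) above.
    val data = Seq("abc", "xyz", "hello")
    val reduced: Map[Int, String] =
      data.groupBy(_.length).map { case (k, vs) => k -> vs.reduce(_ + _) }
    // reduced == Map(3 -> "abcxyz", 5 -> "hello")
    ```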
    
    While implementing this method and its test cases, several more deficiencies were found in our encoder handling.  Specifically, in order to support positional resolution, named resolution, and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new `Datasets` with the same object type but different output attributes.  We now divide the encoder lifecycle into three phases (mirroring the lifecycle of standard expressions) and have checks at various boundaries:
    
     - Unresolved Encoders: all user-facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only `UnresolvedAttributes` for named fields and `BoundReferences` for fields accessed by ordinal.
     - Resolved Encoders: internal to a `[Grouped]Dataset`, the encoder is resolved, meaning all input has been resolved to a specific `AttributeReference`.  Any encoders that are placed into a logical plan for use in object construction should be resolved.
     - Bound Encoders: constructed by physical plans, right before the actual conversion from row -> object is performed (a toy sketch of these phases follows this list).
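
    As a toy illustration of the resolution step, here is a simplified model of the field states involved (hypothetical types; the real encoder is built from Catalyst expressions, not these classes):

    ```scala
    // Simplified model of encoder fields (not Spark's actual classes):
    sealed trait Field
    case class UnresolvedAttribute(name: String) extends Field   // named field, unresolved
    case class BoundReference(ordinal: Int) extends Field        // positional field, unresolved
    case class AttributeReference(name: String) extends Field    // resolved to a concrete attribute

    // Resolution ties every named or positional field to an attribute of the
    // child plan's output; binding (not shown) later fixes ordinals for the
    // actual row -> object conversion.
    def resolve(fields: Seq[Field], output: Seq[AttributeReference]): Seq[AttributeReference] =
      fields.map {
        case UnresolvedAttribute(n) =>
          output.find(_.name == n).getOrElse(sys.error(s"cannot resolve $n"))
        case BoundReference(i)      => output(i)
        case a: AttributeReference  => a
      }
    ```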
    
    It is left to future work to add explicit checks for resolution and to provide good error messages when resolution fails.  We might also consider enforcing the above constraints in the type system (i.e. `fromRow` only exists on a `ResolvedEncoder`), but we should probably wait before spending too much time on this.
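
    For concreteness, one hypothetical shape such type-level enforcement could take (a sketch of the idea mentioned above, not a design from this PR):

    ```scala
    case class Row(values: Seq[Any])

    class ResolvedEncoder[T] {
      // fromRow only exists here: calling it on an unresolved encoder
      // is a compile-time error rather than a runtime failure.
      def fromRow(row: Row): T = ???  // conversion stub
    }
    class UnresolvedEncoder[T](fieldNames: Seq[String]) {
      def resolve(output: Seq[String]): ResolvedEncoder[T] = new ResolvedEncoder[T]
    }
    ```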

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark pr/9628

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9673
    
----
commit 7504c6790bf9bad143bce9f259e1ce98a5b40043
Author: Wenchen Fan <[email protected]>
Date:   2015-11-11T13:43:07Z

    add reduce to GroupedDataset

commit 2f1fa786725c53b8892fcd0c754cc1a70132d283
Author: Michael Armbrust <[email protected]>
Date:   2015-11-11T20:24:38Z

    Merge remote-tracking branch 'origin/master' into pr/9628

commit e58a0c91d90ee3780939a9a3e9fd37b11f7557a2
Author: Michael Armbrust <[email protected]>
Date:   2015-11-12T02:18:11Z

    Working finally

commit ab2dbb9fcd1146e5fdb22a595a2d1722805291a4
Author: Michael Armbrust <[email protected]>
Date:   2015-11-12T08:51:36Z

    some cleanup

----

