In Spark 1.6, GroupedDataset
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedDataset>
has mapGroups, which sounds like what you are looking for. You can also
write a custom Aggregator
<https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>.
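
For example, here is a rough (untested) sketch against the 1.6 Java Dataset
API; note that Sale is a hypothetical bean with getStyle()/getSize()
standing in for your actual schema, and these signatures changed in later
releases, so check the javadoc for your exact version:

    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.api.java.function.MapGroupsFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.GroupedDataset;

    import scala.Tuple2;

    // Convert the DataFrame to a typed Dataset of the (assumed) Sale bean.
    Dataset<Sale> sales = salesDataFrame.as(Encoders.bean(Sale.class));

    // Group by the style column via a typed key extractor.
    GroupedDataset<String, Sale> byStyle = sales.groupBy(
            (MapFunction<Sale, String>) Sale::getStyle, Encoders.STRING());

    // Emit one row per style: the key plus its sorted, de-duplicated sizes.
    Dataset<Tuple2<String, String>> result = byStyle.mapGroups(
            (MapGroupsFunction<String, Sale, Tuple2<String, String>>) (style, rows) -> {
                SortedSet<String> sizes = new TreeSet<>();
                while (rows.hasNext()) {
                    sizes.add(rows.next().getSize());
                }
                return new Tuple2<>(style, String.join(",", sizes));
            },
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()));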

On Tue, Jan 5, 2016 at 8:14 PM, Abhishek Gayakwad <a.gayak...@gmail.com>
wrote:

> Hello Hivemind,
>
> Referring to this thread -
> https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html
> - I have learnt that we cannot do much with grouped data apart from using
> the existing aggregate functions. That post dates from May 2015, so I
> don't know whether things have changed since then. I am using Spark 1.4.
>
> What I am trying to achieve is something very similar to collect_set in
> Hive (actually unique, ordered, comma-concatenated values), e.g.
>
> 1,2
> 1,3
> 2,4
> 2,5
> 2,4
>
> to
> 1, "2,3"
> 2, "4,5"
>
> Currently I am achieving this by converting the DataFrame to an RDD,
> doing the required operations, and converting it back to a DataFrame, as
> shown below.
>
> import java.io.Serializable;
> import java.util.Arrays;
> import java.util.Collection;
> import java.util.SortedSet;
> import java.util.TreeSet;
> import java.util.stream.Collectors;
>
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.sql.DataFrame;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SQLContext;
> import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
>
> import scala.Tuple2;
>
> public class AvailableSizes implements Serializable {
>
>     public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
>         final JavaRDD<Row> rowJavaRDD = salesDataFrame.toJavaRDD();
>
>         // Key each row by STYLE, keeping only the columns needed in the output.
>         JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
>                 (PairFunction<Row, String, Row>) row -> {
>                     final Object[] objects = {row.getAs(0), row.getAs(1), row.getAs(3)};
>                     return new Tuple2<>(row.getAs(SalesColumns.STYLE.name()),
>                             new GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
>                 });
>
>         // Merge the rows for each key, folding the SIZE values into one
>         // sorted, de-duplicated, comma-separated string.
>         JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(new Function2<Row, Row, Row>() {
>             @Override
>             public Row call(Row aRow, Row bRow) {
>                 final String uniqueCommaSeparatedSizes = uniqueSizes(aRow, bRow);
>                 final Object[] objects = {aRow.getAs(0), aRow.getAs(1), uniqueCommaSeparatedSizes};
>                 return new GenericRowWithSchema(objects, SalesColumns.getOutputSchema());
>             }
>
>             private String uniqueSizes(Row aRow, Row bRow) {
>                 final SortedSet<String> allSizes = new TreeSet<>();
>                 allSizes.addAll(Arrays.asList(((String) aRow.getAs(SalesColumns.SIZE.name())).split(",")));
>                 allSizes.addAll(Arrays.asList(((String) bRow.getAs(SalesColumns.SIZE.name())).split(",")));
>                 return csvFormat(allSizes);
>             }
>         });
>
>         final JavaRDD<Row> values = withSizeList.values();
>
>         return ssc.createDataFrame(values, SalesColumns.getOutputSchema());
>     }
>
>     public String csvFormat(Collection<String> collection) {
>         return collection.stream().collect(Collectors.joining(","));
>     }
> }
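>
> For reference, this is roughly how I invoke it (the input source below is
> a placeholder; schema setup omitted):
>
>     SQLContext sqlContext = new SQLContext(sparkContext);
>     DataFrame sales = sqlContext.read().json("sales.json"); // placeholder input
>     DataFrame result = new AvailableSizes().calculate(sqlContext, sales);
>     result.show();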
>
> Please suggest if there is a better way of doing this.
>
> Regards,
> Abhishek
>
