In Spark 1.6, GroupedDataset <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedDataset> has mapGroups, which sounds like what you are looking for. You can also write a custom Aggregator <https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>.
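For your example it would look roughly like the untested sketch below. I am going from memory of the 1.6 API, which is still marked experimental, so exact signatures may differ slightly; sqlContext is assumed to be an existing SQLContext, and the inline sample data just stands in for your (style, size) pairs.

import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.GroupedDataset;

import scala.Tuple2;

// Toy input mirroring your example: (style, size) pairs.
Dataset<Tuple2<Integer, String>> sales = sqlContext.createDataset(
        Arrays.asList(
                new Tuple2<>(1, "2"), new Tuple2<>(1, "3"),
                new Tuple2<>(2, "4"), new Tuple2<>(2, "5"), new Tuple2<>(2, "4")),
        Encoders.tuple(Encoders.INT(), Encoders.STRING()));

// Group by the style key.
GroupedDataset<Integer, Tuple2<Integer, String>> byStyle = sales.groupBy(
        (MapFunction<Tuple2<Integer, String>, Integer>) pair -> pair._1(),
        Encoders.INT());

// For each style, de-duplicate and sort the sizes, then join them with commas.
Dataset<Tuple2<Integer, String>> concatenated = byStyle.mapGroups(
        (MapGroupsFunction<Integer, Tuple2<Integer, String>, Tuple2<Integer, String>>)
                (style, pairs) -> {
                    SortedSet<String> sizes = new TreeSet<>();
                    while (pairs.hasNext()) {
                        sizes.add(pairs.next()._2());
                    }
                    return new Tuple2<>(style, String.join(",", sizes));
                },
        Encoders.tuple(Encoders.INT(), Encoders.STRING()));

concatenated.show();

mapGroups hands you the full iterator of values per key, so a collect_set-style aggregation (dedup, sort, join) is a few lines instead of a round trip through reduceByKey on Rows.
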
On Tue, Jan 5, 2016 at 8:14 PM, Abhishek Gayakwad <a.gayak...@gmail.com> wrote:

> Hello Hivemind,
>
> Referring to this thread -
> https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html -
> I have learnt that we cannot do much with grouped data apart from using the
> existing aggregate functions. That post is from May 2015, and I don't know
> whether things have changed since then. I am using version 1.4 of Spark.
>
> What I am trying to achieve is something very similar to collect_set in Hive
> (actually unique, ordered, comma-concatenated values), e.g. turning
>
> 1,2
> 1,3
> 2,4
> 2,5
> 2,4
>
> into
>
> 1, "2,3"
> 2, "4,5"
>
> Currently I am achieving this by converting the DataFrame to an RDD, doing
> the required operations, and converting it back to a DataFrame, as shown
> below.
>
> import java.io.Serializable;
> import java.util.Arrays;
> import java.util.Collection;
> import java.util.List;
> import java.util.SortedSet;
> import java.util.TreeSet;
> import java.util.stream.Collectors;
>
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.sql.DataFrame;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SQLContext;
> import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
>
> import scala.Tuple2;
>
> public class AvailableSizes implements Serializable {
>
>     public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
>         final JavaRDD<Row> rowJavaRDD = salesDataFrame.toJavaRDD();
>
>         // Key each row by STYLE, carrying the columns needed for the output schema.
>         JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
>                 (PairFunction<Row, String, Row>) row -> {
>                     final Object[] objects = {row.getAs(0), row.getAs(1), row.getAs(3)};
>                     return new Tuple2<>(row.getAs(SalesColumns.STYLE.name()),
>                             new GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
>                 });
>
>         // Merge the comma-separated SIZE values of rows that share a STYLE.
>         JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(new Function2<Row, Row, Row>() {
>             @Override
>             public Row call(Row aRow, Row bRow) {
>                 final String uniqueCommaSeparatedSizes = uniqueSizes(aRow, bRow);
>                 final Object[] objects = {aRow.getAs(0), aRow.getAs(1), uniqueCommaSeparatedSizes};
>                 return new GenericRowWithSchema(objects, SalesColumns.getOutputSchema());
>             }
>
>             // De-duplicate and sort the sizes from both rows, then re-join them with commas.
>             private String uniqueSizes(Row aRow, Row bRow) {
>                 final SortedSet<String> allSizes = new TreeSet<>();
>                 final List<String> aSizes = Arrays.asList(
>                         ((String) aRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
>                 final List<String> bSizes = Arrays.asList(
>                         ((String) bRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
>                 allSizes.addAll(aSizes);
>                 allSizes.addAll(bSizes);
>                 return csvFormat(allSizes);
>             }
>         });
>
>         final JavaRDD<Row> values = withSizeList.values();
>
>         return ssc.createDataFrame(values, SalesColumns.getOutputSchema());
>     }
>
>     public String csvFormat(Collection<String> collection) {
>         return collection.stream().map(Object::toString).collect(Collectors.joining(","));
>     }
> }
>
> Please suggest if there is a better way of doing this.
>
> Regards,
> Abhishek
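
One more note below the quote: until you can upgrade from 1.4 (where the Dataset API does not exist), you could also simplify the RDD detour itself by aggregating into a sorted set per key instead of splitting and re-joining the comma string on every merge. Again an untested sketch, assuming STYLE and SIZE are string columns as in your SalesColumns enum, and dropping the extra output columns just to show the shape of the aggregation.

import java.util.TreeSet;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;

import scala.Tuple2;

// Key each row by STYLE, keeping only the SIZE value.
JavaPairRDD<String, String> sizeByStyle = salesDataFrame.toJavaRDD().mapToPair(
        (PairFunction<Row, String, String>) row -> new Tuple2<>(
                row.<String>getAs(SalesColumns.STYLE.name()),
                row.<String>getAs(SalesColumns.SIZE.name())));

// Collect the distinct sizes per style into a sorted set, then join with commas.
JavaPairRDD<String, String> concatenated = sizeByStyle
        .aggregateByKey(
                new TreeSet<String>(),
                (Function2<TreeSet<String>, String, TreeSet<String>>) (set, size) -> {
                    set.add(size);      // fold one size into the running set
                    return set;
                },
                (Function2<TreeSet<String>, TreeSet<String>, TreeSet<String>>) (left, right) -> {
                    left.addAll(right); // merge the per-partition sets
                    return left;
                })
        .mapValues((Function<TreeSet<String>, String>) set -> String.join(",", set));

aggregateByKey is documented as being allowed to mutate and return its first argument, so the TreeSet is reused rather than rebuilt from strings on every merge.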