Thanks Michael for replying. Aggregator/UDAF is exactly what I am looking for, but we are still on 1.4 and it's going to take time to get to 1.6.
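A possible 1.4-era workaround, sketched here on the assumption that a HiveContext is available: Hive's collect_set UDAF (combined with sort_array and concat_ws) can be reached through SQL even though the 1.4 DataFrame API has no collect_set function, which would avoid the RDD round-trip below. The table name "sales" and the column names STYLE and SIZE are illustrative placeholders taken from the thread; this is an untested sketch, not a verified solution.

```
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

// Untested sketch: on Spark 1.4, Hive UDAFs such as collect_set are
// available via a HiveContext. "sales", STYLE and SIZE are assumed names.
public class CollectSetViaHive {
    public static DataFrame uniqueSortedSizes(HiveContext hc, DataFrame sales) {
        sales.registerTempTable("sales");
        return hc.sql(
            "SELECT STYLE, concat_ws(',', sort_array(collect_set(SIZE))) AS sizes "
                + "FROM sales GROUP BY STYLE");
    }
}
```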
On Wed, Jan 6, 2016 at 10:32 AM, Michael Armbrust <mich...@databricks.com> wrote:

> In Spark 1.6, GroupedDataset
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedDataset>
> has mapGroups, which sounds like what you are looking for. You can also
> write a custom Aggregator
> <https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>.
>
> On Tue, Jan 5, 2016 at 8:14 PM, Abhishek Gayakwad <a.gayak...@gmail.com> wrote:
>
>> Hello Hivemind,
>>
>> Referring to this thread:
>> https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html,
>> I have learnt that we cannot do much with grouped data apart from using
>> existing aggregate functions. That post was written in May 2015, and I
>> don't know whether things have changed since then. I am using Spark 1.4.
>>
>> What I am trying to achieve is very similar to collect_set in Hive
>> (actually unique, ordered, concatenated values), e.g. turning
>>
>> 1,2
>> 1,3
>> 2,4
>> 2,5
>> 2,4
>>
>> into
>>
>> 1, "2,3"
>> 2, "4,5"
>>
>> Currently I achieve this by converting the DataFrame to an RDD, doing
>> the required operations, and converting it back to a DataFrame, as
>> shown below.
>>
>> public class AvailableSizes implements Serializable {
>>
>>     public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
>>         final JavaRDD<Row> rowJavaRDD = salesDataFrame.toJavaRDD();
>>
>>         JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
>>             (PairFunction<Row, String, Row>) row -> {
>>                 final Object[] objects = {row.getAs(0), row.getAs(1), row.getAs(3)};
>>                 return new Tuple2<>(row.getAs(SalesColumns.STYLE.name()),
>>                         new GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
>>             });
>>
>>         JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(
>>             new Function2<Row, Row, Row>() {
>>                 @Override
>>                 public Row call(Row aRow, Row bRow) {
>>                     final String uniqueCommaSeparatedSizes = uniqueSizes(aRow, bRow);
>>                     final Object[] objects = {aRow.getAs(0), aRow.getAs(1),
>>                             uniqueCommaSeparatedSizes};
>>                     return new GenericRowWithSchema(objects, SalesColumns.getOutputSchema());
>>                 }
>>
>>                 private String uniqueSizes(Row aRow, Row bRow) {
>>                     final SortedSet<String> allSizes = new TreeSet<>();
>>                     final List<String> aSizes = Arrays.asList(
>>                             ((String) aRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
>>                     final List<String> bSizes = Arrays.asList(
>>                             ((String) bRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
>>                     allSizes.addAll(aSizes);
>>                     allSizes.addAll(bSizes);
>>                     return csvFormat(allSizes);
>>                 }
>>             });
>>
>>         final JavaRDD<Row> values = withSizeList.values();
>>
>>         return ssc.createDataFrame(values, SalesColumns.getOutputSchema());
>>     }
>>
>>     public String csvFormat(Collection<String> collection) {
>>         return collection.stream().map(Object::toString).collect(Collectors.joining(","));
>>     }
>> }
>>
>> Please suggest if there is a better way of doing this.
>>
>> Regards,
>> Abhishek
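Stripped of the Spark plumbing, the core of the reduce step in the quoted code is just a sorted-set merge of two comma-separated strings. A minimal standalone sketch of that semantics (note the caveat in the comment: TreeSet<String> orders lexicographically, so "10" would sort before "2" — the quoted code has the same property):

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class UniqueSizes {
    // Combine two comma-separated size strings into one deduplicated,
    // ordered, comma-joined string. TreeSet both dedupes and sorts, but
    // the ordering is lexicographic on strings, not numeric.
    public static String merge(String a, String b) {
        SortedSet<String> all = new TreeSet<>();
        all.addAll(Arrays.asList(a.split(",")));
        all.addAll(Arrays.asList(b.split(",")));
        return all.stream().collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(merge("4,5", "4")); // prints "4,5"
        System.out.println(merge("2", "3"));   // prints "2,3"
    }
}
```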