Thanks for replying, Michael. Aggregator/UDAF is exactly what I am looking
for, but we are still on 1.4 and it is going to take time to get to 1.6.
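
A minimal plain-Java sketch (no Spark) of the per-key merge described in the
quoted message below, using the sample rows from that message. The class and
method names are hypothetical and chosen only for illustration; the pairwise
merge mirrors the shape of a reduceByKey function, and values are ordered as
strings, which is an assumption about the intended ordering.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class UniqueSizesSketch {

    // Merge two comma-separated lists into one sorted, de-duplicated list.
    // Sorting is lexicographic (String order), so "10" sorts before "2".
    static String mergeSizes(String a, String b) {
        SortedSet<String> all = new TreeSet<>();
        all.addAll(Arrays.asList(a.split(",")));
        all.addAll(Arrays.asList(b.split(",")));
        return String.join(",", all);
    }

    // Fold (key, value) rows into key -> merged value, one pair at a time,
    // the same way a reduce-by-key step combines two values for one key.
    static Map<String, String> uniqueSizesByKey(List<String[]> rows) {
        Map<String, String> result = new TreeMap<>();
        for (String[] row : rows) {
            result.merge(row[0], row[1], UniqueSizesSketch::mergeSizes);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[]{"1", "2"},
                new String[]{"1", "3"},
                new String[]{"2", "4"},
                new String[]{"2", "5"},
                new String[]{"2", "4"});
        // Prints {1=2,3, 2=4,5}
        System.out.println(uniqueSizesByKey(rows));
    }
}
```

Because the merge is associative and commutative, the same function could in
principle be handed to a per-key reduce without caring about the order in
which rows arrive.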

On Wed, Jan 6, 2016 at 10:32 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> In Spark 1.6 GroupedDataset
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedDataset>
>  has
> mapGroups, which sounds like what you are looking for.  You can also write
> a custom Aggregator
> <https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>
>
> On Tue, Jan 5, 2016 at 8:14 PM, Abhishek Gayakwad <a.gayak...@gmail.com>
> wrote:
>
>> Hello Hivemind,
>>
>> Referring to this thread -
>> https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html.
>> I have learnt that we cannot do much with grouped data apart from using
>> existing aggregate functions. This post was written in May 2015, and I
>> don't know whether things have changed since then. I am using version 1.4
>> of Spark.
>>
>> What I am trying to achieve is something very similar to collect_set in
>> Hive (actually unique, ordered, concatenated values), e.g.
>>
>> 1,2
>> 1,3
>> 2,4
>> 2,5
>> 2,4
>>
>> to
>> 1, "2,3"
>> 2, "4,5"
>>
>> Currently I am achieving this by converting the DataFrame to an RDD, doing
>> the required operations, and converting it back to a DataFrame, as shown below.
>>
>> public class AvailableSizes implements Serializable {
>>
>>     public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
>>         final JavaRDD<Row> rowJavaRDD = salesDataFrame.toJavaRDD();
>>
>>         JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
>>                 (PairFunction<Row, String, Row>) row -> {
>>                     final Object[] objects = {row.getAs(0), row.getAs(1), row.getAs(3)};
>>                     return new Tuple2<>(row.getAs(SalesColumns.STYLE.name()),
>>                             new GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
>>                 });
>>
>>         JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(new Function2<Row, Row, Row>() {
>>             @Override
>>             public Row call(Row aRow, Row bRow) {
>>                 final String uniqueCommaSeparatedSizes = uniqueSizes(aRow, bRow);
>>                 final Object[] objects = {aRow.getAs(0), aRow.getAs(1), uniqueCommaSeparatedSizes};
>>                 return new GenericRowWithSchema(objects, SalesColumns.getOutputSchema());
>>             }
>>
>>             private String uniqueSizes(Row aRow, Row bRow) {
>>                 final SortedSet<String> allSizes = new TreeSet<>();
>>                 final List<String> aSizes = Arrays.asList(
>>                         ((String) aRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
>>                 final List<String> bSizes = Arrays.asList(
>>                         ((String) bRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
>>                 allSizes.addAll(aSizes);
>>                 allSizes.addAll(bSizes);
>>                 return csvFormat(allSizes);
>>             }
>>         });
>>
>>         final JavaRDD<Row> values = withSizeList.values();
>>
>>         return ssc.createDataFrame(values, SalesColumns.getOutputSchema());
>>
>>     }
>>
>>     public String csvFormat(Collection<String> collection) {
>>         return collection.stream().map(Object::toString).collect(Collectors.joining(","));
>>     }
>> }
>>
>> Please suggest if there is a better way of doing this.
>>
>> Regards,
>> Abhishek
>>
>
>
