Have you looked at aggregators? https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
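Something along these lines should do it (untested sketch against the 1.6 Aggregator API; it assumes sqlContext.implicits._ can supply encoders for Int and Seq[KeyVal], and the CollectKeyVals / nested names are mine):

import org.apache.spark.sql.expressions.Aggregator
import sqlContext.implicits._

case class KeyVal(k: Int, v: Int)

// Plays the role of aggregateByKey's zero/addToList/addLists triple:
// collects every KeyVal seen in a group into one Seq.
object CollectKeyVals extends Aggregator[KeyVal, Seq[KeyVal], Seq[KeyVal]] {
  def zero: Seq[KeyVal] = Seq.empty                                // List[KeyVal]()
  def reduce(s: Seq[KeyVal], kv: KeyVal): Seq[KeyVal] = s :+ kv    // addToList
  def merge(s: Seq[KeyVal], t: Seq[KeyVal]): Seq[KeyVal] = s ++ t  // addLists
  def finish(s: Seq[KeyVal]): Seq[KeyVal] = s
}

val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j)).toDS()

// groupBy on a Dataset gives a GroupedDataset; agg with the typed column
// yields a Dataset[(Int, Seq[KeyVal])], i.e. (key, keyvals) pairs.
val nested = keyVals.groupBy(_.k).agg(CollectKeyVals.toColumn)
nested.show()

If the implicit encoder for Seq[KeyVal] isn't found in your build, Encoders.kryo[Seq[KeyVal]] is a blunt fallback.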
On Fri, Apr 22, 2016 at 6:45 PM, Lee Becker <lee.bec...@hapara.com> wrote:

> Is there a way to do aggregateByKey on Datasets the way one can on an RDD?
>
> Consider the following RDD code to build a set of KeyVals into a DataFrame
> containing a column with the KeyVals' keys and a column containing lists of
> KeyVals. The end goal is to join it with collections which will be
> similarly transformed.
>
> case class KeyVal(k: Int, v: Int)
>
> val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j))
>
> // function for appending to list
> val addToList = (s: List[KeyVal], v: KeyVal) => s :+ v
>
> // function for merging two lists
> val addLists = (s: List[KeyVal], t: List[KeyVal]) => s ++ t
>
> val keyAndKeyVals = keyVals.map(kv => (kv.k, kv))
> val keyAndNestedKeyVals = keyAndKeyVals.
>   aggregateByKey(List[KeyVal]())(addToList, addLists).
>   toDF("key", "keyvals")
> keyAndNestedKeyVals.show
>
> which produces:
>
> +---+--------------------+
> |key|             keyvals|
> +---+--------------------+
> |  1|[[1,4], [1,5], [1...|
> |  2|[[2,4], [2,5], [2...|
> |  3|[[3,4], [3,5], [3...|
> +---+--------------------+
>
> For a Dataset approach I tried the following to no avail:
>
> // Initialize as Dataset
> val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j)).
>   toDS
>
> // Build key, keyVal mappings
> val keyValsByKey = keyVals.groupBy(kv => kv.k)
>
> case class NestedKeyVal(key: Int, keyvals: List[KeyVal])
>
> val convertToNested = (key: Int, keyValsIter: Iterator[KeyVal]) =>
>   NestedKeyVal(key = key, keyvals = keyValsIter.toList)
>
> val keyValsNestedByKey = keyValsByKey.mapGroups((key, keyvals) =>
>   convertToNested(key, keyvals))
> keyValsNestedByKey.show
>
> This and several other incantations using groupBy + mapGroups consistently
> give me serialization problems. Is this because the iterator cannot be
> guaranteed across boundaries? Or is there some issue with what a Dataset
> can encode in the interim? What other ways might I approach this problem?
>
> Thanks,
> Lee
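On the serialization errors: without the stack trace this is a guess, but in 1.6 encoders for case classes defined directly in the spark-shell are known to be touchy; defining KeyVal and NestedKeyVal in a compiled jar can make groupBy + mapGroups behave. The iterator itself is safe to drain with toList inside the group; it just can't escape the function. A leaner variant to try (again untested, and it still needs implicit encoders for Int and NestedKeyVal in scope):

// Guess at a working variant: same groupBy + mapGroups shape, with the
// conversion inlined so no standalone function value gets captured.
val keyValsNestedByKey = keyVals
  .groupBy(_.k)
  .mapGroups((key, kvs) => NestedKeyVal(key, kvs.toList))
keyValsNestedByKey.show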