Matthias,
You don't need StructType, you can have ArrayType directly
def bufferSchema: StructType = StructType(StructField("vals",
DataTypes.createArrayType(StringType)) :: Nil)
def dataType: DataType = DataTypes.createArrayType(StringType)
def evaluate(buffer: Row): Any = buffer.getSeq[String](0)
On 29 March 2016 at 11:32, Matthias Niehoff wrote:
> Hi,
>
> given is a simple DF:
>
> root
> |-- id1: string (nullable = true)
> |-- id2: string (nullable = true)
> |-- val: string (nullable = true)
>
> I run an UDAF on this DF with groupBy($“id1“,$“id2“).agg(udaf($“val“) as
> „valsStruct“).
> The aggregates simply stores all val in Set.
>
> The result is:
>
> root
> |-- id1: string (nullable = true)
> |-- id2: integer (nullable = true)
> |-- valsStruct: struct (nullable = true)
> ||-- vals: array (nullable = true)
> |||-- element: string (containsNull = true)
>
> But i would expect:
>
> root
> |-- id1: string (nullable = true)
> |-- id2: integer (nullable = true)
> |-- vals: array (nullable = true)
> ||— element: string (containsNull = true)
>
> What I’m doing right now is to add a new columns val with valsStruct.vals
> as a value and drop valsStruct afterwards, but i’m quite sure there is a
> more elegant way. I tried various implementations of the evaluate method,
> but none of those worked for me. Can you tell me what I am missing here?
>
> The implementation of the UDAF:
>
> class AggregateVals extends UserDefinedAggregateFunction {
>
> def inputSchema: StructType = StructType(Array(
> StructField("val", StringType, true)
> ))
>
> def bufferSchema: StructType = StructType(Array(
> StructField("vals", ArrayType(StringType, true))
> ))
>
> def dataType: DataType = bufferSchema
>
> def deterministic: Boolean = true
>
> def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = Seq[String]()
> }
>
> def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> val existing: Seq[String] = buffer.getSeq[String](0)
> val newBuffer = existing :+ input.getAs[String](0)
> buffer(0) = newBuffer
> }
>
> def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
> }
>
> def evaluate(buffer: Row): Any = {
> buffer
> }
> }
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>
> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
> und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
> Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
> bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
> beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
> evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
> nicht gestattet
>