Hi,
given is a simple DF:
root
|-- id1: string (nullable = true)
|-- id2: string (nullable = true)
|-- val: string (nullable = true)
I run an UDAF on this DF with groupBy($“id1“,$“id2“).agg(udaf($“val“) as
„valsStruct“).
The aggregates simply stores all val in Set.
The result is:
root
|-- id1: string (nullable = true)
|-- id2: integer (nullable = true)
|-- valsStruct: struct (nullable = true)
| |-- vals: array (nullable = true)
| | |-- element: string (containsNull = true)
But i would expect:
root
|-- id1: string (nullable = true)
|-- id2: integer (nullable = true)
|-- vals: array (nullable = true)
| |— element: string (containsNull = true)
What I’m doing right now is to add a new columns val with valsStruct.vals
as a value and drop valsStruct afterwards, but i’m quite sure there is a
more elegant way. I tried various implementations of the evaluate method,
but none of those worked for me. Can you tell me what I am missing here?
The implementation of the UDAF:
class AggregateVals extends UserDefinedAggregateFunction {
def inputSchema: StructType = StructType(Array(
StructField("val", StringType, true)
))
def bufferSchema: StructType = StructType(Array(
StructField("vals", ArrayType(StringType, true))
))
def dataType: DataType = bufferSchema
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Seq[String]()
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val existing: Seq[String] = buffer.getSeq[String](0)
val newBuffer = existing :+ input.getAs[String](0)
buffer(0) = newBuffer
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
}
def evaluate(buffer: Row): Any = {
buffer
}
}
--
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de
Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet