Re: Do not wrap result of a UDAF in an Struct

2016-03-29 Thread Michał Zieliński
Matthias,

You don't need StructType, you can have ArrayType directly

def bufferSchema: StructType = StructType(StructField("vals",
DataTypes.createArrayType(StringType)) :: Nil)
def dataType: DataType = DataTypes.createArrayType(StringType)
def evaluate(buffer: Row): Any = buffer.getSeq[String](0)


On 29 March 2016 at 11:32, Matthias Niehoff  wrote:

> Hi,
>
> given is a simple DF:
>
> root
>  |-- id1: string (nullable = true)
>  |-- id2: string (nullable = true)
>  |-- val: string (nullable = true)
>
> I run an UDAF on this DF with groupBy($“id1“,$“id2“).agg(udaf($“val“) as
> „valsStruct“).
> The aggregates simply stores all val in Set.
>
> The result is:
>
> root
>  |-- id1: string (nullable = true)
>  |-- id2: integer (nullable = true)
>  |-- valsStruct: struct (nullable = true)
>  ||-- vals: array (nullable = true)
>  |||-- element: string (containsNull = true)
>
> But i would expect:
>
> root
>  |-- id1: string (nullable = true)
>  |-- id2: integer (nullable = true)
>  |-- vals: array (nullable = true)
>  ||— element: string (containsNull = true)
>
> What I’m doing right now is to add a new columns val with valsStruct.vals
> as a value and drop valsStruct afterwards, but i’m quite sure there is a
> more elegant way. I tried various implementations of the evaluate method,
> but none of those worked for me. Can you tell me what I am missing here?
>
> The implementation of the UDAF:
>
> class AggregateVals extends UserDefinedAggregateFunction {
>
>   def inputSchema: StructType = StructType(Array(
> StructField("val", StringType, true)
>   ))
>
>   def bufferSchema: StructType = StructType(Array(
> StructField("vals", ArrayType(StringType, true))
>   ))
>
>   def dataType: DataType = bufferSchema
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = Seq[String]()
>   }
>
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> val existing: Seq[String] = buffer.getSeq[String](0)
> val newBuffer = existing :+ input.getAs[String](0)
> buffer(0) = newBuffer
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
>   }
>
>   def evaluate(buffer: Row): Any = {
> buffer
>   }
> }
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>
> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
> und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
> Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
> bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
> beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
> evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
> nicht gestattet
>


Do not wrap result of a UDAF in an Struct

2016-03-29 Thread Matthias Niehoff
Hi,

given is a simple DF:

root
 |-- id1: string (nullable = true)
 |-- id2: string (nullable = true)
 |-- val: string (nullable = true)

I run an UDAF on this DF with groupBy($“id1“,$“id2“).agg(udaf($“val“) as
„valsStruct“).
The aggregates simply stores all val in Set.

The result is:

root
 |-- id1: string (nullable = true)
 |-- id2: integer (nullable = true)
 |-- valsStruct: struct (nullable = true)
 ||-- vals: array (nullable = true)
 |||-- element: string (containsNull = true)

But i would expect:

root
 |-- id1: string (nullable = true)
 |-- id2: integer (nullable = true)
 |-- vals: array (nullable = true)
 ||— element: string (containsNull = true)

What I’m doing right now is to add a new columns val with valsStruct.vals
as a value and drop valsStruct afterwards, but i’m quite sure there is a
more elegant way. I tried various implementations of the evaluate method,
but none of those worked for me. Can you tell me what I am missing here?

The implementation of the UDAF:

class AggregateVals extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(
StructField("val", StringType, true)
  ))

  def bufferSchema: StructType = StructType(Array(
StructField("vals", ArrayType(StringType, true))
  ))

  def dataType: DataType = bufferSchema

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Seq[String]()
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val existing: Seq[String] = buffer.getSeq[String](0)
val newBuffer = existing :+ input.getAs[String](0)
buffer(0) = newBuffer
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
  }

  def evaluate(buffer: Row): Any = {
buffer
  }
}

-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet