Hello guys,

I have a DataFrame and a UDAF. The DataFrame has two columns, lp_location_id and id, both of type Int. I want to group by lp_location_id and aggregate all values of id into one String, so I wrote a UDAF for this transformation (many Int values to one String). However, my UDAF returns empty values, as shown in the attachment.

Here is the code of my main class:

```scala
import org.apache.spark.sql.functions.col

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveTable = hc.sql("select lp_location_id, id from house_id_pv_location_top50")
val jsonArray = new JsonArray

// Group by location and aggregate every id of the group into one String.
hiveTable.groupBy("lp_location_id")
  .agg(jsonArray(col("id")).as("jsonArray"))
  .collect()
  .foreach(println)
```

------------------------------------------------------------------

Here is the code of my UDAF:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("id", IntegerType) :: Nil)

  def bufferSchema: StructType = StructType(StructField("id", StringType) :: Nil)

  def dataType: DataType = StringType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val s1 = buffer1.getAs[Int](0).toString()
    val s2 = buffer2.getAs[Int](0).toString()
    buffer1(0) = s1.concat(s2)
  }

  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}
```
I don't quite understand why I get an empty result from my UDAF. I guess there may be two reasons:

1. a wrong initialization with "" in my initialize method;
2. the buffer is not written successfully.

Can anyone share an idea about this? Thank you.

--------------------------------
Thanks & best regards!
San.Luo
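Looking at the UDAF above, initializing the buffer with "" (guess 1) is not the issue; guess 2 is closer. `update` never reads `input`: it only writes the buffer's own value back to itself, and it reads that StringType buffer with `getAs[Int]`, so no id ever reaches the buffer. `merge` also reads both String buffers as Int. Below is a minimal sketch of a corrected version, assuming the `UserDefinedAggregateFunction` API from Spark 1.5+ (deprecated since Spark 3.0 in favor of `Aggregator`, but still available). The `,` separator, skipping null ids, and the private helper `append` are illustrative assumptions, not part of the original post. The class keeps the name `JsonArray`, so the main class above can use it unchanged.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch: concatenates the Int column "id" of each group into one
// comma-separated String. The "," separator is an assumption.
class JsonArray extends UserDefinedAggregateFunction {
  // One Int input column.
  def inputSchema: StructType = StructType(StructField("id", IntegerType) :: Nil)

  // The running result is a String, so it must also be read back as a String.
  def bufferSchema: StructType = StructType(StructField("ids", StringType) :: Nil)

  def dataType: DataType = StringType
  def deterministic: Boolean = true

  // An empty String is a valid starting value.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }

  // Append the current row's id to the buffer; null ids are skipped (assumption).
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = append(buffer.getString(0), input.getInt(0).toString)
    }
  }

  // Combine two partial buffers, for example ones built on different partitions.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = append(buffer1.getString(0), buffer2.getString(0))
  }

  def evaluate(buffer: Row): Any = buffer.getString(0)

  // Hypothetical helper: joins two fragments with "," and avoids
  // leading or trailing separators when either side is empty.
  private def append(left: String, right: String): String =
    if (left.isEmpty) right
    else if (right.isEmpty) left
    else left + "," + right
}
```

Spark does not guarantee the order of the ids inside each string, because partial buffers can be merged in any order. If no custom logic is needed, the built-in `collect_list` and `concat_ws` functions in `org.apache.spark.sql.functions` can build a similar string without a UDAF, e.g. `concat_ws(",", collect_list(col("id").cast("string")))`; on Spark 1.x, `collect_list` may require a `HiveContext`, which the main class already uses.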
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org