Hello guys, I have a DF and a UDAF. The DF has 2 columns, lp_location_id and
id, both of Int type. I want to group by lp_location_id and aggregate all the id
values of each group into 1 string, so I used a UDAF to do this transformation:
multiple Int values to 1 String. However, my UDAF returns empty values, as shown
in the attachment.
Here is the code of my main class:

    import org.apache.spark.sql.functions.col

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val hiveTable = hc.sql("select lp_location_id, id from house_id_pv_location_top50")

    val jsonArray = new JsonArray
    // group by location, apply the UDAF to "id", then print the rows on the driver
    hiveTable.groupBy("lp_location_id")
      .agg(jsonArray(col("id")).as("jsonArray"))
      .collect.foreach(println)
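For comparison, the same grouping can probably be done without a UDAF, using built-in functions. This is a minimal sketch, assuming a Spark version whose `org.apache.spark.sql.functions` object provides `collect_list` (added in 1.6) and `concat_ws`. The names `IdConcat` and `concatIds` are hypothetical, and the "," separator is an assumption.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

object IdConcat {
  // Groups by lp_location_id and joins every id of the group into one
  // comma-separated String, using only built-in functions (no UDAF).
  // id is cast to string first so concat_ws receives array<string>.
  def concatIds(df: DataFrame): DataFrame =
    df.groupBy("lp_location_id")
      .agg(concat_ws(",", collect_list(col("id").cast("string"))).as("jsonArray"))
}
```

With the main code above it would be called as `IdConcat.concatIds(hiveTable).collect.foreach(println)`. Note that `collect_list` does not guarantee the order of the ids inside each string.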
------------------------------------------------------------------
Here is the code of my UDAF:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: StructType =
    StructType(StructField("id", IntegerType) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("id", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val s1 = buffer1.getAs[Int](0).toString()
    val s2 = buffer2.getAs[Int](0).toString()
    buffer1(0) = s1.concat(s2)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}
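A likely cause is visible in the code itself: `update` never reads `input`, so the buffer can only keep the "" written by `initialize`. In addition, `bufferSchema` declares a String column while `update` and `merge` read it with `getAs[Int]`, a type mismatch that may also fail at runtime. Here is a minimal sketch of a corrected UDAF, assuming the `UserDefinedAggregateFunction` API of Spark 1.5 or later. It keeps the class name so the main code can use it unchanged; the "," separator, skipping null ids, and the helper `JsonArray.join` are assumptions added for illustration.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch: concatenates the Int "id" values of each group into one
// comma-separated String (the "," separator is an assumption).
class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("id", IntegerType) :: Nil)

  // The buffer holds the partial String, so it is read with getString.
  def bufferSchema: StructType = StructType(StructField("ids", StringType) :: Nil)

  def dataType: DataType = StringType

  // Kept from the original; the order of ids in the result may vary between runs.
  def deterministic: Boolean = true

  // Starting from an empty String is fine.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }

  // Append the current row's id to the buffer; null ids are skipped.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = JsonArray.join(buffer.getString(0), input.getInt(0).toString)
    }
  }

  // Combine two partial Strings coming from different partitions.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = JsonArray.join(buffer1.getString(0), buffer2.getString(0))
  }

  def evaluate(buffer: Row): Any = buffer.getString(0)
}

object JsonArray {
  // Hypothetical helper: joins two pieces with ",", ignoring empty pieces
  // so that no leading, trailing, or doubled commas appear.
  def join(a: String, b: String): String =
    if (a.isEmpty) b else if (b.isEmpty) a else a + "," + b
}
```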

I don't quite understand why I get an empty result from my UDAF. I guess there
may be 2 reasons:
1. the initialization with "" in the initialize method is wrong;
2. the buffer is not written successfully.
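Both guesses can be checked without a cluster. The sketch below is plain Scala, not Spark: it models the one-column aggregation buffer as a one-slot `Array[Any]` and replays initialize, then update once per id, on a single partition (merge is left out). `UdafTrace`, `run`, and the sample ids 101, 102, 103 are made up for illustration. The corrected update also starts from "", which suggests guess 1 is not the cause; the original update copies the buffer into itself without reading the input, which matches guess 2.

```scala
// Plain-Scala model of a UDAF's lifecycle; no Spark involved.
// The buffer is modelled as a one-slot Array[Any], like a one-column Row.
object UdafTrace {
  type Buffer = Array[Any]

  // Same starting value as the original initialize.
  def initialize(): Buffer = Array[Any]("")

  // Mirrors the original update: it writes the buffer back into itself
  // and never looks at the input value.
  def updateOriginal(buffer: Buffer, input: Int): Unit = {
    buffer(0) = buffer(0)
  }

  // Corrected update: append the input to the String held in the buffer.
  def updateFixed(buffer: Buffer, input: Int): Unit = {
    val s = buffer(0).asInstanceOf[String]
    buffer(0) = if (s.isEmpty) input.toString else s + "," + input
  }

  // Runs initialize, then update for each id, then returns the buffer value.
  def run(ids: Seq[Int], update: (Buffer, Int) => Unit): String = {
    val buffer = initialize()
    ids.foreach(id => update(buffer, id))
    buffer(0).asInstanceOf[String]
  }

  def main(args: Array[String]): Unit = {
    val ids = Seq(101, 102, 103)
    println(run(ids, updateOriginal)) // prints an empty line
    println(run(ids, updateFixed))    // prints 101,102,103
  }
}
```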
Can anyone share an idea about this? Thank you.

--------------------------------
Thanks & best regards!
San.Luo