??????
    flink1.10.1????sql 
hop??????????udaf????????????????????????????????marge??????
org.apache.flink.table.api.ValidationException: Function class 
'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method 
named 'merge' which is public, not abstract and (in case of table functions) 
not static
UDAF??????
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{
  //????????
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99
  /**
    * ????????????????????????
    * @param accumulator ??????
    * @return ??????????????????????
    */
  override def getValue(accumulator: ListBuffer[Float]): String = {
    //????????????????
    val length = accumulator.size
    var i1 = Math.round(length*percentile1).toInt
    if(i1==0) i1 = 1
    var i2 = Math.round(length*percentile2).toInt
    if(i2==0) i2 = 1
    var i3 = Math.round(length*percentile3).toInt
    if(i3==0) i3 = 1
    var i4 = Math.round(length*percentile4).toInt
    if(i4==0) i4 = 1
    val seq = accumulator.sorted
    //????????
    seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt
  }
  override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()
  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
    accumulator.append(i)
  }
  def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): 
Unit = {
    its.foreach(i => accumulator ++ i)
  }

回复