??????????
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{
  // Percentile fractions; getValue multiplies each by the number of accumulated
  // values to choose which sorted element to report (in this order).
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99
 
  /**
   * Renders the four configured percentiles of the accumulated values as a
   * comma-separated string of integers.
   *
   * For each fraction the 1-based rank is `round(count * fraction)`, raised to 1
   * when it rounds to 0; the value at that rank in ascending order is truncated
   * to `Int`.
   *
   * @param accumulator the collected values; it is not modified (a sorted copy is used)
   * @return the values for percentile1..percentile4, joined with ","
   * @throws IndexOutOfBoundsException if the accumulator is empty
   */
  override def getValue(accumulator: ListBuffer[Float]): String = {
    val count = accumulator.size
    val ordered = accumulator.sorted

    // Picks the element for one fraction; a rank of 0 is clamped to the first element.
    def valueAt(fraction: Double): Int = {
      val rank = Math.round(count * fraction).toInt
      val position = if (rank == 0) 1 else rank
      ordered(position - 1).toInt
    }

    Seq(percentile1, percentile2, percentile3, percentile4)
      .map(fraction => valueAt(fraction))
      .mkString(",")
  }

  /** Supplies a new, empty buffer that will hold the values of one aggregation. */
  override def createAccumulator(): ListBuffer[Float] = ListBuffer.empty[Float]

  /**
   * Records one input value by adding it to the end of the accumulator.
   *
   * @param accumulator buffer that is mutated in place
   * @param i           the value to record
   */
  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
    accumulator += i
    ()
  }

  /**
   * Appends the contents of every partial accumulator in `its` to `accumulator`,
   * in iteration order.
   *
   * The append uses `++=`, which mutates `accumulator` in place. The earlier `++`
   * built a new collection and threw it away, leaving `accumulator` unchanged.
   *
   * NOTE(review): Flink's AggregateFunction documentation declares merge with a
   * `java.lang.Iterable[ACC]` second parameter; a Scala `Iterable` here is
   * presumably why the planner reports "No matching merge method found" --
   * confirm against the Flink version in use before changing the signature.
   *
   * @param accumulator the buffer that receives all merged values
   * @param its         the partial accumulators to fold in; they are not modified
   */
  def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
    its.foreach(partial => accumulator ++= partial)
  }





------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]>;
????????: 2021??1??18??(??????) ????3:06
??????: "user-zh"<[email protected]>;

????:&nbsp;????: flink sql 
hop????????udaf????????????merge??????????????????????



??????????????????????????????????????????????????????????????????????????????????????

??????


&nbsp;
???????? bigdata
?????????? 2021-01-18 14:52
???????? user-zh
?????? flink sql hop????????udaf????????????merge??????????????????????
??????
    flink1.10.1 
sql??????hop??????udaf??merge??????????????????????????????????????????????????merge??????
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge method found for AggregateFunction 'com.autoai.cns.udaf.PercentileUDAF'
????merge ??????????????????
1??ImperativeAggCodeGen????checkNeededMethods??????if (needMerge) getUserDefinedMethod
2??UserDefinedFunctionUtils????getUserDefinedMethod??????????????merge??????????????????????????merge????false,??????????????????????????????????flink??????bug
parameterClassEquals(methodSignature(i), clazz) ||
    parameterDataTypeEquals(internalTypes(i), dataTypes(i))


??????????
/**
 * Appends the contents of every partial accumulator in `its` to `accumulator`,
 * in iteration order.
 *
 * The append uses `++=`, which mutates `accumulator` in place; a plain `++`
 * would build a new collection and discard it, leaving `accumulator` unchanged.
 *
 * NOTE(review): Flink's AggregateFunction documentation declares merge with a
 * `java.lang.Iterable[ACC]` second parameter; a Scala `Iterable` here is
 * presumably why the planner reports "No matching merge method found" --
 * confirm against the Flink version in use before changing the signature.
 *
 * @param accumulator the buffer that receives all merged values
 * @param its         the partial accumulators to fold in; they are not modified
 */
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
  its.foreach(partial => accumulator ++= partial)
}

回复