??????
flink1.10.1????sql
hop??????????udaf????????????????????????????????marge??????
org.apache.flink.table.api.ValidationException: Function class
'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method
named 'merge' which is public, not abstract and (in case of table functions)
not static
UDAF??????
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{
//????????
val percentile1 = 0.5
val percentile2 = 0.75
val percentile3 = 0.98
val percentile4 = 0.99
/**
* ????????????????????????
* @param accumulator ??????
* @return ??????????????????????
*/
override def getValue(accumulator: ListBuffer[Float]): String = {
//????????????????
val length = accumulator.size
var i1 = Math.round(length*percentile1).toInt
if(i1==0) i1 = 1
var i2 = Math.round(length*percentile2).toInt
if(i2==0) i2 = 1
var i3 = Math.round(length*percentile3).toInt
if(i3==0) i3 = 1
var i4 = Math.round(length*percentile4).toInt
if(i4==0) i4 = 1
val seq = accumulator.sorted
//????????
seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt
}
override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()
def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
accumulator.append(i)
}
def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]):
Unit = {
its.foreach(i => accumulator ++ i)
}