??????????
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{
// Percentile fractions reported by getValue, in output order.
// Each one is multiplied by the sample count to pick a 1-based rank in the sorted samples.
val percentile1 = 0.5
val percentile2 = 0.75
val percentile3 = 0.98
val percentile4 = 0.99
/**
 * Renders the configured percentiles of the accumulated samples as a
 * comma-separated string, in the order percentile1..percentile4.
 *
 * For each percentile `p` the 1-based rank is `round(size * p)`, clamped to a
 * minimum of 1, and the sample at that rank in ascending order is truncated
 * to an `Int`.
 *
 * @param accumulator samples collected so far; it is not modified
 * @return the four percentile values joined by ",", or `null` when no samples
 *         have been accumulated (previously this case threw
 *         `IndexOutOfBoundsException`)
 */
override def getValue(accumulator: ListBuffer[Float]): String = {
  if (accumulator.isEmpty) {
    // No samples: there is no element to index. Returning null follows the
    // convention of Flink's documented aggregate examples for empty input.
    null
  } else {
    // Sort once into an indexed sequence so each rank lookup is O(1).
    val sorted = accumulator.toVector.sorted
    val size = sorted.size
    Seq(percentile1, percentile2, percentile3, percentile4)
      .map { p =>
        // A rounded rank of 0 (very small sample counts) is clamped to 1.
        val rank = math.max(1, Math.round(size * p).toInt)
        sorted(rank - 1).toInt
      }
      .mkString(",")
  }
}
/** Creates a fresh, empty sample buffer for a new aggregation. */
override def createAccumulator(): ListBuffer[Float] = ListBuffer.empty[Float]
/**
 * Adds one sample to the end of the accumulator.
 *
 * @param accumulator buffer that collects the samples; mutated in place
 * @param i           the sample to record
 */
def accumulate(accumulator: ListBuffer[Float], i: Float): Unit =
  accumulator += i
/**
 * Appends the samples of every accumulator in `its` to `accumulator`.
 *
 * The earlier body used `++`, which builds a new collection and throws it
 * away, so nothing was ever merged; `++=` appends in place.
 *
 * NOTE(review): Flink's documented merge signature takes a
 * `java.lang.Iterable[ACC]`, not a Scala `Iterable`. That is a likely cause of
 * a "No matching merge method found" error — confirm against the Flink
 * version in use before changing the parameter type.
 *
 * @param accumulator the target buffer; mutated in place
 * @param its         the accumulators whose samples are appended, in iteration order
 */
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit =
  its.foreach(other => accumulator ++= other)
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2021??1??18??(??????) ????3:06
??????: "user-zh"<[email protected]>;
????: ????: flink sql
hop????????udaf????????????merge??????????????????????
??????????????????????????????????????????????????????????????????????????????????????
??????
???????? bigdata
?????????? 2021-01-18 14:52
???????? user-zh
?????? flink sql hop????????udaf????????????merge??????????????????????
??????
flink1.10.1
sql??????hop??????udaf??merge??????????????????????????????????????????????????merge??????
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge
method found for AggregateFunction 'com.autoai.cns.udaf.PercentileUDAF'
????merge ??????????????????
1??ImperativeAggCodeGen????checkNeededMethods??????if (needMerge)
getUserDefinedMethod
2??UserDefinedFunctionUtils????getUserDefinedMethod??????????????merge??????????????????????????merge????false,??????????????????????????????????flink??????bug
parameterClassEquals(methodSignature(i), clazz) ||
parameterDataTypeEquals(internalTypes(i), dataTypes(i))
??????????
/**
 * Appends the samples of every accumulator in `its` to `accumulator`.
 *
 * The earlier body used `++`, which builds a new collection and throws it
 * away, so nothing was ever merged; `++=` appends in place.
 *
 * NOTE(review): Flink's documented merge signature takes a
 * `java.lang.Iterable[ACC]`, not a Scala `Iterable`. That is a likely cause of
 * a "No matching merge method found" error — confirm against the Flink
 * version in use before changing the parameter type.
 *
 * @param accumulator the target buffer; mutated in place
 * @param its         the accumulators whose samples are appended, in iteration order
 */
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit =
  its.foreach(other => accumulator ++= other)