Hi Dmytro, Currently, TableEnvironment does not support register AggregationFunction and TableFunction, because type extractor has not been unified for Java and Scala.
One approach is we can use "TableEnvironment#createFunction" which will register UDF to catalog. I find "createTemporarySystemFunction" does not work now. cc @Zhenghua Gao <[email protected]> Best, Godfrey Zhenghua Gao <[email protected]> 于2020年4月14日周二 下午6:40写道: > `StreamTableEnvironment.create()` yields a `StreamTableEnvironmentImpl` > object, > which has several `registerFunction` interface for > ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction. > > `TableEnvironment.create()` yields a `TableEnvironmentImpl` object, which > is a unify entry point for Table/SQL programs. > And it only has a deprecated `registerFunction` interface for > ScalarFunction. You should use `createTemporarySystemFunction` instead. > > A workaround for batch mode of blink planner is: You can use the public > constructor of `StreamTableEnvironmentImpl` to create > the TableEnvironment and use `registerFunction`s. Pls make sure you pass > in the correct `isStreamingMode = false` > > *Best Regards,* > *Zhenghua Gao* > > > On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan <[email protected]> > wrote: > >> Hi All, >> >> >> >> Could you please tell how to register custom Aggregation function in >> blink batch app? >> >> In case of streaming mode: >> >> We create >> >> EnvironmentSettings bsSettings = >> EnvironmentSettings.*newInstance*().useBlinkPlanner().inStreamingMode().build(); >> StreamTableEnvironment tableEnv = StreamTableEnvironment.*create*(env, >> bsSettings); >> >> >> >> Which has: >> >> <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> >> aggregateFunction); >> >> >> >> But in case of batchMode, we need to create TableEnvironment: >> >> >> >> EnvironmentSettings bsSettings = >> EnvironmentSettings.*newInstance*().useBlinkPlanner().inBatchMode().build(); >> tEnv = TableEnvironment.*create*(bsSettings); >> >> >> >> Which does not have this function to register AggregationFunction, only >> Scalar one. >> >> >> >> Details: Flink 1.10, Java API >> >> >> >> >> >
