Hi Dmytro,

For 1.11: As Godfrey said, you can use "TableEnvironment#createFunction/createTemporarySystemFunction". And as Timo said, table functions (and hopefully aggregate functions) will be supported there with the new type system.
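To make the 1.11 route concrete, here is a minimal sketch of the kind of aggregate function one would register. It is deliberately plain Java so that it compiles without Flink on the classpath. The names `SumAgg`, `SumAcc` and `my_sum` are hypothetical, and the registration call is only shown in a comment. In a real job the class would be public and extend Flink's `AggregateFunction<Long, SumAgg.SumAcc>`. Whether 1.11 accepts it depends on the new type system support mentioned above.

```java
// Plain-Java stand-in for a Flink aggregate function (hypothetical names).
// Assumption: in a real job this class is declared public and extends
// org.apache.flink.table.functions.AggregateFunction<Long, SumAgg.SumAcc>.
class SumAgg {

    /** Mutable accumulator holding the running total for one group. */
    public static class SumAcc {
        public long total = 0L;
    }

    /** Creates an empty accumulator for a new group. */
    public SumAcc createAccumulator() {
        return new SumAcc();
    }

    /** Folds one input value into the accumulator; null inputs are skipped. */
    public void accumulate(SumAcc acc, Long value) {
        if (value != null) {
            acc.total += value;
        }
    }

    /** Returns the final result for the group. */
    public Long getValue(SumAcc acc) {
        return acc.total;
    }

    public static void main(String[] args) {
        SumAgg agg = new SumAgg();
        SumAcc acc = agg.createAccumulator();
        for (Long v : new Long[] {3L, null, 7L}) {
            agg.accumulate(acc, v);
        }
        System.out.println(agg.getValue(acc));

        // With Flink 1.11 on the classpath, registration would look roughly like:
        //   tEnv.createTemporarySystemFunction("my_sum", SumAgg.class);
    }
}
```

Once registered, the function would be called from SQL or the Table API under the name it was registered with.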
But for 1.10 and 1.9, a workaround is:

"tEnv.getCatalog(tEnv.getCurrentCatalog()).get().createFunction"

You may need to understand some catalog concepts.

Best,
Jingsong Lee

On Wed, Apr 15, 2020 at 2:46 PM Timo Walther <[email protected]> wrote:

> Hi Dmytro,
>
> Table functions will be supported in Flink 1.11 with the new type system.
> Hopefully, we can also support aggregate functions by then.
>
> Regards,
> Timo
>
> On 14.04.20 15:33, godfrey he wrote:
> > Hi Dmytro,
> >
> > Currently, TableEnvironment does not support registering
> > AggregationFunction and TableFunction, because the type extractor
> > has not been unified for Java and Scala.
> >
> > One approach is to use "TableEnvironment#createFunction", which will
> > register the UDF in the catalog.
> > I find "createTemporarySystemFunction" does not work now. cc @Zhenghua Gao
> >
> > Best,
> > Godfrey
> >
> > Zhenghua Gao <[email protected]> wrote on Tue, Apr 14, 2020 at 6:40 PM:
> >
> > > `StreamTableEnvironment.create()` yields a
> > > `StreamTableEnvironmentImpl` object, which has several
> > > `registerFunction` interfaces for
> > > ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.
> > >
> > > `TableEnvironment.create()` yields a `TableEnvironmentImpl` object,
> > > which is a unified entry point for Table/SQL programs.
> > > It only has a deprecated `registerFunction` interface for
> > > ScalarFunction. You should use `createTemporarySystemFunction` instead.
> > >
> > > A workaround for batch mode of the blink planner: you can use the
> > > public constructor of `StreamTableEnvironmentImpl` to create
> > > the TableEnvironment and use its `registerFunction` methods.
> > > Please make sure you pass in `isStreamingMode = false`.
> > >
> > > Best Regards,
> > > Zhenghua Gao
> > >
> > > On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan <[email protected]> wrote:
> > >
> > > > Hi All,
> > > >
> > > > Could you please tell me how to register a custom aggregation
> > > > function in a blink batch app?
> > > >
> > > > In streaming mode, we create:
> > > >
> > > >     EnvironmentSettings bsSettings =
> > > >         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > > >     StreamTableEnvironment tableEnv =
> > > >         StreamTableEnvironment.create(env, bsSettings);
> > > >
> > > > which has:
> > > >
> > > >     <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);
> > > >
> > > > But in batch mode, we need to create a TableEnvironment:
> > > >
> > > >     EnvironmentSettings bsSettings =
> > > >         EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> > > >     tEnv = TableEnvironment.create(bsSettings);
> > > >
> > > > which does not have a method to register an AggregationFunction,
> > > > only a scalar one.
> > > >
> > > > Details: Flink 1.10, Java API

--
Best, Jingsong Lee
