Hi Dmytro,

For 1.11:
Like Godfrey said, you can use
"TableEnvironment#createFunction/createTemporarySystemFunction". And like
Timo said, can support function with new type system.

But for 1.10 and 1.9:
A workaround way is:
"tEnv.getCatalog(tEnv.getCurrentCatalog()).get().createFunction"
You may need understand some catalog concept.

Best,
Jingsong Lee

On Wed, Apr 15, 2020 at 2:46 PM Timo Walther <[email protected]> wrote:

> Hi Dmytro,
>
> table function will be supported in Flink 1.11 with the new type system.
> Hopefully, we can also support aggregate functions until then.
>
> Regards,
> Timo
>
> On 14.04.20 15:33, godfrey he wrote:
> > Hi Dmytro,
> >
> > Currently, TableEnvironment does not support
> > register AggregationFunction and TableFunction, because type extractor
> > has not been unified for Java and Scala.
> >
> > One approach is we can use "TableEnvironment#createFunction" which will
> > register UDF to catalog.
> > I find "createTemporarySystemFunction" does not work now. cc @Zhenghua
> > Gao <mailto:[email protected]>
> >
> > Best,
> > Godfrey
> >
> > Zhenghua Gao <[email protected] <mailto:[email protected]>> 于2020年4月14
> > 日周二 下午6:40写道:
> >
> >     `StreamTableEnvironment.create()` yields a
> >     `StreamTableEnvironmentImpl` object,
> >     which has several `registerFunction` interface for
> >
>  ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.
> >
> >     `TableEnvironment.create()` yields a `TableEnvironmentImpl` object,
> >     which is a unify entry point for Table/SQL programs.
> >     And it only has a deprecated `registerFunction` interface for
> >     ScalarFunction.  You should use `createTemporarySystemFunction`
> instead.
> >
> >     A workaround for batch mode of blink planner is: You can use the
> >     public constructor of `StreamTableEnvironmentImpl` to create
> >     the TableEnvironment and use `registerFunction`s. Pls make sure you
> >     pass in the correct `isStreamingMode = false`
> >
> >     *Best Regards,*
> >     *Zhenghua Gao*
> >
> >
> >     On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan
> >     <[email protected] <mailto:[email protected]>> wrote:
> >
> >         Hi All,____
> >
> >         __ __
> >
> >         Could you please tell how to register custom Aggregation
> >         function in blink batch app?____
> >
> >         In case of streaming mode:____
> >
> >         We create ____
> >
> >         EnvironmentSettings bsSettings =
> >
>  
> EnvironmentSettings./newInstance/().useBlinkPlanner().inStreamingMode().build();
> >         StreamTableEnvironment tableEnv =
> >         StreamTableEnvironment./create/(env, bsSettings);____
> >
> >         __ __
> >
> >         Which has:____
> >
> >         <T, ACC> void registerFunction(String name, AggregateFunction<T,
> >         ACC> aggregateFunction);____
> >
> >         __ __
> >
> >         But in case of batchMode, we need to create TableEnvironment:____
> >
> >         __ __
> >
> >         EnvironmentSettings bsSettings =
> >
>  EnvironmentSettings./newInstance/().useBlinkPlanner().inBatchMode().build();
> >         tEnv = TableEnvironment./create/(bsSettings);____
> >
> >         __ __
> >
> >         Which does not have this function to register
> >         AggregationFunction, only Scalar one.____
> >
> >         __ __
> >
> >         Details: Flink 1.10, Java API ____
> >
> >         __ __
> >
> >         __ __
> >
>
>

-- 
Best, Jingsong Lee

Reply via email to