Hi Dmytro,

Table functions will be supported in Flink 1.11 with the new type system. Hopefully, we can also support aggregate functions by then.

Regards,
Timo

On 14.04.20 15:33, godfrey he wrote:
Hi Dmytro,

Currently, TableEnvironment does not support registering AggregateFunction and TableFunction, because the type extractor has not yet been unified for Java and Scala.

One approach is to use "TableEnvironment#createFunction", which registers the UDF in the catalog. I found that "createTemporarySystemFunction" does not work at the moment. cc @Zhenghua Gao <[email protected]>

Best,
Godfrey

On Tue, Apr 14, 2020 at 6:40 PM, Zhenghua Gao <[email protected]> wrote:

    `StreamTableEnvironment.create()` yields a
    `StreamTableEnvironmentImpl` object,
    which has several `registerFunction` overloads for
    ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.

    `TableEnvironment.create()` yields a `TableEnvironmentImpl` object,
    which is the unified entry point for Table/SQL programs.
    It only has a deprecated `registerFunction` method for
    ScalarFunction. You should use `createTemporarySystemFunction` instead.

    A workaround for batch mode with the Blink planner: use the
    public constructor of `StreamTableEnvironmentImpl` to create
    the TableEnvironment, then call its `registerFunction` methods.
    Please make sure you pass in `isStreamingMode = false`.

    *Best Regards,*
    *Zhenghua Gao*


    On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan
    <[email protected]> wrote:

        Hi All,

        Could you please tell me how to register a custom aggregate
        function in a Blink batch application?

        In streaming mode, we create:

        EnvironmentSettings bsSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv =
            StreamTableEnvironment.create(env, bsSettings);

        which has:

        <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);

        But in batch mode, we need to create a TableEnvironment:

        EnvironmentSettings bsSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        tEnv = TableEnvironment.create(bsSettings);

        which does not have a method to register an
        AggregateFunction, only a ScalarFunction.

        Details: Flink 1.10, Java API
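
For context on what is being registered in this thread: an aggregate function in the Table API is built around an accumulator that is created once, updated for each input row, and converted into a result at the end. The sketch below mirrors that contract in plain Java so that it runs without Flink on the classpath. `AggregateFunctionSketch`, `AvgSketch`, `SumCountAcc`, and `aggregate` are hypothetical names chosen for illustration, not Flink classes. A real function would instead extend `org.apache.flink.table.functions.AggregateFunction<T, ACC>`, where `accumulate` is an ordinary public method that the framework looks up, not an abstract one as assumed here.

```java
import java.util.Arrays;
import java.util.List;

public class AggregateContractSketch {

    /** Hypothetical stand-in for the accumulator contract; NOT Flink's AggregateFunction. */
    public abstract static class AggregateFunctionSketch<IN, T, ACC> {
        public abstract ACC createAccumulator();            // called once per group
        public abstract void accumulate(ACC acc, IN value);  // called once per input row
        public abstract T getValue(ACC acc);                 // called to emit the result
    }

    /** Mutable accumulator holding a running sum and row count. */
    public static class SumCountAcc {
        long sum;
        long count;
    }

    /** Average of Long inputs, written against the stand-in contract above. */
    public static class AvgSketch extends AggregateFunctionSketch<Long, Double, SumCountAcc> {
        @Override
        public SumCountAcc createAccumulator() {
            return new SumCountAcc();
        }

        @Override
        public void accumulate(SumCountAcc acc, Long value) {
            acc.sum += value;
            acc.count += 1;
        }

        @Override
        public Double getValue(SumCountAcc acc) {
            if (acc.count == 0) {
                return null; // no rows seen, so there is no average
            }
            return (double) acc.sum / acc.count;
        }
    }

    /** Drives the function the way a planner conceptually would for a single group. */
    public static <IN, T, ACC> T aggregate(AggregateFunctionSketch<IN, T, ACC> fn, List<IN> rows) {
        ACC acc = fn.createAccumulator();
        for (IN row : rows) {
            fn.accumulate(acc, row);
        }
        return fn.getValue(acc);
    }

    public static void main(String[] args) {
        Double avg = aggregate(new AvgSketch(), Arrays.asList(2L, 4L, 6L));
        System.out.println(avg); // prints 4.0
    }
}
```

With the streaming-mode environment shown above, a class written against the real Flink base class is what would be passed to `registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction)`.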
