Hi Dmytro,

Currently, TableEnvironment does not support registering an AggregateFunction
or a TableFunction, because the type extractor has not yet been unified for
Java and Scala.

One approach is to use "TableEnvironment#createFunction", which registers
the UDF in the catalog.
I find that "createTemporarySystemFunction" does not work at the moment. cc @Zhenghua Gao
<[email protected]>
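
For reference, here is a rough sketch of the shape a custom aggregate
function usually takes, whichever registration path you end up using. The
names AggregateFunctionStandIn, AvgAccumulator and AvgFunction are
hypothetical. The abstract base class is only a local stand-in so that the
snippet compiles without Flink on the classpath; in a real job you would
extend org.apache.flink.table.functions.AggregateFunction instead.

```java
// Local stand-in for org.apache.flink.table.functions.AggregateFunction<T, ACC>.
// It exists only so this sketch compiles without Flink on the classpath.
abstract class AggregateFunctionStandIn<T, ACC> {
    public abstract ACC createAccumulator();
    public abstract T getValue(ACC accumulator);
}

// Hypothetical accumulator: holds the running sum and the row count.
class AvgAccumulator {
    long sum = 0L;
    long count = 0L;
}

// Hypothetical aggregate function that averages Long values.
class AvgFunction extends AggregateFunctionStandIn<Double, AvgAccumulator> {

    @Override
    public AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    // In Flink, accumulate(...) is looked up by reflection rather than
    // declared in the base class, so it is a plain public method here.
    public void accumulate(AvgAccumulator acc, Long value) {
        if (value != null) {
            acc.sum += value;
            acc.count++;
        }
    }

    // Returns null when no non-null rows were seen.
    @Override
    public Double getValue(AvgAccumulator acc) {
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }
}
```

Once such a class extends the real Flink base class, it is the kind of
object that the registerFunction(String, AggregateFunction) method quoted
below expects as its second argument.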

Best,
Godfrey

On Tue, Apr 14, 2020 at 6:40 PM Zhenghua Gao <[email protected]> wrote:

> `StreamTableEnvironment.create()` yields a `StreamTableEnvironmentImpl`
> object, which has several `registerFunction` methods for
> ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.
>
> `TableEnvironment.create()` yields a `TableEnvironmentImpl` object, which
> is a unified entry point for Table/SQL programs.
> It only has a deprecated `registerFunction` method for
> ScalarFunction. You should use `createTemporarySystemFunction` instead.
>
> A workaround for batch mode with the Blink planner: you can use the public
> constructor of `StreamTableEnvironmentImpl` to create
> the TableEnvironment and then use its `registerFunction` methods. Please
> make sure you pass in `isStreamingMode = false`.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan <[email protected]>
> wrote:
>
>> Hi All,
>>
>>
>>
>> Could you please tell me how to register a custom aggregate function in
>> a Blink batch application?
>>
>> In streaming mode, we create:
>>
>> EnvironmentSettings bsSettings =
>>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
>>
>>
>>
>> Which has:
>>
>> <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC>
>> aggregateFunction);
>>
>>
>>
>> But in batch mode, we need to create a TableEnvironment:
>>
>>
>>
>> EnvironmentSettings bsSettings =
>>     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>> TableEnvironment tEnv = TableEnvironment.create(bsSettings);
>>
>>
>>
>> which has no method for registering an AggregateFunction, only one for a
>> ScalarFunction.
>>
>>
>>
>> Details: Flink 1.10, Java API
>>
>>
>>
>>
>>
>
