Hi Dmytro,
Table functions will be supported in Flink 1.11 with the new type system.
Hopefully, we can also support aggregate functions by then.
Regards,
Timo
On 14.04.20 15:33, godfrey he wrote:
Hi Dmytro,
Currently, TableEnvironment does not support registering
AggregateFunction and TableFunction, because the type extractor
has not been unified for Java and Scala.
One approach is to use "TableEnvironment#createFunction", which
registers the UDF in the catalog.
I find that "createTemporarySystemFunction" does not work at the moment.
cc @Zhenghua Gao
Best,
Godfrey
On Tue, Apr 14, 2020 at 6:40 PM Zhenghua Gao <[email protected]> wrote:
`StreamTableEnvironment.create()` yields a
`StreamTableEnvironmentImpl` object,
which has several `registerFunction` methods for
ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.
`TableEnvironment.create()` yields a `TableEnvironmentImpl` object,
which is a unified entry point for Table/SQL programs.
It only has a deprecated `registerFunction` method for
ScalarFunction; you should use `createTemporarySystemFunction` instead.
A workaround for batch mode with the blink planner: you can use the
public constructor of `StreamTableEnvironmentImpl` to create
the TableEnvironment and then call its `registerFunction` methods.
Please make sure you pass in the correct `isStreamingMode = false`.
*Best Regards,*
*Zhenghua Gao*
On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan
<[email protected]> wrote:
Hi All,

Could you please tell me how to register a custom aggregate
function in a blink batch app?
In streaming mode, we create:

EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv =
    StreamTableEnvironment.create(env, bsSettings);

Which has:

<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);

But in batch mode, we need to create a TableEnvironment:

EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(bsSettings);

Which does not have this method for registering an
AggregateFunction, only the one for ScalarFunction.

Details: Flink 1.10, Java API
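
For anyone following the thread who has not written one yet: the `AggregateFunction<T, ACC>` in the `registerFunction` signature quoted above pairs a result type `T` with a mutable accumulator type `ACC`. Below is a minimal plain-Java sketch of that contract. It deliberately does not depend on Flink, so the class and method names (`WeightedAvgSketch`, `Accum`) are hypothetical and only mimic the `createAccumulator` / `accumulate` / `getValue` shape; a real function would extend Flink's `AggregateFunction` and would still need to be registered as discussed in the replies.

```java
// Plain-Java sketch of the accumulator contract; NOT a Flink class.
// All names here are hypothetical and chosen for illustration only.
final class WeightedAvgSketch {

    /** Plays the role of ACC: mutable intermediate state of the aggregation. */
    static final class Accum {
        double weightedSum = 0.0;
        long totalWeight = 0L;
    }

    /** Mimics createAccumulator(): returns a fresh, empty state. */
    Accum createAccumulator() {
        return new Accum();
    }

    /** Mimics accumulate(...): folds one input row into the state. */
    void accumulate(Accum acc, double value, long weight) {
        acc.weightedSum += value * weight;
        acc.totalWeight += weight;
    }

    /** Mimics getValue(): produces the result (the T type), or null if no rows were seen. */
    Double getValue(Accum acc) {
        if (acc.totalWeight == 0L) {
            return null;
        }
        return acc.weightedSum / acc.totalWeight;
    }

    public static void main(String[] args) {
        WeightedAvgSketch f = new WeightedAvgSketch();
        Accum acc = f.createAccumulator();
        f.accumulate(acc, 10.0, 1L);
        f.accumulate(acc, 20.0, 3L);
        System.out.println(f.getValue(acc)); // prints 17.5
    }
}
```

The accumulator is kept separate from the function object so that the runtime can create, update, and read one state object per group; the sketch only illustrates that split and says nothing about how the planner invokes it.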