Hi and Happy New Year,
I’m currently removing deprecated API usages to prepare for the upgrade to
Flink 1.12. We are currently running on 1.11.
Specifically, I need to update our code that registers table sinks in the
StreamTableEnvironment. I maintain jobs that use DataStreams with multiple
sinks, and I now want to use the StatementSet to benefit from its DAG for
multiple sinks.
So far I have added the code that adds the sinks to the StatementSet:
statementSet.addInsert(sinkName, table)
and to execute the StatementSet:
statementSet.execute()
For this to work I need to register the sinks. I used to do that with the (now
deprecated) function on the StreamTableEnvironment:
tableEnv.registerTableSink(
  sinkName,
  fieldNames,
  fieldTypes,
  tableSink
)
My question is: how do I register sinks so that the StatementSet can find
them? What is the proper replacement for registerTableSink?
executeSql(ddl), as suggested, does not apply to this use case. I did not find
anything in the documentation either:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#translate-and-execute-a-query
When running the job, I get an error saying that the sink could not be found
in the catalog. This means I have to add the sink to the catalog, but how?
Which function should be used to register a table sink in the table
environment's catalog?
Thanks!
Kind Regards,
Patrick
--
Patrick Eifler
Senior Software Engineer (BI)
Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC
Wilhelmstraße 118, 10963 Berlin
Germany
E: [email protected]