Hi Flavio,
FLIP-129 will update the connect() API with a programmatic way of
defining tables. In the API we currently only support the DDL via
executeSql.
I would recommend to implement the Catalog interface. This interface has
a lot of methods, but you only need to implement a couple of methods for
returning a CatalogTable. There is also a CatalogTableImpl that is
basically the CREATE TABLE statement in a programmatic interface.
The missing updated connect() API is the reason why we haven't dropped
the registerTableSink yet. It is also fine to continue using it for now.
Regards,
Timo
On 25.01.21 09:40, Flavio Pompermaier wrote:
Any advice on how to fix those problems?
Best,
Flavio
On Thu, Jan 21, 2021 at 4:03 PM Flavio Pompermaier <[email protected]
<mailto:[email protected]>> wrote:
Hello everybody,
I was trying to get rid of the deprecation warnings about
using BatchTableEnvironment.registerTableSink() but I don't know how
to proceed.
My current code does the following:
BatchTableEnvironment benv = BatchTableEnvironment.create(env);
benv.registerTableSink("outUsers", getFieldNames(), getFieldTypes(),
new CsvTableSink(outputDir + "users.tsv", "\t", 1,
WriteMode.OVERWRITE));
benv.executeSql("INSERT INTO `outUsers` SELECT * FROM users");
Initially I thought to port the code to benv.connect() because it I
can use the IDE autocomplete but I discovered that also connect ()
is deprecated in favor of executeSql(). Just for the sake of
curiosity I've tried to use connect() and I didn't find how to
specify overwrite. Using INSERT OVERWRITE was causing this error:
INSERT OVERWRITE requires OverwritableTableSink but actually got
org.apache.flink.table.sinks.CsvTableSink
Probably using executeSql is the only non-deprecated way to register
my sink. So I started to write the CREATE statement to create my
table but also here there are 2 problems:
1) Do I really have to write by myself a method that convert the
schema into the relative string? Is there any utility that already
does that? My naive attempt was something like:
private static String getCreateStatement(String tableName,
UserToRow userToRow) {
return "CREATE TABLE " + tableName + " (" + //
userToRow.getSchema().toString() + ")" + // this does not work
unfortunately
") WITH (" + //
"'connector' = 'filesystem'," + //
"'path' = 'file:///tmp/test.csv'," + //
"'format' = 'csv'," + //
"'sink.shuffle-by-partition.enable' = 'false'" + //
");";
}
2) How to solve the overwrite problem..?
3) Is it really the only non-deprecated way to create a table
the executeSql?
Thanks in advance,
Flavio