Great! Thanks for the detailed answer, Timo! I think I'll wait for the migration to finish before updating my code. However, does using a catalog also solve the CSV overwrite problem? I can't find a way to use INSERT OVERWRITE with a CSV sink via executeSql.

Best,
Flavio

On Mon, Jan 25, 2021 at 10:54 AM Timo Walther <twal...@apache.org> wrote:

> Hi Flavio,
>
> FLIP-129 will update the connect() API with a programmatic way of
> defining tables. In the API we currently only support the DDL via
> executeSql.
>
> I would recommend to implement the Catalog interface. This interface has
> a lot of methods, but you only need to implement a couple of methods for
> returning a CatalogTable. There is also a CatalogTableImpl that is
> basically the CREATE TABLE statement in a programmatic interface.
>
> The missing updated connect() API is the reason why we haven't dropped
> the registerTableSink yet. It is also fine to continue using it for now.
>
> Regards,
> Timo
>
> On 25.01.21 09:40, Flavio Pompermaier wrote:
> > Any advice on how to fix those problems?
> >
> > Best,
> > Flavio
> >
> > On Thu, Jan 21, 2021 at 4:03 PM Flavio Pompermaier
> > <pomperma...@okkam.it> wrote:
> >
> > Hello everybody,
> > I was trying to get rid of the deprecation warnings about
> > using BatchTableEnvironment.registerTableSink() but I don't know how
> > to proceed.
> >
> > My current code does the following:
> >
> > BatchTableEnvironment benv = BatchTableEnvironment.create(env);
> > benv.registerTableSink("outUsers", getFieldNames(), getFieldTypes(),
> >     new CsvTableSink(outputDir + "users.tsv", "\t", 1,
> >         WriteMode.OVERWRITE));
> > benv.executeSql("INSERT INTO `outUsers` SELECT * FROM users");
> >
> > Initially I thought to port the code to benv.connect() because I
> > can use the IDE autocomplete, but I discovered that connect() is
> > also deprecated in favor of executeSql(). Just for the sake of
> > curiosity I've tried to use connect() and I didn't find how to
> > specify overwrite.
> > Using INSERT OVERWRITE was causing this error:
> >
> > INSERT OVERWRITE requires OverwritableTableSink but actually got
> > org.apache.flink.table.sinks.CsvTableSink
> >
> > Probably using executeSql is the only non-deprecated way to register
> > my sink. So I started to write the CREATE statement to create my
> > table, but here too there are some problems:
> >
> > 1) Do I really have to write by myself a method that converts the
> > schema into the corresponding string? Is there any utility that
> > already does that? My naive attempt was something like:
> >
> > private static String getCreateStatement(String tableName,
> >     UserToRow userToRow) {
> >   return "CREATE TABLE " + tableName + " (" + //
> >       userToRow.getSchema().toString() + ")" + // this does not work unfortunately
> >       ") WITH (" + //
> >       "'connector' = 'filesystem'," + //
> >       "'path' = 'file:///tmp/test.csv'," + //
> >       "'format' = 'csv'," + //
> >       "'sink.shuffle-by-partition.enable' = 'false'" + //
> >       ");";
> > }
> >
> > 2) How to solve the overwrite problem?
> >
> > 3) Is executeSql really the only non-deprecated way to create a
> > table?
> >
> > Thanks in advance,
> > Flavio
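
On question 1 in the quoted message, here is a minimal sketch of assembling the CREATE TABLE string from field names and SQL type strings in plain Java. It is only an illustration: CreateTableDdl, quote() and createTable() are hypothetical names, not Flink utilities, and converting Flink type information into SQL type names such as BIGINT or STRING is assumed to be done by the caller. The backtick-doubling escape for identifiers is also an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: builds a CREATE TABLE statement as a String.
final class CreateTableDdl {

    // Wrap an identifier in backticks; doubling embedded backticks is an assumed escape rule.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Render a SQL string literal, doubling embedded single quotes.
    static String literal(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // fieldNames[i] is paired with sqlTypes[i]; options become the WITH (...) clause
    // in the iteration order of the map.
    static String createTable(String tableName, String[] fieldNames,
                              String[] sqlTypes, Map<String, String> options) {
        if (fieldNames.length != sqlTypes.length) {
            throw new IllegalArgumentException("fieldNames and sqlTypes differ in length");
        }
        List<String> columns = new ArrayList<>();
        for (int i = 0; i < fieldNames.length; i++) {
            columns.add(quote(fieldNames[i]) + " " + sqlTypes[i]);
        }
        List<String> withClause = new ArrayList<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            withClause.add(literal(e.getKey()) + " = " + literal(e.getValue()));
        }
        return "CREATE TABLE " + quote(tableName)
                + " (" + String.join(", ", columns) + ")"
                + " WITH (" + String.join(", ", withClause) + ")";
    }

    public static void main(String[] args) {
        // Option keys and values are the ones from the quoted attempt.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", "file:///tmp/test.csv");
        options.put("format", "csv");
        System.out.println(createTable("outUsers",
                new String[] {"id", "name"},
                new String[] {"BIGINT", "STRING"},
                options));
    }
}
```

The resulting string is what would be passed to executeSql. Whether INSERT OVERWRITE is then accepted depends on the sink that the chosen connector resolves to, which this sketch does not address.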