Great! Thanks for the detailed answer Timo! I think I'll wait for the
migration to finish before updating my code.
However, does using a catalog also solve the CSV overwrite problem? I
can't find a way to use INSERT OVERWRITE with a CSV sink via executeSql.
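
For reference, this is roughly the shape of statements I'd expect to be
able to run with executeSql (the table name, column, and path below are
placeholders from my earlier example, not tested code):

```java
public class OverwriteSketch {

    // Hypothetical DDL for a CSV table on the filesystem connector;
    // in the real program this string would be passed to benv.executeSql(...).
    static String buildDdl(String table, String path) {
        return "CREATE TABLE `" + table + "` (\n"
            + "  `name` STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = '" + path + "',\n"
            + "  'format' = 'csv'\n"
            + ")";
    }

    // The statement that fails with "requires OverwritableTableSink"
    // when the table is backed by the legacy CsvTableSink.
    static String buildInsert(String table) {
        return "INSERT OVERWRITE `" + table + "` SELECT * FROM users";
    }

    public static void main(String[] args) {
        System.out.println(buildDdl("outUsers", "file:///tmp/users.tsv"));
        System.out.println(buildInsert("outUsers"));
    }
}
```

Is this the intended replacement for the CsvTableSink + overwrite combination?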

Best,
Flavio

On Mon, Jan 25, 2021 at 10:54 AM Timo Walther <twal...@apache.org> wrote:

> Hi Flavio,
>
> FLIP-129 will update the connect() API with a programmatic way of
> defining tables. Until then, the API only supports table definitions
> through DDL via executeSql.
>
> I would recommend implementing the Catalog interface. The interface has
> many methods, but you only need to implement a handful of them to
> return a CatalogTable. There is also CatalogTableImpl, which is
> essentially the CREATE TABLE statement expressed through a programmatic
> interface.
>
> The missing updated connect() API is the reason why we haven't dropped
> the registerTableSink yet. It is also fine to continue using it for now.
>
> Regards,
> Timo
>
> On 25.01.21 09:40, Flavio Pompermaier wrote:
> > Any advice on how to fix those problems?
> >
> > Best,
> > Flavio
> >
> > On Thu, Jan 21, 2021 at 4:03 PM Flavio Pompermaier <pomperma...@okkam.it
> > <mailto:pomperma...@okkam.it>> wrote:
> >
> >     Hello everybody,
> >     I was trying to get rid of the deprecation warnings about
> >     using BatchTableEnvironment.registerTableSink() but I don't know how
> >     to proceed.
> >
> >     My current code does the following:
> >
> >     BatchTableEnvironment benv = BatchTableEnvironment.create(env);
> >     benv.registerTableSink("outUsers", getFieldNames(), getFieldTypes(),
> >              new CsvTableSink(outputDir + "users.tsv", "\t", 1,
> >     WriteMode.OVERWRITE));
> >     benv.executeSql("INSERT INTO `outUsers` SELECT * FROM users");
> >
> >     Initially I thought of porting the code to benv.connect(), since
> >     that way I can use IDE autocompletion, but I discovered that
> >     connect() is also deprecated in favor of executeSql(). Out of
> >     curiosity I tried connect() anyway, and I couldn't find a way to
> >     specify overwrite. Using INSERT OVERWRITE caused this error:
> >
> >     INSERT OVERWRITE requires OverwritableTableSink but actually got
> >     org.apache.flink.table.sinks.CsvTableSink
> >
> >     Probably executeSql is the only non-deprecated way to register my
> >     sink. So I started writing the CREATE TABLE statement for my
> >     table, but here I ran into problems as well:
> >
> >     1) Do I really have to write a method myself that converts the
> >     schema into the corresponding string? Is there any utility that
> >     already does this? My naive attempt was something like:
> >
> >         private static String getCreateStatement(String tableName,
> >             UserToRow userToRow) {
> >           return "CREATE TABLE " + tableName + " (" + //
> >               userToRow.getSchema().toString() + // this does not work unfortunately
> >               ") WITH (" + //
> >               "'connector' = 'filesystem'," + //
> >               "'path' = 'file:///tmp/test.csv'," + //
> >               "'format' = 'csv'," + //
> >               "'sink.shuffle-by-partition.enable' = 'false'" + //
> >               ");";
> >         }
> >     2) How do I solve the overwrite problem?
> >     3) Is executeSql really the only non-deprecated way to create a
> >     table?
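> >
> >     To make question 1 concrete: UserToRow is one of my own classes,
> >     so here is a stripped-down, hypothetical sketch (plain Java, no
> >     Flink types) of the kind of string-building I would rather not
> >     write by hand:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class DdlBuilder {

    // Build the column list of a CREATE TABLE from (name -> SQL type) pairs;
    // LinkedHashMap preserves the declared column order.
    static String columns(LinkedHashMap<String, String> schema) {
        return schema.entrySet().stream()
            .map(e -> "  `" + e.getKey() + "` " + e.getValue())
            .collect(Collectors.joining(",\n"));
    }

    // Assemble the full DDL statement with the given connector options.
    static String createTable(String tableName,
                              LinkedHashMap<String, String> schema,
                              Map<String, String> options) {
        String with = options.entrySet().stream()
            .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
            .collect(Collectors.joining(",\n"));
        return "CREATE TABLE `" + tableName + "` (\n" + columns(schema)
            + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> schema = new LinkedHashMap<>();
        schema.put("id", "BIGINT");
        schema.put("name", "STRING");
        LinkedHashMap<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", "file:///tmp/test.csv");
        options.put("format", "csv");
        System.out.println(createTable("outUsers", schema, options));
    }
}
```

> >     If the Table API already ships a utility that produces this DDL
> >     from a schema, a pointer would be much appreciated.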
> >
> >     Thanks in advance,
> >     Flavio
> >
>
