In the current Flink version, OVERWRITE must be specified in each INSERT statement, i.e. you write INSERT OVERWRITE instead of INSERT INTO. It is no longer part of the connector. Maybe we can introduce an option in the future to define the default connector behavior (feel free to open an issue for this if you think it is required).

However, the OVERWRITE clause might only be supported in the Blink planner. Also, BatchTableEnvironment is basically deprecated and should be replaced with the unified TableEnvironment in batch mode.
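
As a rough illustration of the two statements involved, here is a minimal sketch in plain Java that only builds the SQL strings, so it runs without Flink on the classpath. The class and method names (FlinkSqlSketch, createCsvTable, insertOverwrite), the column list, and the path are hypothetical and chosen for the example; the table name outUsers and the filesystem/csv options are taken from this thread. I have not run the generated statements against a specific Flink release.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds SQL strings only, it does not call any Flink API.
final class FlinkSqlSketch {

    /** Builds a CREATE TABLE statement for a CSV table on the filesystem connector. */
    static String createCsvTable(String table, Map<String, String> columns, String path) {
        // Render each column as `name` TYPE, keeping the map's insertion order.
        StringJoiner cols = new StringJoiner(", ");
        for (Map.Entry<String, String> column : columns.entrySet()) {
            cols.add("`" + column.getKey() + "` " + column.getValue());
        }
        return "CREATE TABLE `" + table + "` (" + cols + ") WITH ("
                + "'connector' = 'filesystem', "
                + "'path' = '" + path + "', "
                + "'format' = 'csv')";
    }

    /** Builds an INSERT OVERWRITE statement that copies all rows from source to sink. */
    static String insertOverwrite(String sink, String source) {
        return "INSERT OVERWRITE `" + sink + "` SELECT * FROM `" + source + "`";
    }

    public static void main(String[] args) {
        // Assumed example schema; replace with the real field names and SQL types.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "BIGINT");
        columns.put("name", "STRING");

        System.out.println(createCsvTable("outUsers", columns, "file:///tmp/users"));
        System.out.println(insertOverwrite("outUsers", "users"));
        // With Flink available, each string would be passed to executeSql(...)
        // on a TableEnvironment created in batch mode.
    }
}
```

The point of the sketch is that the overwrite behavior lives in the INSERT statement, not in the table definition, so the CREATE TABLE options stay the same whether you append or overwrite.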

Regards,
Timo

On 25.01.21 11:06, Flavio Pompermaier wrote:
Great! Thanks for the detailed answer, Timo! I think I'll wait for the migration to finish before updating my code. However, does the usage of a catalog solve the problem of CSV overwrite as well? I can't find a way to use INSERT OVERWRITE with a CSV sink using executeSql.

Best,
Flavio

On Mon, Jan 25, 2021 at 10:54 AM Timo Walther <[email protected]> wrote:

    Hi Flavio,

    FLIP-129 will update the connect() API with a programmatic way of
    defining tables. In the API we currently only support the DDL via
    executeSql.

    I would recommend implementing the Catalog interface. This interface
    has a lot of methods, but you only need to implement a couple of them
    to return a CatalogTable. There is also a CatalogTableImpl that is
    basically the CREATE TABLE statement in a programmatic interface.

    The missing updated connect() API is the reason why we haven't dropped
    the registerTableSink yet. It is also fine to continue using it for now.

    Regards,
    Timo

    On 25.01.21 09:40, Flavio Pompermaier wrote:
     > Any advice on how to fix those problems?
     >
     > Best,
     > Flavio
     >
     > On Thu, Jan 21, 2021 at 4:03 PM Flavio Pompermaier
     > <[email protected]> wrote:
     >
     >     Hello everybody,
     >     I was trying to get rid of the deprecation warnings about
     >     using BatchTableEnvironment.registerTableSink() but I don't
     >     know how to proceed.
     >
     >     My current code does the following:
     >
     >     BatchTableEnvironment benv = BatchTableEnvironment.create(env);
     >     benv.registerTableSink("outUsers", getFieldNames(), getFieldTypes(),
     >             new CsvTableSink(outputDir + "users.tsv", "\t", 1,
     >                     WriteMode.OVERWRITE));
     >     benv.executeSql("INSERT INTO `outUsers` SELECT * FROM users");
     >
     >     Initially I thought of porting the code to benv.connect() because
     >     I can use the IDE autocomplete, but I discovered that connect()
     >     is also deprecated in favor of executeSql(). Just out of
     >     curiosity I tried connect() anyway and couldn't find how to
     >     specify overwrite. Using INSERT OVERWRITE caused this error:
     >
     >     INSERT OVERWRITE requires OverwritableTableSink but actually got
     >     org.apache.flink.table.sinks.CsvTableSink
     >
     >     Probably using executeSql is the only non-deprecated way to
     >     register my sink. So I started to write the CREATE statement to
     >     create my table, but here too there are 2 problems:
     >
     >     1) Do I really have to write a method myself that converts the
     >     schema into the corresponding string? Is there any utility that
     >     already does that? My naive attempt was something like:
     >
     >         private static String getCreateStatement(String tableName,
     >                 UserToRow userToRow) {
     >             return "CREATE TABLE " + tableName + " (" + //
     >                 // this does not work unfortunately
     >                 userToRow.getSchema().toString() + //
     >                 ") WITH (" + //
     >                 "'connector' = 'filesystem'," + //
     >                 "'path' = 'file:///tmp/test.csv'," + //
     >                 "'format' = 'csv'," + //
     >                 "'sink.shuffle-by-partition.enable' = 'false'" + //
     >                 ");";
     >         }
     >     2) How do I solve the overwrite problem?
     >     3) Is executeSql really the only non-deprecated way to create a
     >     table?
     >
     >     Thanks in advance,
     >     Flavio
     >

