Hi Flavio,

FLIP-129 will update the connect() API with a programmatic way of defining tables. In the API we currently only support the DDL via executeSql.

I would recommend to implement the Catalog interface. This interface has a lot of methods, but you only need to implement a couple of methods for returning a CatalogTable. There is also a CatalogTableImpl that is basically the CREATE TABLE statement in a programmatic interface.

The missing updated connect() API is the reason why we haven't dropped the registerTableSink yet. It is also fine to continue using it for now.

Regards,
Timo

On 25.01.21 09:40, Flavio Pompermaier wrote:
Any advice on how to fix those problems?

Best,
Flavio

On Thu, Jan 21, 2021 at 4:03 PM Flavio Pompermaier <[email protected] <mailto:[email protected]>> wrote:

    Hello everybody,
    I was trying to get rid of the deprecation warnings about
    using BatchTableEnvironment.registerTableSink() but I don't know how
    to proceed.

    My current code does the following:

    BatchTableEnvironment benv = BatchTableEnvironment.create(env);
    benv.registerTableSink("outUsers", getFieldNames(), getFieldTypes(),
             new CsvTableSink(outputDir + "users.tsv", "\t", 1,
    WriteMode.OVERWRITE));
    benv.executeSql("INSERT INTO `outUsers` SELECT * FROM users");

    Initially I thought to port the code to benv.connect() because it I
    can use the IDE autocomplete but I discovered that also connect ()
    is deprecated in favor of executeSql(). Just for the sake of
    curiosity I've tried to use connect() and I didn't find how to
    specify overwrite. Using INSERT OVERWRITE was causing this error:

    INSERT OVERWRITE requires OverwritableTableSink but actually got
    org.apache.flink.table.sinks.CsvTableSink

    Probably using executeSql is the only non-deprecated way to register
    my sink. So I started to write the CREATE statement to create my
    table but also here there are 2 problems:

    1) Do I really have to write by myself a method that convert the
    schema into the relative string? Is there any utility that already
    does that? My naive attempt was something like:

        private static String getCreateStatement(String tableName,
    UserToRow userToRow) {
        return "CREATE TABLE " + tableName + " (" + //
        userToRow.getSchema().toString() + ")" + // this does not work
    unfortunately
        ") WITH (" + //
        "'connector' = 'filesystem'," + //
        "'path' = 'file:///tmp/test.csv'," + //
        "'format' = 'csv'," + //
        "'sink.shuffle-by-partition.enable' = 'false'" + //
        ");";
        }
    2) How to solve the overwrite problem..?
    3) Is it really the only non-deprecated way to create a table
    the executeSql?

    Thanks in advance,
    Flavio


Reply via email to