Thanks Kurt, now it works. However, I can't find a way to skip the CSV
header: before there was "format.ignore-first-line", but now I can't find
an equivalent option.
I could set csv.ignore-parse-errors to true, but then I can't detect other
parsing errors; otherwise I need to manually transform the header into a
comment by adding the # character at the start of the line (sketched
below).
How can I solve that?
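
For reference, the comment-based workaround would look something like this
(a sketch against the 1.12 CSV format options, if I read the docs
correctly; 'csv.allow-comments' tells the format to skip lines starting
with '#'):

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv',
       'csv.allow-comments' = 'true'
);

but it only works after manually prefixing the header line with '#', which
is exactly what I'd like to avoid.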

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:

> My DDL is:
>
> CREATE TABLE csv (
>        id BIGINT,
>        name STRING
> ) WITH (
>        'connector' = 'filesystem',
>        'path' = '.....',
>        'format' = 'csv'
> );
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> We would recommend using the new table source & sink interfaces, which
>> have different property keys compared to the old ones, e.g. 'connector'
>> vs. 'connector.type'.
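>>
>> For example, your old properties would roughly map like this (a sketch
>> based on the 1.12 docs, not exhaustive; format options are prefixed with
>> the format name):
>>
>> WITH (
>>        'connector' = 'filesystem',       -- was 'connector.type'
>>        'path' = '/tmp/test.csv',         -- was 'connector.path'
>>        'format' = 'csv',                 -- was 'format.type'
>>        'csv.field-delimiter' = ',',      -- was 'format.field-delimiter'
>>        'csv.quote-character' = '"'       -- was 'format.quote-character'
>> )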
>>
>> You can follow the 1.12 docs [1] to define your CSV table; everything
>> should work just fine.
>>
>> Flink SQL> set table.dml-sync=true;
>> [INFO] Session property has been set.
>>
>> Flink SQL> select * from csv;
>> +----------------------+----------------------+
>> |                   id |                 name |
>> +----------------------+----------------------+
>> |                    3 |                    c |
>> +----------------------+----------------------+
>> Received a total of 1 row
>>
>> Flink SQL> insert overwrite csv values(4, 'd');
>> [INFO] Submitting SQL update statement to the cluster...
>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>> [INFO] Complete execution of the SQL update statement.
>>
>> Flink SQL> select * from csv;
>> +----------------------+----------------------+
>> |                   id |                 name |
>> +----------------------+----------------------+
>> |                    4 |                    d |
>> +----------------------+----------------------+
>> Received a total of 1 row
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi Till,
>>> since I was using the same WITH clause both for reading and writing, I
>>> discovered that overwrite is actually supported in the sinks, while the
>>> sources throw an exception (I had assumed those properties were simply
>>> ignored).
>>> However, the quote-character is not supported in the sinks: is this a bug
>>> or is it the intended behaviour?
>>> Here is a minimal example that reproduces the problem (put something like
>>> '1,hello' or '2,hi' in /tmp/test.csv).
>>>
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableEnvironment;
>>>
>>> public class FlinkCsvTest {
>>>   public static void main(String[] args) throws Exception {
>>>     final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>         .useBlinkPlanner().inStreamingMode().build();
>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>>>     final String tableInName = "testTableIn";
>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>>
>>>     final String tableOutName = "testTableOut";
>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>>     tableEnv.executeSql(createInTableDdl);
>>>     tableEnv.executeSql(createOutTableDdl);
>>>
>>>     Table tableIn = tableEnv.from(tableInName);
>>>     tableIn.insertInto(tableOutName);
>>>     // tableEnv.toDataSet(table, Row.class).print();
>>>     tableEnv.execute("TEST read/write");
>>>   }
>>>
>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>     return "CREATE TABLE " + tableName + " (\n"
>>>         + " `id` BIGINT,\n"
>>>         + " `name` STRING) WITH (\n"
>>>         + " 'connector.type' = 'filesystem',\n"
>>>         + " 'connector.property-version' = '1',\n"
>>>         + " 'connector.path' = '" + filePath + "',\n"
>>>         + " 'format.type' = 'csv',\n"
>>>         + " 'format.field-delimiter' = ',',\n"
>>>         // + " 'format.write-mode' = 'OVERWRITE',\n" // NOT SUPPORTED (sources)
>>>         + " 'format.property-version' = '1',\n"
>>>         + " 'format.quote-character' = '\"',\n"
>>>         + " 'format.ignore-first-line' = 'false'"
>>>         + ")";
>>>   }
>>>
>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>     return "CREATE TABLE " + tableName + " (\n"
>>>         + " `id` BIGINT,\n"
>>>         + " `name` STRING) WITH (\n"
>>>         + " 'connector.type' = 'filesystem',\n"
>>>         + " 'connector.property-version' = '1',\n"
>>>         + " 'connector.path' = '" + filePath + "',\n"
>>>         + " 'format.type' = 'csv',\n"
>>>         + " 'format.field-delimiter' = ',',\n"
>>>         + " 'format.num-files' = '1',\n"
>>>         + " 'format.write-mode' = 'OVERWRITE',\n" // SUPPORTED (sinks only)
>>>         + " 'format.quote-character' = '\"',\n" // NOT SUPPORTED (sinks)
>>>         + " 'format.property-version' = '1'\n"
>>>         + ")";
>>>   }
>>> }
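>>>
>>> As an aside, since Table.insertInto and TableEnvironment.execute are
>>> deprecated, the submission could also be written like this (a sketch,
>>> assuming Flink 1.11+; untested here):
>>>
>>> // Submits the INSERT job and waits for it to finish.
>>> tableEnv.from(tableInName).executeInsert(tableOutName).await();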
>>>
>>> Thanks for the support,
>>> Flavio
>>>
>>>
>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> I tried to execute the code snippet you have provided and I could not
>>>> reproduce the problem.
>>>>
>>>> Concretely, I am running this code:
>>>>
>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>     .useBlinkPlanner()
>>>>     .inStreamingMode()
>>>>     .build();
>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>
>>>> tableEnv.fromValues("foobar").execute().await();
>>>>
>>>> Am I missing something? Maybe you can share a minimal but fully working
>>>> example where the problem occurs. Thanks a lot.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Any help here? Moreover, if I use the DataStream API there's no
>>>>> left/right outer join yet... are those meant to be added in Flink 1.13
>>>>> or 1.14?
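>>>>>
>>>>> (The only workaround I know of on the DataStream side is emulating an
>>>>> outer join with coGroup; a sketch, assuming Tuple2<Long, String>
>>>>> records keyed by f0 and processing-time windows:
>>>>>
>>>>> import org.apache.flink.api.common.functions.CoGroupFunction;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> public static DataStream<String> leftOuterJoin(
>>>>>     DataStream<Tuple2<Long, String>> left,
>>>>>     DataStream<Tuple2<Long, String>> right) {
>>>>>   return left.coGroup(right)
>>>>>       .where(t -> t.f0)
>>>>>       .equalTo(t -> t.f0)
>>>>>       .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
>>>>>       .apply(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {
>>>>>         @Override
>>>>>         public void coGroup(Iterable<Tuple2<Long, String>> lhs,
>>>>>             Iterable<Tuple2<Long, String>> rhs, Collector<String> out) {
>>>>>           for (Tuple2<Long, String> l : lhs) {
>>>>>             boolean matched = false;
>>>>>             for (Tuple2<Long, String> r : rhs) {
>>>>>               matched = true;
>>>>>               out.collect(l.f1 + "," + r.f1); // matched pair
>>>>>             }
>>>>>             if (!matched) {
>>>>>               out.collect(l.f1 + ",null"); // left row without a match
>>>>>             }
>>>>>           }
>>>>>         }
>>>>>       });
>>>>> }
>>>>>
>>>>> but that forces everything through a window, so it's not a real
>>>>> streaming outer join.)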
>>>>>
>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Hi all,
>>>>>> I'm testing writing to a CSV with Flink 1.13 and I get the following
>>>>>> error:
>>>>>>
>>>>>> The matching candidates:
>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>> Unsupported property keys:
>>>>>> format.quote-character
>>>>>>
>>>>>> I create the table env using this:
>>>>>>
>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>     .useBlinkPlanner()
>>>>>>     // .inBatchMode()
>>>>>>     .inStreamingMode()
>>>>>>     .build();
>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>
>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>
