The loss of 'format.ignore-first-line' is unfortunately a regression
compared to the old CSV format.
I've created a ticket [1] to track this, but given the current design it
doesn't seem easy to fix.
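
Until that is fixed, the only SQL-side workaround I can think of is the
one you already found, 'csv.ignore-parse-errors', with the caveat you
pointed out that it also swallows genuinely malformed rows. A minimal
sketch (the path is a placeholder):

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '/tmp/test.csv',
       'format' = 'csv',
       -- the header line fails to parse as (BIGINT, STRING) and is
       -- dropped, but so is every other malformed row
       'csv.ignore-parse-errors' = 'true'
);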

Regarding null values, I'm not sure I understand the issue you hit. What
do you mean by using ',bye' to test null Long values?
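
If you mean that a line like ',bye' should yield a NULL id, then
'csv.null-literal' is indeed the option for that: it tells the format
which string literal to interpret as SQL NULL. A sketch (untested
against your exact file, path is a placeholder):

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '/tmp/test.csv',
       'format' = 'csv',
       -- parse empty fields as NULL instead of failing on the BIGINT column
       'csv.null-literal' = ''
);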

[1] https://issues.apache.org/jira/browse/FLINK-22178

Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> And another thing: in my CSV I added a line ',bye' (to test null Long
> values) but I get a parse error. If I add 'csv.null-literal' = '' it
> seems to work. Is that the right way to solve this problem?
>
> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Thanks Kurt, now it works. However, I can't find a way to skip the CSV
>> header: the old format had 'format.ignore-first-line', but I can't find
>> an equivalent in the new one.
>> I could set 'csv.ignore-parse-errors' to true, but then I can't detect
>> other parsing errors; otherwise I need to manually transform the header
>> into a comment by adding a '#' character at the start of the line (see
>> the sketch below).
>> How can I solve that?
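>>
>> For reference, the comment trick I mean would rely on
>> 'csv.allow-comments' (a rough, untested sketch; path is a placeholder):
>>
>> CREATE TABLE csv (
>>        id BIGINT,
>>        name STRING
>> ) WITH (
>>        'connector' = 'filesystem',
>>        'path' = '/tmp/test.csv',
>>        'format' = 'csv',
>>        -- skip lines starting with '#', after manually prefixing the header
>>        'csv.allow-comments' = 'true'
>> );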
>>
>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> My DDL is:
>>>
>>> CREATE TABLE csv (
>>>        id BIGINT,
>>>        name STRING
>>> ) WITH (
>>>        'connector' = 'filesystem',
>>>        'path' = '.....',
>>>        'format' = 'csv'
>>> );
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> We would recommend using the new table source & sink interfaces, which
>>>> have different property keys than the old ones, e.g. 'connector' vs.
>>>> 'connector.type'.
>>>>
>>>> You can follow the 1.12 docs [1] to define your csv table; everything
>>>> should work just fine.
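>>>>
>>>> For example, a new-style DDL would look roughly like this (the path is
>>>> a placeholder; the old-style keys are shown in the comments):
>>>>
>>>> CREATE TABLE csv (
>>>>        id BIGINT,
>>>>        name STRING
>>>> ) WITH (
>>>>        'connector' = 'filesystem',  -- was 'connector.type' = 'filesystem'
>>>>        'path' = '/path/to/file',    -- was 'connector.path' = '...'
>>>>        'format' = 'csv'             -- was 'format.type' = 'csv'
>>>> );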
>>>>
>>>> Flink SQL> set table.dml-sync=true;
>>>> [INFO] Session property has been set.
>>>>
>>>> Flink SQL> select * from csv;
>>>> +----------------------+----------------------+
>>>> |                   id |                 name |
>>>> +----------------------+----------------------+
>>>> |                    3 |                    c |
>>>> +----------------------+----------------------+
>>>> Received a total of 1 row
>>>>
>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>> [INFO] Submitting SQL update statement to the cluster...
>>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>>>> [INFO] Complete execution of the SQL update statement.
>>>>
>>>> Flink SQL> select * from csv;
>>>> +----------------------+----------------------+
>>>> |                   id |                 name |
>>>> +----------------------+----------------------+
>>>> |                    4 |                    d |
>>>> +----------------------+----------------------+
>>>> Received a total of 1 row
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>> since I was using the same WITH clause both for reading and writing, I
>>>>> discovered that overwrite is actually supported in the sinks, while the
>>>>> sources throw an exception (I had assumed those properties were simply
>>>>> ignored).
>>>>> However, the quote-character is not supported in the sinks: is this a
>>>>> bug or the intended behaviour?
>>>>> Here is a minimal example that reproduces the problem (put something
>>>>> like '1,hello' or '2,hi' in /tmp/test.csv).
>>>>>
>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>> import org.apache.flink.table.api.Table;
>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>
>>>>> public class FlinkCsvTest {
>>>>>
>>>>>   public static void main(String[] args) throws Exception {
>>>>>     final EnvironmentSettings envSettings =
>>>>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>>>>>     final String tableInName = "testTableIn";
>>>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>>>>     final String tableOutName = "testTableOut";
>>>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>>>>     tableEnv.executeSql(createInTableDdl);
>>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>>
>>>>>     Table tableIn = tableEnv.from(tableInName);
>>>>>     Table tableOut = tableEnv.from(tableOutName);
>>>>>     tableIn.insertInto(tableOutName);
>>>>>     // tableEnv.toDataSet(table, Row.class).print();
>>>>>     tableEnv.execute("TEST read/write");
>>>>>   }
>>>>>
>>>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>>         " `id` BIGINT,\n" + //
>>>>>         " `name` STRING) WITH (\n" + //
>>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>>         " 'connector.property-version' = '1',\n" + //
>>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>>         " 'format.type' = 'csv',\n" + //
>>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>>         // " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>>>>>         " 'format.property-version' = '1',\n" + //
>>>>>         " 'format.quote-character' = '\"',\n" + //
>>>>>         " 'format.ignore-first-line' = 'false'" + //
>>>>>         ")";
>>>>>   }
>>>>>
>>>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>>         " `id` BIGINT,\n" + //
>>>>>         " `name` STRING) WITH (\n" + //
>>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>>         " 'connector.property-version' = '1',\n" + //
>>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>>         " 'format.type' = 'csv',\n" + //
>>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>>         " 'format.num-files' = '1',\n" + //
>>>>>         " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
>>>>>         " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>>>>>         " 'format.property-version' = '1'\n" + //
>>>>>         ")";
>>>>>   }
>>>>> }
>>>>>
>>>>> Thanks for the support,
>>>>> Flavio
>>>>>
>>>>>
>>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> I tried to execute the code snippet you have provided and I could not
>>>>>> reproduce the problem.
>>>>>>
>>>>>> Concretely I am running this code:
>>>>>>
>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>     .useBlinkPlanner()
>>>>>>     .inStreamingMode()
>>>>>>     .build();
>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>
>>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>>
>>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>>> working example where the problem occurs. Thanks a lot.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Any help here? Moreover, if I use the DataStream API there's no
>>>>>>> left/right outer join yet. Are those meant to be added in Flink 1.13
>>>>>>> or 1.14?
>>>>>>>
>>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>>> following error:
>>>>>>>>
>>>>>>>> The matching candidates:
>>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>>> Unsupported property keys:
>>>>>>>> format.quote-character
>>>>>>>>
>>>>>>>> I create the table env using this:
>>>>>>>>
>>>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
>>>>>>>>     .useBlinkPlanner()//
>>>>>>>>     // .inBatchMode()//
>>>>>>>>     .inStreamingMode()//
>>>>>>>>     .build();
>>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>
>>>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>
