Thanks Kurt, now it works. However, I can't find a way to skip the CSV
header: before there was 'format.ignore-first-line', but now I can't find an
equivalent option. I could set 'csv.ignore-parse-errors' to true, but then I
can't detect other parsing errors; otherwise I need to manually transform the
header into a comment by adding the '#' character at the start of the line.
How can I solve that?
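
For reference, the two workarounds I see would look roughly like this (only a
sketch against the 1.12 'csv' format options; the table names are
placeholders, not tables I actually have):

// Workaround 1: swallow the header with 'csv.ignore-parse-errors'.
// Downside: genuine parse errors in the data rows no longer fail the job.
tableEnv.executeSql(
    "CREATE TABLE csvIgnoreErrors (id BIGINT, name STRING) WITH (\n"
        + " 'connector' = 'filesystem',\n"
        + " 'path' = '/tmp/test.csv',\n"
        + " 'format' = 'csv',\n"
        + " 'csv.ignore-parse-errors' = 'true')");

// Workaround 2: first rewrite the header line so it starts with '#',
// then let the format skip it as a comment via 'csv.allow-comments'.
tableEnv.executeSql(
    "CREATE TABLE csvCommentedHeader (id BIGINT, name STRING) WITH (\n"
        + " 'connector' = 'filesystem',\n"
        + " 'path' = '/tmp/test.csv',\n"
        + " 'format' = 'csv',\n"
        + " 'csv.allow-comments' = 'true')");

Neither of these is a real replacement for the old format.ignore-first-line,
though.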
On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:

> My DDL is:
>
> CREATE TABLE csv (
>   id BIGINT,
>   name STRING
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '.....',
>   'format' = 'csv'
> );
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> We would recommend you to use the new table source & sink interfaces,
>> which have different property keys compared to the old ones, e.g.
>> 'connector' vs. 'connector.type'.
>>
>> You can follow the 1.12 docs [1] to define your CSV table; everything
>> should work just fine.
>>
>> Flink SQL> set table.dml-sync=true;
>> [INFO] Session property has been set.
>>
>> Flink SQL> select * from csv;
>> +----------------------+----------------------+
>> |                   id |                 name |
>> +----------------------+----------------------+
>> |                    3 |                    c |
>> +----------------------+----------------------+
>> Received a total of 1 row
>>
>> Flink SQL> insert overwrite csv values(4, 'd');
>> [INFO] Submitting SQL update statement to the cluster...
>> [INFO] Execute statement in sync mode. Please wait for the execution
>> finish...
>> [INFO] Complete execution of the SQL update statement.
>>
>> Flink SQL> select * from csv;
>> +----------------------+----------------------+
>> |                   id |                 name |
>> +----------------------+----------------------+
>> |                    4 |                    d |
>> +----------------------+----------------------+
>> Received a total of 1 row
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi Till,
>>> since I was using the same WITH clause both for reading and writing, I
>>> discovered that overwrite is actually supported in the sinks, while in
>>> the sources an exception is thrown (I was thinking that those
>>> properties were simply ignored).
>>> However, the quote-character is not supported in the sinks: is this a
>>> bug or is it the intended behaviour?
>>> Here is a minimal example that reproduces the problem (put something
>>> like '1,hello' or '2,hi' in /tmp/test.csv).
>>>
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableEnvironment;
>>>
>>> public class FlinkCsvTest {
>>>
>>>   public static void main(String[] args) throws Exception {
>>>     final EnvironmentSettings envSettings =
>>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>>>     final String tableInName = "testTableIn";
>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>>
>>>     final String tableOutName = "testTableOut";
>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>>     tableEnv.executeSql(createInTableDdl);
>>>     tableEnv.executeSql(createOutTableDdl);
>>>
>>>     Table tableIn = tableEnv.from(tableInName);
>>>     Table tableOut = tableEnv.from(tableOutName);
>>>     tableIn.insertInto(tableOutName);
>>>     // tableEnv.toDataSet(table, Row.class).print();
>>>     tableEnv.execute("TEST read/write");
>>>   }
>>>
>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>         "  `id` BIGINT,\n" + //
>>>         "  `name` STRING) WITH (\n" + //
>>>         "  'connector.type' = 'filesystem',\n" + //
>>>         "  'connector.property-version' = '1',\n" + //
>>>         "  'connector.path' = '" + filePath + "',\n" + //
>>>         "  'format.type' = 'csv',\n" + //
>>>         "  'format.field-delimiter' = ',',\n" + //
>>>         // "  'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>>>         "  'format.property-version' = '1',\n" + //
>>>         "  'format.quote-character' = '\"',\n" + //
>>>         "  'format.ignore-first-line' = 'false'" + //
>>>         ")";
>>>   }
>>>
>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>         "  `id` BIGINT,\n" + //
>>>         "  `name` STRING) WITH (\n" + //
>>>         "  'connector.type' = 'filesystem',\n" + //
>>>         "  'connector.property-version' = '1',\n" + //
>>>         "  'connector.path' = '" + filePath + "',\n" + //
>>>         "  'format.type' = 'csv',\n" + //
>>>         "  'format.field-delimiter' = ',',\n" + //
>>>         "  'format.num-files' = '1',\n" + //
>>>         "  'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
>>>         "  'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>>>         "  'format.property-version' = '1'\n" + //
>>>         ")";
>>>   }
>>> }
>>>
>>> Thanks for the support,
>>> Flavio
>>>
>>>
>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> I tried to execute the code snippet you have provided and I could not
>>>> reproduce the problem.
>>>>
>>>> Concretely I am running this code:
>>>>
>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>     .useBlinkPlanner()
>>>>     .inStreamingMode()
>>>>     .build();
>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>
>>>> tableEnv.fromValues("foobar").execute().await();
>>>>
>>>> Am I missing something? Maybe you can share a minimal but fully working
>>>> example where the problem occurs. Thanks a lot.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it>
>>>> wrote:
>>>>
>>>>> Any help here?
>>>>> Moreover, if I use the DataStream API there's no left/right outer
>>>>> join yet. Are those meant to be added in Flink 1.13 or 1.14? (A
>>>>> coGroup-based sketch of the workaround I have in mind is below the
>>>>> quoted mail.)
>>>>>
>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <pomperma...@okkam.it>
>>>>> wrote:
>>>>>
>>>>>> Hi to all,
>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>>>>>> error:
>>>>>>
>>>>>> The matching candidates:
>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>> Unsupported property keys:
>>>>>> format.quote-character
>>>>>>
>>>>>> I create the table env using this:
>>>>>>
>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance() //
>>>>>>     .useBlinkPlanner() //
>>>>>>     // .inBatchMode() //
>>>>>>     .inStreamingMode() //
>>>>>>     .build();
>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>
>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
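>>>>>
>>>>> P.S. Here is the coGroup-based left outer join sketch I mentioned
>>>>> above. This is only a sketch under my assumptions: hypothetical
>>>>> streams of (id, value) pairs keyed on the first field, and an
>>>>> arbitrary 10-second processing-time window; it is not code I'm
>>>>> actually running.
>>>>>
>>>>> import org.apache.flink.api.common.functions.CoGroupFunction;
>>>>> import org.apache.flink.api.common.typeinfo.Types;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import org.apache.flink.types.Row;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> public class LeftOuterJoinSketch {
>>>>>
>>>>>   // Emulates "left LEFT OUTER JOIN right ON left.f0 = right.f0" within
>>>>>   // 10-second processing-time windows. Rows are emitted as Row (not
>>>>>   // Tuple3) because Row fields may be null for unmatched left elements.
>>>>>   public static DataStream<Row> leftOuterJoin(
>>>>>       DataStream<Tuple2<Long, String>> left,
>>>>>       DataStream<Tuple2<Long, String>> right) {
>>>>>     return left
>>>>>         .coGroup(right)
>>>>>         .where(t -> t.f0, Types.LONG)
>>>>>         .equalTo(t -> t.f0, Types.LONG)
>>>>>         .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
>>>>>         .apply(
>>>>>             new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, Row>() {
>>>>>               @Override
>>>>>               public void coGroup(
>>>>>                   Iterable<Tuple2<Long, String>> lefts,
>>>>>                   Iterable<Tuple2<Long, String>> rights,
>>>>>                   Collector<Row> out) {
>>>>>                 // Emit every (left, right) match; a left element with
>>>>>                 // no right-side match in the window is padded with null.
>>>>>                 for (Tuple2<Long, String> l : lefts) {
>>>>>                   boolean matched = false;
>>>>>                   for (Tuple2<Long, String> r : rights) {
>>>>>                     matched = true;
>>>>>                     out.collect(Row.of(l.f0, l.f1, r.f1));
>>>>>                   }
>>>>>                   if (!matched) {
>>>>>                     out.collect(Row.of(l.f0, l.f1, null));
>>>>>                   }
>>>>>                 }
>>>>>               }
>>>>>             },
>>>>>             Types.ROW(Types.LONG, Types.STRING, Types.STRING));
>>>>>   }
>>>>> }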