Thanks for the suggestions, Kurt. Actually, I think I could use the Table API; it's just that most of our Flink code uses the DataSet API.
On Sun, Apr 11, 2021 at 13:44 Kurt Young <ykt...@gmail.com> wrote:

> Thanks for the suggestions Flavio. Join without window & left outer join
> already work in Table API & SQL.
> And for reduceGroup, you can try either a user-defined aggregate function
> or a table aggregate, which is available in the Table API now. I'm
> wondering whether these can meet your requirements, or whether you have
> other use cases only feasible with DataStream.
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 7:41 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> That's absolutely useful. IMHO join should also work without
>> windows/triggers, and left/right outer joins should be easier, in order
>> to really migrate legacy code.
>> reduceGroup would also help, but it is less urgent.
>> I hope that my feedback as a Flink user can be useful.
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 9, 2021 at 12:38 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Converting from table to DataStream in batch mode is indeed a problem
>>> now. But I think this will be improved soon.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> In my real CSV I have LONG columns that can contain null values. In
>>>> that case I get a parse exception (and I would like to avoid reading
>>>> them as strings).
>>>> The ',bye' line is just a way to test that in my example (add that
>>>> line to the input CSV).
>>>> If I use 'csv.null-literal' = '' it seems to work, but is it a
>>>> workaround or is it the right solution?
>>>>
>>>> Another big problem I'm having with the new APIs is that if I use
>>>>   TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>> then I can't convert a table to a DataStream. I need to use
>>>>   StreamTableEnvironment tableEnv =
>>>>       StreamTableEnvironment.create(streamEnv, envSettings);
>>>> but in that case I can't use inBatchMode.
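For reference, a DDL sketch of the `csv.null-literal` workaround discussed above (the table and path are hypothetical placeholders; the option names follow the Flink 1.12 filesystem connector and CSV format):

```sql
-- Sketch: an empty CSV field is read as NULL because
-- 'csv.null-literal' maps the empty string to NULL.
CREATE TABLE csv_with_nulls (
  id   BIGINT,   -- a line such as ',bye' then yields id = NULL
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/test.csv',   -- placeholder path
  'format' = 'csv',
  'csv.null-literal' = ''
);
```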
>>>>
>>>> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> `format.ignore-first-line` is unfortunately a regression compared to
>>>>> the old connector. I've created a ticket [1] to track this, but
>>>>> according to the current design it seems not easy to do.
>>>>>
>>>>> Regarding null values, I'm not sure I understand the issue you had.
>>>>> What do you mean by using ',bye' to test null Long values?
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> And another thing: in my CSV I added ',bye' (to test null Long
>>>>>> values) but I get a parse error. If I add 'csv.null-literal' = '' it
>>>>>> seems to work. Is that the right way to solve this problem?
>>>>>>
>>>>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Thanks Kurt, now it works. However I can't find a way to skip the
>>>>>>> CSV header; before there was "format.ignore-first-line" but now I
>>>>>>> can't find another way to skip it.
>>>>>>> I could set csv.ignore-parse-errors to true, but then I can't
>>>>>>> detect other parsing errors; otherwise I need to manually transform
>>>>>>> the header into a comment by adding the # character at the start of
>>>>>>> the line.
>>>>>>> How can I solve that?
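The header workaround described above (turning the header line into a `#` comment) relies on the CSV format's comment support; a sketch, assuming the Flink 1.12 `csv.allow-comments` option and a placeholder table and path:

```sql
CREATE TABLE csv_no_header (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/test.csv',  -- placeholder path
  'format' = 'csv',
  'csv.allow-comments' = 'true'     -- lines starting with '#' are skipped,
                                    -- so a header prefixed with '#' is ignored
);
```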
>>>>>>>
>>>>>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My DDL is:
>>>>>>>>
>>>>>>>> CREATE TABLE csv (
>>>>>>>>   id BIGINT,
>>>>>>>>   name STRING
>>>>>>>> ) WITH (
>>>>>>>>   'connector' = 'filesystem',
>>>>>>>>   'path' = '.....',
>>>>>>>>   'format' = 'csv'
>>>>>>>> );
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Kurt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Flavio,
>>>>>>>>>
>>>>>>>>> We would recommend you to use the new table source & sink
>>>>>>>>> interfaces, which have different property keys compared to the
>>>>>>>>> old ones, e.g. 'connector' vs. 'connector.type'.
>>>>>>>>>
>>>>>>>>> You can follow the 1.12 doc [1] to define your csv table;
>>>>>>>>> everything should work just fine.
>>>>>>>>>
>>>>>>>>> Flink SQL> set table.dml-sync=true;
>>>>>>>>> [INFO] Session property has been set.
>>>>>>>>>
>>>>>>>>> Flink SQL> select * from csv;
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> |                   id |                 name |
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> |                    3 |                    c |
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> Received a total of 1 row
>>>>>>>>>
>>>>>>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>>>>>>> [INFO] Submitting SQL update statement to the cluster...
>>>>>>>>> [INFO] Execute statement in sync mode. Please wait for the
>>>>>>>>> execution finish...
>>>>>>>>> [INFO] Complete execution of the SQL update statement.
>>>>>>>>>
>>>>>>>>> Flink SQL> select * from csv;
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> |                   id |                 name |
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> |                    4 |                    d |
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> Received a total of 1 row
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Kurt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Till,
>>>>>>>>>> since I was using the same WITH-clause both for reading and
>>>>>>>>>> writing, I discovered that overwrite is actually supported in
>>>>>>>>>> the sinks, while in the sources an exception is thrown (I was
>>>>>>>>>> thinking that those properties were simply ignored).
>>>>>>>>>> However, the quote-character is not supported in the sinks: is
>>>>>>>>>> this a bug or is it the intended behaviour?
>>>>>>>>>> Here is a minimal example that reproduces the problem (put in
>>>>>>>>>> /tmp/test.csv something like '1,hello' or '2,hi').
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>>>>>
>>>>>>>>>> public class FlinkCsvTest {
>>>>>>>>>>
>>>>>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>>>>>     final EnvironmentSettings envSettings = EnvironmentSettings
>>>>>>>>>>         .newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>>>>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>>>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>>>>>>>>>>     final String tableInName = "testTableIn";
>>>>>>>>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>>>>>>>>>     final String tableOutName = "testTableOut";
>>>>>>>>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>>>>>>>>>     tableEnv.executeSql(createInTableDdl);
>>>>>>>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>>>>>>>
>>>>>>>>>>     Table tableIn = tableEnv.from(tableInName);
>>>>>>>>>>     // tableEnv.toDataSet(table, Row.class).print();
>>>>>>>>>>     tableIn.executeInsert(tableOutName).await();
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>>>>>>>>     return "CREATE TABLE " + tableName + " (\n"
>>>>>>>>>>         + "  `id` BIGINT,\n"
>>>>>>>>>>         + "  `name` STRING) WITH (\n"
>>>>>>>>>>         + "  'connector.type' = 'filesystem',\n"
>>>>>>>>>>         + "  'connector.property-version' = '1',\n"
>>>>>>>>>>         + "  'connector.path' = '" + filePath + "',\n"
>>>>>>>>>>         + "  'format.type' = 'csv',\n"
>>>>>>>>>>         + "  'format.field-delimiter' = ',',\n"
>>>>>>>>>>         // + "  'format.write-mode' = 'OVERWRITE',\n" // NOT SUPPORTED
>>>>>>>>>>         + "  'format.property-version' = '1',\n"
>>>>>>>>>>         + "  'format.quote-character' = '\"',\n"
>>>>>>>>>>         + "  'format.ignore-first-line' = 'false'"
>>>>>>>>>>         + ")";
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>>>>>>>>     return "CREATE TABLE " + tableName + " (\n"
>>>>>>>>>>         + "  `id` BIGINT,\n"
>>>>>>>>>>         + "  `name` STRING) WITH (\n"
>>>>>>>>>>         + "  'connector.type' = 'filesystem',\n"
>>>>>>>>>>         + "  'connector.property-version' = '1',\n"
>>>>>>>>>>         + "  'connector.path' = '" + filePath + "',\n"
>>>>>>>>>>         + "  'format.type' = 'csv',\n"
>>>>>>>>>>         + "  'format.field-delimiter' = ',',\n"
>>>>>>>>>>         + "  'format.num-files' = '1',\n"
>>>>>>>>>>         + "  'format.write-mode' = 'OVERWRITE',\n" // SUPPORTED (sinks only)
>>>>>>>>>>         + "  'format.quote-character' = '\"',\n"   // NOT SUPPORTED
>>>>>>>>>>         + "  'format.property-version' = '1'\n"
>>>>>>>>>>         + ")";
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Thanks for the support,
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <
>>>>>>>>>> trohrm...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>
>>>>>>>>>>> I tried to execute the code snippet you have provided and I
>>>>>>>>>>> could not reproduce the problem.
>>>>>>>>>>>
>>>>>>>>>>> Concretely I am running this code:
>>>>>>>>>>>
>>>>>>>>>>> final EnvironmentSettings envSettings =
>>>>>>>>>>>     EnvironmentSettings.newInstance()
>>>>>>>>>>>         .useBlinkPlanner()
>>>>>>>>>>>         .inStreamingMode()
>>>>>>>>>>>         .build();
>>>>>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>>>>
>>>>>>>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>>>>>>>
>>>>>>>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>>>>>>>> working example where the problem occurs. Thanks a lot.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Any help here? Moreover, if I use the DataStream APIs there's
>>>>>>>>>>>> no left/right outer join yet. Are those meant to be added in
>>>>>>>>>>>> Flink 1.13 or 1.14?
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>>>>>>>> following error:
>>>>>>>>>>>>>
>>>>>>>>>>>>> The matching candidates:
>>>>>>>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>>>>>>>> Unsupported property keys:
>>>>>>>>>>>>> format.quote-character
>>>>>>>>>>>>>
>>>>>>>>>>>>> I create the table env using this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> final EnvironmentSettings envSettings =
>>>>>>>>>>>>>     EnvironmentSettings.newInstance()
>>>>>>>>>>>>>         .useBlinkPlanner()
>>>>>>>>>>>>>         // .inBatchMode()
>>>>>>>>>>>>>         .inStreamingMode()
>>>>>>>>>>>>>         .build();
>>>>>>>>>>>>> final TableEnvironment tableEnv =
>>>>>>>>>>>>>     TableEnvironment.create(envSettings);
>>>>>>>>>>>>>
>>>>>>>>>>>>> The error is the same both with inBatchMode and
>>>>>>>>>>>>> inStreamingMode.
>>>>>>>>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Flavio
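The legacy `connector.type`-style properties in the thread map onto the new connector options roughly as follows (a sketch, assuming Flink 1.12+ option names; the table name and path are placeholders, and quoting is configured via the CSV format's `csv.quote-character` option rather than `format.quote-character`):

```sql
CREATE TABLE csv_sink (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',            -- replaces 'connector.type'
  'path' = 'file:///tmp/test-out.csv',   -- replaces 'connector.path' (placeholder)
  'format' = 'csv',                      -- replaces 'format.type'
  'csv.field-delimiter' = ',',           -- replaces 'format.field-delimiter'
  'csv.quote-character' = '"'            -- replaces 'format.quote-character'
);
```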