Thanks for the suggestions Kurt. Actually I think I could use the Table API,
it's just that most of our Flink code uses the DataSet API.

On Sun, Apr 11, 2021 at 1:44 PM Kurt Young <ykt...@gmail.com> wrote:

> Thanks for the suggestions Flavio. Join without window & left outer join
> already worked in Table API & SQL.
> And for reduceGroup, you can try either user defined aggregate function or
> use table aggregate which is
> available in Table API now. I'm wondering whether these can meet your
> requirement, or you have other
> use cases only feasible with DataStream.
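>
> For example, a plain left outer join without any windows should already
> work in SQL, roughly like this (the table and column names are just
> placeholders):
>
> SELECT o.id, o.amount, c.name
> FROM Orders o
> LEFT JOIN Customers c ON o.customer_id = c.id;
>
> The Table API also offers an equivalent leftOuterJoin call.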
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 7:41 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> That's absolutely useful. IMHO join should also work without
>> windows/triggers, and left/right outer joins should be easier, in order to
>> really migrate legacy code.
>> reduceGroup would also help, but it's less urgent.
>> I hope my feedback as a Flink user is useful.
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 9, 2021 at 12:38 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Converting from table to DataStream in batch mode is indeed a problem
>>> now. But I think this will
>>> be improved soon.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> In my real CSV I have LONG columns that can contain null values. In
>>>> that case I get a parse exception (and I would like to avoid reading them
>>>> as strings).
>>>> The ',bye' line is just a way to test that in my example (add that line
>>>> to the input CSV).
>>>> If I use 'csv.null-literal' = '' it seems to work, but is that a
>>>> workaround or the right solution?
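>>>>
>>>> For reference, the table definition I'm testing with looks roughly like
>>>> this (just a sketch using the same sample schema as before):
>>>>
>>>> CREATE TABLE csv (
>>>>   id BIGINT,
>>>>   name STRING
>>>> ) WITH (
>>>>   'connector' = 'filesystem',
>>>>   'path' = '/tmp/test.csv',
>>>>   'format' = 'csv',
>>>>   'csv.null-literal' = ''
>>>> );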
>>>>
>>>> Another big problem I'm having with the new APIs is that if I use
>>>>     TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>> then I can't convert a table to a DataStream. I need to use
>>>>     StreamTableEnvironment tableEnv =
>>>>         StreamTableEnvironment.create(streamEnv, envSettings);
>>>> but in that case I can't use inBatchMode.
>>>>
>>>> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> The missing `format.ignore-first-line` is unfortunately a regression
>>>>> compared to the old connector.
>>>>> I've created a ticket [1] to track this, but according to the current
>>>>> design it doesn't seem easy to do.
>>>>>
>>>>> Regarding null values, I'm not sure if I understand the issue you had.
>>>>> What do you mean by
>>>>> using ',bye' to test null Long values?
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> And another thing: in my CSV I added a ',bye' line (to test null Long
>>>>>> values) but I get a parse error. If I add 'csv.null-literal' = '' it
>>>>>> seems to work. Is that the right way to solve this problem?
>>>>>>
>>>>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Thanks Kurt, now it works. However I can't find a way to skip the
>>>>>>> CSV header: before there was "format.ignore-first-line", but now I
>>>>>>> can't find an equivalent option.
>>>>>>> I could set csv.ignore-parse-errors to true, but then I can't detect
>>>>>>> other parsing errors; otherwise I need to manually transform the header
>>>>>>> into a comment by adding the # character at the start of the line.
>>>>>>> How can I solve that?
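>>>>>>>
>>>>>>> The comment workaround I have in mind would look roughly like this (a
>>>>>>> sketch; it assumes the header line has been rewritten to start with '#'):
>>>>>>>
>>>>>>> CREATE TABLE csv (
>>>>>>>   id BIGINT,
>>>>>>>   name STRING
>>>>>>> ) WITH (
>>>>>>>   'connector' = 'filesystem',
>>>>>>>   'path' = '/tmp/test.csv',
>>>>>>>   'format' = 'csv',
>>>>>>>   'csv.allow-comments' = 'true'
>>>>>>> );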
>>>>>>>
>>>>>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My DDL is:
>>>>>>>>
>>>>>>>> CREATE TABLE csv (
>>>>>>>>        id BIGINT,
>>>>>>>>        name STRING
>>>>>>>> ) WITH (
>>>>>>>>        'connector' = 'filesystem',
>>>>>>>>        'path' = '.....',
>>>>>>>>        'format' = 'csv'
>>>>>>>> );
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Kurt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Flavio,
>>>>>>>>>
>>>>>>>>> We would recommend using the new table source & sink interfaces,
>>>>>>>>> which have different property keys compared to the old ones,
>>>>>>>>> e.g. 'connector' vs. 'connector.type'.
>>>>>>>>>
>>>>>>>>> You can follow the 1.12 docs [1] to define your CSV table;
>>>>>>>>> everything should work just fine.
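>>>>>>>>>
>>>>>>>>> For example, the source table from your snippet could be declared
>>>>>>>>> with the new property keys roughly like this (just a sketch, adjust
>>>>>>>>> the path):
>>>>>>>>>
>>>>>>>>> CREATE TABLE testTableIn (
>>>>>>>>>   id BIGINT,
>>>>>>>>>   name STRING
>>>>>>>>> ) WITH (
>>>>>>>>>   'connector' = 'filesystem',
>>>>>>>>>   'path' = '/tmp/test.csv',
>>>>>>>>>   'format' = 'csv',
>>>>>>>>>   'csv.field-delimiter' = ',',
>>>>>>>>>   'csv.quote-character' = '"'
>>>>>>>>> );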
>>>>>>>>>
>>>>>>>>> Flink SQL> set table.dml-sync=true;
>>>>>>>>> [INFO] Session property has been set.
>>>>>>>>>
>>>>>>>>> Flink SQL> select * from csv;
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> |                   id |                 name |
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> |                    3 |                    c |
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> Received a total of 1 row
>>>>>>>>>
>>>>>>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>>>>>>> [INFO] Submitting SQL update statement to the cluster...
>>>>>>>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>>>>>>>>> [INFO] Complete execution of the SQL update statement.
>>>>>>>>>
>>>>>>>>> Flink SQL> select * from csv;
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> |                   id |                 name |
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> |                    4 |                    d |
>>>>>>>>> +----------------------+----------------------+
>>>>>>>>> Received a total of 1 row
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Kurt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Till,
>>>>>>>>>> since I was using the same WITH-clause both for reading and
>>>>>>>>>> writing, I discovered that overwrite is actually supported in the
>>>>>>>>>> sinks, while in the sources an exception is thrown (I thought those
>>>>>>>>>> properties were simply ignored).
>>>>>>>>>> However the quote-character is not supported in the sinks: is
>>>>>>>>>> this a bug or is it the intended behaviour?
>>>>>>>>>> Here is a minimal example that reproduces the problem (put
>>>>>>>>>> something like '1,hello' or '2,hi' into /tmp/test.csv).
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>>>>>
>>>>>>>>>> public class FlinkCsvTest {
>>>>>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>>>>>     final EnvironmentSettings envSettings =
>>>>>>>>>>
>>>>>>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>>>>>>     final TableEnvironment tableEnv =
>>>>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>>>>     // ExecutionEnvironment env =
>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     // BatchTableEnvironment tableEnv =
>>>>>>>>>> BatchTableEnvironment.create(env);
>>>>>>>>>>     final String tableInName = "testTableIn";
>>>>>>>>>>     final String createInTableDdl = getSourceDdl(tableInName,
>>>>>>>>>> "/tmp/test.csv"); //
>>>>>>>>>>
>>>>>>>>>>     final String tableOutName = "testTableOut";
>>>>>>>>>>     final String createOutTableDdl = getSinkDdl(tableOutName,
>>>>>>>>>> "/tmp/test-out.csv"); //
>>>>>>>>>>     tableEnv.executeSql(createInTableDdl);
>>>>>>>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>>>>>>>
>>>>>>>>>>     Table tableIn = tableEnv.from(tableInName);
>>>>>>>>>>     Table tableOut = tableEnv.from(tableOutName);
>>>>>>>>>>     tableIn.insertInto(tableOutName);
>>>>>>>>>>     // tableEnv.toDataSet(table, Row.class).print();
>>>>>>>>>>     tableEnv.execute("TEST read/write");
>>>>>>>>>>
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   private static String getSourceDdl(String tableName, String
>>>>>>>>>> filePath) {
>>>>>>>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>>>>>>>         " `id` BIGINT,\n" + //
>>>>>>>>>>         " `name` STRING) WITH (\n" + //
>>>>>>>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>>>>>>>         " 'connector.property-version' = '1',\n" + //
>>>>>>>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>>>>>>>         " 'format.type' = 'csv',\n" + //
>>>>>>>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>>>>>>>  //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT
>>>>>>>>>> SUPPORTED
>>>>>>>>>>         " 'format.property-version' = '1',\n" + //
>>>>>>>>>>         " 'format.quote-character' = '\"',\n" + //
>>>>>>>>>>         " 'format.ignore-first-line' = 'false'" + //
>>>>>>>>>>         ")";
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   private static String getSinkDdl(String tableName, String
>>>>>>>>>> filePath) {
>>>>>>>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>>>>>>>         " `id` BIGINT,\n" + //
>>>>>>>>>>         " `name` STRING) WITH (\n" + //
>>>>>>>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>>>>>>>         " 'connector.property-version' = '1',\n" + //
>>>>>>>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>>>>>>>         " 'format.type' = 'csv',\n" + //
>>>>>>>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>>>>>>>         " 'format.num-files' = '1',\n" + //
>>>>>>>>>>         " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED
>>>>>>>>>> (sinks only)
>>>>>>>>>>         " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>>>>>>>>>>         " 'format.property-version' = '1'\n" + //
>>>>>>>>>>         ")";
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Thanks for the support,
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <
>>>>>>>>>> trohrm...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>
>>>>>>>>>>> I tried to execute the code snippet you have provided and I
>>>>>>>>>>> could not reproduce the problem.
>>>>>>>>>>>
>>>>>>>>>>> Concretely I am running this code:
>>>>>>>>>>>
>>>>>>>>>>> final EnvironmentSettings envSettings =
>>>>>>>>>>> EnvironmentSettings.newInstance()
>>>>>>>>>>>     .useBlinkPlanner()
>>>>>>>>>>>     .inStreamingMode()
>>>>>>>>>>>     .build();
>>>>>>>>>>> final TableEnvironment tableEnv =
>>>>>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>>>>>
>>>>>>>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>>>>>>>
>>>>>>>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>>>>>>>> working example where the problem occurs. Thanks a lot.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Any help here? Moreover, if I use the DataStream API there's no
>>>>>>>>>>>> left/right outer join yet. Are those meant to be added in Flink
>>>>>>>>>>>> 1.13 or 1.14?
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>>>>>>>> following error:
>>>>>>>>>>>>>
>>>>>>>>>>>>> The matching candidates:
>>>>>>>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>>>>>>>> Unsupported property keys:
>>>>>>>>>>>>> format.quote-character
>>>>>>>>>>>>>
>>>>>>>>>>>>> I create the table env using this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> final EnvironmentSettings envSettings =
>>>>>>>>>>>>> EnvironmentSettings.newInstance()//
>>>>>>>>>>>>>         .useBlinkPlanner()//
>>>>>>>>>>>>>         // .inBatchMode()//
>>>>>>>>>>>>>         .inStreamingMode()//
>>>>>>>>>>>>>         .build();
>>>>>>>>>>>>>     final TableEnvironment tableEnv =
>>>>>>>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>>>>>>>
>>>>>>>>>>>>> The error is the same both with inBatchMode
>>>>>>>>>>>>> and inStreamingMode.
>>>>>>>>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>
>>>>>>>>>>>>
