That's absolutely useful. IMHO, joins should also work without
windows/triggers, and left/right outer joins should be easier to express,
in order to really migrate legacy code (see the sketch below). A
reduceGroup equivalent would also help, but that's less urgent.
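
To make the join point concrete, here is a rough Java sketch (with
hypothetical Order/User POJOs) of what a legacy batch join currently
forces on DataStream; the window assigner is arbitrary here but required
by the API:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // orders: DataStream<Order>, users: DataStream<User>, defined elsewhere
    orders.join(users)
        .where(order -> order.userId)      // key selector for the first input
        .equalTo(user -> user.id)          // key selector for the second input
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // mandatory today
        .apply((JoinFunction<Order, User, String>) (o, u) -> o.id + "," + u.name);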
I hope my feedback as a Flink user is useful.

Best,
Flavio

On Fri, Apr 9, 2021 at 12:38 PM Kurt Young <ykt...@gmail.com> wrote:

> Converting from table to DataStream in batch mode is indeed a problem now.
> But I think this will
> be improved soon.
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> In my real CSV I have LONG columns that can contain null values, and in
>> that case I get a parse exception (I would like to avoid reading them as
>> strings). The ',bye' line is just a way to reproduce that in my example
>> (add that line to the input CSV).
>> If I use 'csv.null-literal' = '' it seems to work, but is that a
>> workaround or the right solution?
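>>
>> For reference, a minimal sketch of the DDL I'm testing with (path and
>> column names are placeholders):
>>
>>     CREATE TABLE csvIn (
>>        id BIGINT,
>>        name STRING
>>     ) WITH (
>>        'connector' = 'filesystem',
>>        'path' = '/tmp/test.csv',
>>        'format' = 'csv',
>>        'csv.null-literal' = ''
>>     );
>>
>> With 'csv.null-literal' = '' an empty field in a line like ',bye' is
>> parsed as NULL instead of failing on the BIGINT column.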
>>
>> Another big problem I'm having with the new APIs: if I use
>>     TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>> then I can't convert a table to a DataStream. I would need to use
>>     StreamTableEnvironment tableEnv =
>>         StreamTableEnvironment.create(streamEnv, envSettings);
>> but in that case I can't use inBatchMode.
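>>
>> To make it concrete, this is roughly what I'm juggling between (both
>> snippets are sketches, variable names are mine):
>>
>>     // batch mode works, but there is no way to get a DataStream out
>>     TableEnvironment batchEnv = TableEnvironment.create(
>>         EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
>>
>>     // conversion to DataStream works, but only with streaming settings
>>     StreamExecutionEnvironment streamEnv =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(
>>         streamEnv,
>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());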
>>
>> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> `format.ignore-first-line` is unfortunately a regression compared to
>>> the old connector. I've created a ticket [1] to track this, but given
>>> the current design it doesn't seem easy to fix.
>>>
>>> Regarding null values, I'm not sure if I understand the issue you had.
>>> What do you mean by
>>> using ',bye' to test null Long values?
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> And another thing: in my CSV I added ',bye' (to test null Long values)
>>>> but I got a parse error. If I add 'csv.null-literal' = '' it seems to
>>>> work. Is that the right way to solve this problem?
>>>>
>>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Thanks Kurt, now it works. However, I can't find a way to skip the
>>>>> CSV header: before there was 'format.ignore-first-line', but now I
>>>>> can't find an equivalent option.
>>>>> I could set 'csv.ignore-parse-errors' to 'true', but then I couldn't
>>>>> detect other parsing errors. Otherwise I'd have to manually transform
>>>>> the header into a comment by adding the '#' character at the start of
>>>>> the line (see the sketch below).
>>>>> How can I solve that?
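>>>>>
>>>>> In case it helps, the comment-based workaround I mean looks like this
>>>>> (assuming 'csv.allow-comments' skips lines starting with '#', as
>>>>> documented):
>>>>>
>>>>>     CREATE TABLE csvIn (
>>>>>        id BIGINT,
>>>>>        name STRING
>>>>>     ) WITH (
>>>>>        'connector' = 'filesystem',
>>>>>        'path' = '/tmp/test.csv',
>>>>>        'format' = 'csv',
>>>>>        'csv.allow-comments' = 'true'
>>>>>     );
>>>>>
>>>>> but it requires preprocessing the file so that the header line becomes
>>>>> '#id,name', which is exactly what I'd like to avoid.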
>>>>>
>>>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>>> My DDL is:
>>>>>>
>>>>>> CREATE TABLE csv (
>>>>>>        id BIGINT,
>>>>>>        name STRING
>>>>>> ) WITH (
>>>>>>        'connector' = 'filesystem',
>>>>>>        'path' = '.....',
>>>>>>        'format' = 'csv'
>>>>>> );
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> We would recommend you to use new table source & sink interfaces,
>>>>>>> which have different
>>>>>>> property keys compared to the old ones, e.g. 'connector' v.s.
>>>>>>> 'connector.type'.
>>>>>>>
>>>>>>> You can follow the 1.12 doc [1] to define your csv table, everything
>>>>>>> should work just fine.
>>>>>>>
>>>>>>> Flink SQL> set table.dml-sync=true;
>>>>>>> [INFO] Session property has been set.
>>>>>>>
>>>>>>> Flink SQL> select * from csv;
>>>>>>> +----------------------+----------------------+
>>>>>>> |                   id |                 name |
>>>>>>> +----------------------+----------------------+
>>>>>>> |                    3 |                    c |
>>>>>>> +----------------------+----------------------+
>>>>>>> Received a total of 1 row
>>>>>>>
>>>>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>>>>> [INFO] Submitting SQL update statement to the cluster...
>>>>>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>>>>>>> [INFO] Complete execution of the SQL update statement.
>>>>>>>
>>>>>>> Flink SQL> select * from csv;
>>>>>>> +----------------------+----------------------+
>>>>>>> |                   id |                 name |
>>>>>>> +----------------------+----------------------+
>>>>>>> |                    4 |                    d |
>>>>>>> +----------------------+----------------------+
>>>>>>> Received a total of 1 row
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>> since I was using the same WITH clause both for reading and
>>>>>>>> writing, I discovered that overwrite is actually supported in
>>>>>>>> sinks, while sources throw an exception (I had assumed those
>>>>>>>> properties were simply ignored).
>>>>>>>> However, the quote-character is not supported in sinks: is this a
>>>>>>>> bug or the intended behaviour?
>>>>>>>> Here is a minimal example that reproduces the problem (put
>>>>>>>> something like '1,hello' or '2,hi' in /tmp/test.csv).
>>>>>>>>
>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>>>
>>>>>>>> public class FlinkCsvTest {
>>>>>>>>
>>>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>>>     final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>>>         .useBlinkPlanner()
>>>>>>>>         .inStreamingMode()
>>>>>>>>         .build();
>>>>>>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>
>>>>>>>>     final String tableInName = "testTableIn";
>>>>>>>>     final String tableOutName = "testTableOut";
>>>>>>>>     // the "Unsupported property keys" error is thrown by these DDLs
>>>>>>>>     tableEnv.executeSql(getSourceDdl(tableInName, "/tmp/test.csv"));
>>>>>>>>     tableEnv.executeSql(getSinkDdl(tableOutName, "/tmp/test-out.csv"));
>>>>>>>>
>>>>>>>>     final Table tableIn = tableEnv.from(tableInName);
>>>>>>>>     tableIn.executeInsert(tableOutName).await();
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>>>>>>     return "CREATE TABLE " + tableName + " (\n" //
>>>>>>>>         + " `id` BIGINT,\n" //
>>>>>>>>         + " `name` STRING) WITH (\n" //
>>>>>>>>         + " 'connector.type' = 'filesystem',\n" //
>>>>>>>>         + " 'connector.property-version' = '1',\n" //
>>>>>>>>         + " 'connector.path' = '" + filePath + "',\n" //
>>>>>>>>         + " 'format.type' = 'csv',\n" //
>>>>>>>>         + " 'format.field-delimiter' = ',',\n" //
>>>>>>>>         // + " 'format.write-mode' = 'OVERWRITE',\n" // NOT SUPPORTED in sources
>>>>>>>>         + " 'format.property-version' = '1',\n" //
>>>>>>>>         + " 'format.quote-character' = '\"',\n" //
>>>>>>>>         + " 'format.ignore-first-line' = 'false'\n" //
>>>>>>>>         + ")";
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>>>>>>     return "CREATE TABLE " + tableName + " (\n" //
>>>>>>>>         + " `id` BIGINT,\n" //
>>>>>>>>         + " `name` STRING) WITH (\n" //
>>>>>>>>         + " 'connector.type' = 'filesystem',\n" //
>>>>>>>>         + " 'connector.property-version' = '1',\n" //
>>>>>>>>         + " 'connector.path' = '" + filePath + "',\n" //
>>>>>>>>         + " 'format.type' = 'csv',\n" //
>>>>>>>>         + " 'format.field-delimiter' = ',',\n" //
>>>>>>>>         + " 'format.num-files' = '1',\n" //
>>>>>>>>         + " 'format.write-mode' = 'OVERWRITE',\n" // SUPPORTED (sinks only)
>>>>>>>>         + " 'format.quote-character' = '\"',\n" // NOT SUPPORTED in sinks
>>>>>>>>         + " 'format.property-version' = '1'\n" //
>>>>>>>>         + ")";
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Thanks for the support,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Flavio,
>>>>>>>>>
>>>>>>>>> I tried to execute the code snippet you have provided and I could
>>>>>>>>> not reproduce the problem.
>>>>>>>>>
>>>>>>>>> Concretely I am running this code:
>>>>>>>>>
>>>>>>>>> final EnvironmentSettings envSettings =
>>>>>>>>> EnvironmentSettings.newInstance()
>>>>>>>>>     .useBlinkPlanner()
>>>>>>>>>     .inStreamingMode()
>>>>>>>>>     .build();
>>>>>>>>> final TableEnvironment tableEnv =
>>>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>>>
>>>>>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>>>>>
>>>>>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>>>>>> working example where the problem occurs. Thanks a lot.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Any help here? Moreover, if I use the DataStream API there is no
>>>>>>>>>> left/right outer join yet. Are those meant to be added in Flink
>>>>>>>>>> 1.13 or 1.14?
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>>>>>> following error:
>>>>>>>>>>>
>>>>>>>>>>> The matching candidates:
>>>>>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>>>>>> Unsupported property keys:
>>>>>>>>>>> format.quote-character
>>>>>>>>>>>
>>>>>>>>>>> I create the table env using this:
>>>>>>>>>>>
>>>>>>>>>>> final EnvironmentSettings envSettings =
>>>>>>>>>>> EnvironmentSettings.newInstance()//
>>>>>>>>>>>         .useBlinkPlanner()//
>>>>>>>>>>>         // .inBatchMode()//
>>>>>>>>>>>         .inStreamingMode()//
>>>>>>>>>>>         .build();
>>>>>>>>>>>     final TableEnvironment tableEnv =
>>>>>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>>>>>
>>>>>>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>
