I've verified your code locally and it doesn't work indeed, at least not
with the latest Flink version (I've tested it with Flink 1.15). There are a
couple of reasons for that:

1. You've mentioned in this thread that there's no problem with the
'csv.field-delimiter'. There is actually, because the default is a , and
not a ; as documented at
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
2. When adding this option, Flink wouldn't compile because the SQL
statement uses options that are different then documented at
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/.
You have connector.type, connector.path and format.type listed. It should
be connector, path and format.

In the end, I used the following code and the expect result was properly
written:

        tEnv.executeSql(
                "CREATE TABLE Table1 (column_name1 STRING, column_name2
DOUBLE) WITH ('connector' = 'filesystem', 'path' =
'file:///C:/temp/test4.txt', 'format' = 'csv', 'csv.field-delimiter' =
';')");

Best regards,

Martijn

Op di 2 aug. 2022 om 00:42 schreef <pod...@gmx.com>:

> No, I do not have it
>
>
> *Sent:* Monday, August 01, 2022 at 4:43 PM
> *From:* "Martijn Visser" <martijnvis...@apache.org>
> *To:* pod...@gmx.com
> *Cc:* user@flink.apache.org
> *Subject:* Re: Why this example does not save anything to file?
> That's Flink fault-tolerance mechanism, see
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
>
> Op ma 1 aug. 2022 om 16:37 schreef <pod...@gmx.com>:
>
>> What's that?
>>
>>
>> *Sent:* Monday, August 01, 2022 at 2:49 PM
>> *From:* "Martijn Visser" <martijnvis...@apache.org>
>> *To:* pod...@gmx.com
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Why this example does not save anything to file?
>> Do you have checkpointing enabled?
>>
>> Op za 30 jul. 2022 om 17:31 schreef <pod...@gmx.com>:
>>
>>> Thanks David but there's no problem with that (probably ";" is default
>>> separator).
>>> I can read the file and insert into "Table1" (I said that in my mail).
>>> Problem is to save to CSV.
>>>
>>>
>>> *Sent:* Saturday, July 30, 2022 at 3:33 PM
>>> *From:* "David Anderson" <dander...@apache.org>
>>> *To:* pod...@gmx.com
>>> *Cc:* "user" <user@flink.apache.org>
>>> *Subject:* Re: Why this example does not save anything to file?
>>> You need to add
>>>
>>> 'csv.field-delimiter'=';'
>>>
>>> to the definition of Table1 so that the input from test4.txt can be
>>> correctly parsed:
>>>
>>>         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING,
>>> column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
>>> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv',
>>> 'csv.field-delimiter'=';')");
>>>
>>> Cheers,
>>> David
>>>
>>> On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> you mean adding:
>>>>
>>>> " 'csv.field-delimiter'=';', "
>>>>
>>>> like:
>>>>
>>>>         tEnv.executeSql("CREATE TABLE fs_table ("
>>>>                 + "    column_nameA STRING, "
>>>>                 + "    column_nameB DOUBLE "
>>>>                 + "    ) WITH ( "
>>>>                 + "    'connector'='filesystem', "
>>>>                 + "    'path'='file:///C:/temp/test5.txt', "
>>>>                 + "    'format'='csv', "
>>>>                 + " 'csv.field-delimiter'=';', "
>>>>                 + " 'sink.partition-commit.delay'='1 s', "
>>>>                 + " 'sink.partition-commit.policy.kind'='success-file'"
>>>>                 + "    )");
>>>>
>>>>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1,
>>>> column_name2 from Table1");
>>>>
>>>> I did. Nothing new - still does not work.
>>>>
>>>>
>>>>
>>>> *Sent:* Tuesday, July 26, 2022 at 4:00 PM
>>>> *From:* "Gil De Grove" <gil.degr...@euranova.eu>
>>>> *To:* "Weihua Hu" <huweihua....@gmail.com>
>>>> *Cc:* pod...@gmx.com, "user" <user@flink.apache.org>
>>>> *Subject:* Re: Why this example does not save anything to file?
>>>> Hello,
>>>>
>>>> I may be really wrong with this, but from what I get in the source
>>>> file, you are using a semi-column to separate the value.
>>>> This probably means that you should set the csv.field-delimiter to `;`
>>>> to make your example work properly.
>>>>
>>>> Have you tried with that configuration in your create table csv
>>>> connector option?
>>>>
>>>> Regards,
>>>> Gil
>>>>
>>>> On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Can you see any exception logs?
>>>>> Where is this code running? is it a standalone cluster with one
>>>>> TaskManager?
>>>>>
>>>>>
>>>>> Best,
>>>>> Weihua
>>>>>
>>>>> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
>>>>>
>>>>>> If I get it correctly this is the way how I can save to CSV:
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>>>>>>
>>>>>> So my code is (read from file, save to file):
>>>>>>
>>>>>>
>>>>>> *package flinkCSV;*
>>>>>>
>>>>>> *import org.apache.flink.table.api.EnvironmentSettings; import
>>>>>> org.apache.flink.table.api.TableEnvironment;*
>>>>>> *public class flinkCSV {*
>>>>>> *    public static void main(String[] args) throws Exception {*
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *                 //register and create table
>>>>>>  EnvironmentSettings settings = EnvironmentSettings
>>>>>> .newInstance()                 //.inStreamingMode()
>>>>>> .inBatchMode()                 .build();*
>>>>>> *        final TableEnvironment tEnv =
>>>>>> TableEnvironment.create(settings);*
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *                 tEnv.executeSql("CREATE TABLE Table1 (column_name1
>>>>>> STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
>>>>>> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>>>>>>                  tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM
>>>>>> Table1")         .execute()         .print();         *
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *        tEnv.executeSql("CREATE TABLE fs_table ("                 +
>>>>>> "    column_nameA STRING, "                 + "    column_nameB DOUBLE "
>>>>>>              + "    ) WITH ( \n"                 + "
>>>>>>  'connector'='filesystem', "                 + "
>>>>>>  'path'='file:///C:/temp/test5.txt', "                 + "
>>>>>>  'format'='csv', "                 + "  'sink.partition-commit.delay'='1
>>>>>> s', "                 + "
>>>>>> 'sink.partition-commit.policy.kind'='success-file'"                 + "
>>>>>>  )");                  tEnv.executeSql("INSERT INTO fs_table SELECT
>>>>>> column_name1, column_name2 from Table1");
>>>>>> tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>>>>>> .execute()         .print();               } }*
>>>>>>
>>>>>> Source file (test4.txt) is:
>>>>>>
>>>>>> aa; 23
>>>>>> bb; 657.9
>>>>>> cc; 55
>>>>>>
>>>>>> test5.txt is not created, select from fs_table gives null
>>>>>>
>>>>>>
>>>>>

Reply via email to