I've tested your code locally and it indeed doesn't work, at least not with the latest Flink version (I tested with Flink 1.15). There are a couple of reasons for that:

1. You've mentioned in this thread that there's no problem with the
   'csv.field-delimiter'. Actually there is, because the default delimiter
   is a comma (,) and not a semicolon (;), as documented at
   https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
2. After adding this option, Flink still wouldn't compile the statement,
   because it uses option keys that differ from those documented at
   https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/.
   You have connector.type, connector.path and format.type listed. It should
   be connector, path and format.

In the end, I used the following code and the expected result was properly written:

tEnv.executeSql(
    "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector' = 'filesystem', 'path' = 'file:///C:/temp/test4.txt', 'format' = 'csv', 'csv.field-delimiter' = ';')");

Best regards,

Martijn

On Tue, 2 Aug 2022 at 00:42, <pod...@gmx.com> wrote:

> No, I do not have it
>
>
> *Sent:* Monday, August 01, 2022 at 4:43 PM
> *From:* "Martijn Visser" <martijnvis...@apache.org>
> *To:* pod...@gmx.com
> *Cc:* user@flink.apache.org
> *Subject:* Re: Why this example does not save anything to file?
> That's Flink fault-tolerance mechanism, see
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
>
> On Mon, 1 Aug 2022 at 16:37, <pod...@gmx.com> wrote:
>
>> What's that?
>>
>>
>> *Sent:* Monday, August 01, 2022 at 2:49 PM
>> *From:* "Martijn Visser" <martijnvis...@apache.org>
>> *To:* pod...@gmx.com
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Why this example does not save anything to file?
>> Do you have checkpointing enabled?
>>
>> On Sat, 30 Jul 2022 at 17:31, <pod...@gmx.com> wrote:
>>
>>> Thanks David but there's no problem with that (probably ";" is default
>>> separator).
>>> I can read the file and insert into "Table1" (I said that in my mail).
>>> Problem is to save to CSV.
>>>
>>>
>>> *Sent:* Saturday, July 30, 2022 at 3:33 PM
>>> *From:* "David Anderson" <dander...@apache.org>
>>> *To:* pod...@gmx.com
>>> *Cc:* "user" <user@flink.apache.org>
>>> *Subject:* Re: Why this example does not save anything to file?
>>> You need to add
>>>
>>> 'csv.field-delimiter'=';'
>>>
>>> to the definition of Table1 so that the input from test4.txt can be
>>> correctly parsed:
>>>
>>> tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')");
>>>
>>> Cheers,
>>> David
>>>
>>> On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> you mean adding:
>>>>
>>>> " 'csv.field-delimiter'=';', "
>>>>
>>>> like:
>>>>
>>>> tEnv.executeSql("CREATE TABLE fs_table ("
>>>>         + " column_nameA STRING, "
>>>>         + " column_nameB DOUBLE "
>>>>         + " ) WITH ( "
>>>>         + " 'connector'='filesystem', "
>>>>         + " 'path'='file:///C:/temp/test5.txt', "
>>>>         + " 'format'='csv', "
>>>>         + " 'csv.field-delimiter'=';', "
>>>>         + " 'sink.partition-commit.delay'='1 s', "
>>>>         + " 'sink.partition-commit.policy.kind'='success-file'"
>>>>         + " )");
>>>>
>>>> tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
>>>>
>>>> I did. Nothing new - still does not work.
>>>>
>>>>
>>>>
>>>> *Sent:* Tuesday, July 26, 2022 at 4:00 PM
>>>> *From:* "Gil De Grove" <gil.degr...@euranova.eu>
>>>> *To:* "Weihua Hu" <huweihua....@gmail.com>
>>>> *Cc:* pod...@gmx.com, "user" <user@flink.apache.org>
>>>> *Subject:* Re: Why this example does not save anything to file?
>>>> Hello,
>>>>
>>>> I may be really wrong with this, but from what I get in the source
>>>> file, you are using a semicolon to separate the values.
>>>> This probably means that you should set the csv.field-delimiter to `;`
>>>> to make your example work properly.
>>>>
>>>> Have you tried with that configuration in your create table csv
>>>> connector option?
>>>>
>>>> Regards,
>>>> Gil
>>>>
>>>> On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Can you see any exception logs?
>>>>> Where is this code running? is it a standalone cluster with one
>>>>> TaskManager?
>>>>>
>>>>>
>>>>> Best,
>>>>> Weihua
>>>>>
>>>>> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
>>>>>
>>>>>> If I get it correctly this is the way how I can save to CSV:
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>>>>>>
>>>>>> So my code is (read from file, save to file):
>>>>>>
>>>>>> package flinkCSV;
>>>>>>
>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>
>>>>>> public class flinkCSV {
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         //register and create table
>>>>>>         EnvironmentSettings settings = EnvironmentSettings
>>>>>>                 .newInstance()
>>>>>>                 //.inStreamingMode()
>>>>>>                 .inBatchMode()
>>>>>>                 .build();
>>>>>>
>>>>>>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>>>>>>
>>>>>>         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>>>>>>
>>>>>>         tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
>>>>>>                 .execute()
>>>>>>                 .print();
>>>>>>
>>>>>>         tEnv.executeSql("CREATE TABLE fs_table ("
>>>>>>                 + " column_nameA STRING, "
>>>>>>                 + " column_nameB DOUBLE "
>>>>>>                 + " ) WITH ( \n"
>>>>>>                 + " 'connector'='filesystem', "
>>>>>>                 + " 'path'='file:///C:/temp/test5.txt', "
>>>>>>                 + " 'format'='csv', "
>>>>>>                 + " 'sink.partition-commit.delay'='1 s', "
>>>>>>                 + " 'sink.partition-commit.policy.kind'='success-file'"
>>>>>>                 + " )");
>>>>>>
>>>>>>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
>>>>>>
>>>>>>         tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>>>>>>                 .execute()
>>>>>>                 .print();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Source file (test4.txt) is:
>>>>>>
>>>>>> aa; 23
>>>>>> bb; 657.9
>>>>>> cc; 55
>>>>>>
>>>>>> test5.txt is not created, select from fs_table gives null
>>>>>>
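
A footnote to the reply at the top of this message: the option keys named there (connector, path, format, csv.field-delimiter) could be kept in one small helper so that every table definition uses the same spelling. The sketch below is only an illustration under that assumption. CsvTableDdl and build are hypothetical names, not Flink APIs, and the helper only assembles the SQL string; it does not talk to Flink itself.

```java
// Hypothetical helper (not part of Flink): it only builds the DDL text.
final class CsvTableDdl {

    private CsvTableDdl() {}

    /**
     * Returns a CREATE TABLE statement for a two-column CSV file table,
     * using the option keys connector, path, format and csv.field-delimiter.
     */
    static String build(String tableName, String path, char delimiter) {
        return "CREATE TABLE " + tableName
                + " (column_name1 STRING, column_name2 DOUBLE) WITH ("
                + "'connector' = 'filesystem', "
                + "'path' = '" + path + "', "
                + "'format' = 'csv', "
                + "'csv.field-delimiter' = '" + delimiter + "')";
    }

    public static void main(String[] args) {
        // Print the statement for inspection; in a job it would be handed
        // to tEnv.executeSql(...) as in the reply above.
        System.out.println(build("Table1", "file:///C:/temp/test4.txt", ';'));
    }
}
```

With the table name, path and delimiter from this thread, the helper yields the same statement as the one quoted in the reply, which can then be passed to tEnv.executeSql(...).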