Hi,

Can you see any exception logs? Where is this code running? Is it a standalone cluster with one TaskManager?
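In the meantime, a few things in the quoted code might be worth checking. These are only guesses until the logs are available. The sample rows are separated by ';', while the csv format defaults to ','. The first table uses the older 'connector.type' style keys, while the second uses the newer 'connector' keys. executeSql("INSERT ...") submits the job asynchronously, so the following SELECT can run before anything has been written. Also, as far as I know, the filesystem sink treats 'path' as a directory and writes part files under it, so a single file named test5.txt may never appear as such. Below is a minimal sketch of the two DDL strings with those assumptions applied. The class and method names (CsvDdlSketch, sourceDdl, sinkDdl) are made up for illustration, and the class only builds the strings so it runs without Flink on the classpath.

```java
// Sketch only: builds the two DDL strings with the newer option keys
// ('connector', 'path', 'format') and an explicit CSV field delimiter.
// CsvDdlSketch, sourceDdl and sinkDdl are hypothetical names.
final class CsvDdlSketch {

    // DDL for the source table; the path is passed in by the caller.
    static String sourceDdl(String path) {
        return "CREATE TABLE Table1 ("
             + " column_name1 STRING,"
             + " column_name2 DOUBLE"
             + ") WITH ("
             + " 'connector' = 'filesystem',"
             + " 'path' = '" + path + "',"
             + " 'format' = 'csv',"
             // Assumption: the input really is ';'-separated, as in the sample rows.
             + " 'csv.field-delimiter' = ';'"
             + ")";
    }

    // DDL for the sink table; 'path' is expected to end up as a directory.
    static String sinkDdl(String path) {
        return "CREATE TABLE fs_table ("
             + " column_nameA STRING,"
             + " column_nameB DOUBLE"
             + ") WITH ("
             + " 'connector' = 'filesystem',"
             + " 'path' = '" + path + "',"
             + " 'format' = 'csv'"
             + ")";
    }

    public static void main(String[] args) {
        System.out.println(sourceDdl("file:///C:/temp/test4.txt"));
        System.out.println(sinkDdl("file:///C:/temp/test5.txt"));
        // With Flink available, these strings would be used roughly as:
        //   tEnv.executeSql(sourceDdl(...));
        //   tEnv.executeSql(sinkDdl(...));
        //   tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 FROM Table1")
        //       .await();   // TableResult.await() blocks until the INSERT job finishes
    }
}
```

If the SELECT on fs_table still comes back empty after adding await(), the exception logs should show whether the rows failed to parse.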
Best,
Weihua

On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:

> If I get it correctly this is the way how I can save to CSV:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>
> So my code is (read from file, save to file):
>
> package flinkCSV;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class flinkCSV {
>     public static void main(String[] args) throws Exception {
>
>         //register and create table
>         EnvironmentSettings settings = EnvironmentSettings
>                 .newInstance()
>                 //.inStreamingMode()
>                 .inBatchMode()
>                 .build();
>
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
>                 .execute()
>                 .print();
>
>         tEnv.executeSql("CREATE TABLE fs_table (" +
>                 " column_nameA STRING, " +
>                 " column_nameB DOUBLE " +
>                 " ) WITH ( \n" +
>                 " 'connector'='filesystem', " +
>                 " 'path'='file:///C:/temp/test5.txt', " +
>                 " 'format'='csv', " +
>                 " 'sink.partition-commit.delay'='1 s', " +
>                 " 'sink.partition-commit.policy.kind'='success-file'" +
>                 " )");
>
>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>                 .execute()
>                 .print();
>     }
> }
>
> Source file (test4.txt) is:
>
> aa; 23
> bb; 657.9
> cc; 55
>
> test5.txt is not created, select from fs_table gives null