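Hi Asahi,

This is a known bug [1]: the filesystem connector has a problem handling computed columns. There is already a PR for it, and the fix will be included in 1.11.2 and 1.12.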
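To make the failure concrete, here is a minimal plain-Java sketch of the arity check that the stack trace points at. It does not use Flink at all: the `Column` class and the helper names are hypothetical, and the assumption that the deserializer counted the computed column `pt` as a physical field is a reading of the error message, not something taken from the Flink source.

```java
import java.util.Arrays;
import java.util.List;

public class ComputedColumnAritySketch {

    /** Hypothetical column model: a name plus a flag for computed columns such as "pt AS PROCTIME()". */
    static final class Column {
        final String name;
        final boolean computed;

        Column(String name, boolean computed) {
            this.name = name;
            this.computed = computed;
        }
    }

    /** Columns of fs_table as declared in the DDL from the report. */
    static List<Column> fsTableColumns() {
        return Arrays.asList(
                new Column("user_id", false),
                new Column("order_amount", false),
                new Column("dt", false),
                new Column("pt", true)); // computed, never stored in the CSV file
    }

    /** Number of fields a row in the file should contain once computed columns are excluded. */
    static int physicalFieldCount(List<Column> columns) {
        int count = 0;
        for (Column column : columns) {
            if (!column.computed) {
                count++;
            }
        }
        return count;
    }

    /** Same shape as the failing check; the message text is copied from the reported error. */
    static void validateArity(int expected, int actual) {
        if (expected != actual) {
            throw new RuntimeException(
                    "Row length mismatch. " + expected + " fields expected but was " + actual + ".");
        }
    }

    public static void main(String[] args) {
        List<Column> columns = fsTableColumns();
        int actual = "zhangsan,12.34,2020-08-03 12:23:50".split(",").length;

        // Excluding the computed column: 3 expected, 3 found, no exception.
        validateArity(physicalFieldCount(columns), actual);

        // Counting every declared column (the assumed buggy behaviour): 4 expected, 3 found.
        try {
            validateArity(columns.size(), actual);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `Row length mismatch. 4 fields expected but was 3.`, the same message as in the report. Until a fixed release is available, one possible workaround (untested here) is to drop `pt AS PROCTIME()` from the DDL and call `PROCTIME()` in the query instead.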
Best,
Leonard Xu

[1] https://issues.apache.org/jira/browse/FLINK-18665

> On Jul 23, 2020, at 00:07, Asahi Lee <[email protected]> wrote:
>
> 1. Program
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
> String sourceTableDDL = "CREATE TABLE fs_table (" +
>         " user_id STRING," +
>         " order_amount DOUBLE," +
>         " dt TIMESTAMP(3)," +
>         " pt AS PROCTIME() " +
>         " ) WITH (" +
>         " 'connector'='filesystem'," +
>         " 'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
>         " 'format'='csv'" +
>         " )";
>
> bsTableEnv.executeSql(sourceTableDDL);
> bsTableEnv.executeSql("select * from fs_table").print();
>
> 2. CSV file
> order.csv
> zhangsan,12.34,2020-08-03 12:23:50
> lisi,234.67,2020-08-03 12:25:50
> wangwu,57.6,2020-08-03 12:25:50
> zhaoliu,345,2020-08-03 12:28:50
>
> 3. Error
> - Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
> java.io.IOException: Failed to deserialize CSV row.
>     at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
>     at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
>     at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
>     at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
>     at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
>     ... 5 more
