Hi, Asahi

This is a known bug [1]: the filesystem connector has a problem handling computed columns. A PR is already open, and the fix will land in 1.11.2 and 1.12.
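Until the fix ships, one possible workaround (an untested sketch, not the official fix) is to declare only the three physical columns in the DDL, so the CSV format's arity matches the file, and attach the processing-time attribute in a view instead of a computed column:

```java
// Untested workaround sketch: keep only the 3 physical columns in the DDL
// so the CSV deserializer expects exactly 3 fields, then add the proctime
// attribute via a temporary view rather than a computed column.
String sourceTableDDL = "CREATE TABLE fs_table_raw (" +
        "  user_id STRING," +
        "  order_amount DOUBLE," +
        "  dt TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector'='filesystem'," +
        "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
        "  'format'='csv'" +
        ")";
bsTableEnv.executeSql(sourceTableDDL);

// PROCTIME() used in a query (Blink planner) instead of a DDL computed column.
bsTableEnv.executeSql(
        "CREATE TEMPORARY VIEW fs_table AS " +
        "SELECT user_id, order_amount, dt, PROCTIME() AS pt FROM fs_table_raw");
bsTableEnv.executeSql("select * from fs_table").print();
```

The table and view names (`fs_table_raw`, `fs_table`) are my choice; the point is only that the CSV format never sees the computed column.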


Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18665 

> On 23 Jul 2020, at 00:07, Asahi Lee <[email protected]> wrote:
> 
> 1. Program
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
> 
> String sourceTableDDL = "CREATE TABLE fs_table (" +
>         "  user_id STRING," +
>         "  order_amount DOUBLE," +
>         "  dt TIMESTAMP(3)," +
>         "  pt AS PROCTIME() " +
>         " ) WITH (" +
>         "  'connector'='filesystem'," +
>         "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
>         "  'format'='csv'" +
>         " )";
> 
> bsTableEnv.executeSql(sourceTableDDL);
> bsTableEnv.executeSql("select * from fs_table").print();
> 2. CSV file
> order.csv
> zhangsan,12.34,2020-08-03 12:23:50
> lisi,234.67,2020-08-03 12:25:50
> wangwu,57.6,2020-08-03 12:25:50
> zhaoliu,345,2020-08-03 12:28:50
> 
> 
> 
> 3. Error
>  - Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
> java.io.IOException: Failed to deserialize CSV row.
> 	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
> 	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
> 	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
> 	at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
> 	at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
> 	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
> 	... 5 more
