????filesystem????csv??????????????????????????????????????????????????????????????????????????????????????
????filesystem????????????????




------------------ Original Message ------------------
From: "user-zh" <[email protected]>;
Sent: July 23, 2020 (Thursday) 9:55
To: "user-zh" <[email protected]>;

Subject: Re: flink 1.11 ddl sql: adding a PROCTIME() column, error reading csv



Hi, Asahi

This is a known bug [1]: the filesystem connector has a problem handling
computed columns. There is already a PR, and it will be fixed in 1.11.2 and 1.12.


Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18665
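
The arity error in the quoted trace ("Row length mismatch. 4 fields expected but was 3.") can be illustrated with a small standalone sketch. This is not Flink's code: the class `ComputedColumnArityDemo`, the `Column` type, and the helper methods are hypothetical names made up for illustration, and the check is a simplified stand-in for what a CSV deserializer does. It assumes the reader was handed the full declared schema (including the computed column `pt`) instead of only the columns physically present in the file.

```java
import java.util.ArrayList;
import java.util.List;

public class ComputedColumnArityDemo {

    /** A column of the declared table schema; computed columns have no backing field in the file. */
    public static final class Column {
        public final String name;
        public final boolean computed;

        public Column(String name, boolean computed) {
            this.name = name;
            this.computed = computed;
        }
    }

    /** The schema from the DDL in this thread: three physical columns plus pt AS PROCTIME(). */
    public static List<Column> declaredSchema() {
        List<Column> columns = new ArrayList<>();
        columns.add(new Column("user_id", false));
        columns.add(new Column("order_amount", false));
        columns.add(new Column("dt", false));
        columns.add(new Column("pt", true));
        return columns;
    }

    /** Keeps only the columns that are actually stored in the CSV file. */
    public static List<Column> physicalColumns(List<Column> schema) {
        List<Column> physical = new ArrayList<>();
        for (Column c : schema) {
            if (!c.computed) {
                physical.add(c);
            }
        }
        return physical;
    }

    /** Simplified arity check: throws if the row does not have the expected number of fields. */
    public static void validateArity(int expected, String csvLine) {
        int actual = csvLine.split(",", -1).length;
        if (actual != expected) {
            throw new RuntimeException(
                "Row length mismatch. " + expected + " fields expected but was " + actual + ".");
        }
    }

    public static void main(String[] args) {
        String row = "zhangsan,12.34,2020-08-03 12:23:50";
        List<Column> schema = declaredSchema();

        // Checking against the full schema (computed column included) fails for a 3-field row.
        try {
            validateArity(schema.size(), row);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }

        // Checking against the physical columns only, the same row passes.
        validateArity(physicalColumns(schema).size(), row);
        System.out.println("ok with physical columns only");
    }
}
```

In this sketch the fix amounts to excluding computed columns before the format sees the schema; whether the actual patch for FLINK-18665 takes exactly this shape is not stated in this thread.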

> On July 23, 2020, at 00:07, Asahi Lee <[email protected]> wrote:
>
> 1. Code
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
> String sourceTableDDL = "CREATE TABLE fs_table (" +
>         "  user_id STRING," +
>         "  order_amount DOUBLE," +
>         "  dt TIMESTAMP(3)," +
>         "  pt AS PROCTIME() " +
>         " ) WITH (" +
>         "  'connector'='filesystem'," +
>         "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
>         "  'format'='csv'" +
>         " )";
>
> bsTableEnv.executeSql(sourceTableDDL);
> bsTableEnv.executeSql("select * from fs_table").print();
> 2. csv file
> order.csv
> zhangsan,12.34,2020-08-03 12:23:50
> lisi,234.67,2020-08-03 12:25:50
> wangwu,57.6,2020-08-03 12:25:50
> zhaoliu,345,2020-08-03 12:28:50
>
> 3. Error
>  - Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
> java.io.IOException: Failed to deserialize CSV row.
>    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
>    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
>    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
>    at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
>    at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
>    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
>    ... 5 more
