Re: Flink 1.11 DDL SQL: error reading CSV after adding a PROCTIME() column

2020-07-23 Posted by godfrey he
With the Hive integration, streaming reads from the filesystem are supported; see [1].

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_streaming.html#streaming-reading
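
For reference, a minimal sketch of what such a streaming read can look like. This assumes a registered HiveCatalog and a partitioned Hive table named mydb.orders (both names are illustrative); the option keys are taken from the linked page:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Dynamic table options (SQL hints) are off by default; the streaming
    // read below is switched on through such a hint, so enable them first.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner().inStreamingMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);
    tEnv.getConfig().getConfiguration()
            .setBoolean("table.dynamic-table-options.enabled", true);

    // Monitor mydb.orders for new partitions/files every minute and emit
    // them as they appear, instead of reading once and terminating.
    tEnv.executeSql(
            "SELECT * FROM mydb.orders " +
            "/*+ OPTIONS('streaming-source.enable'='true', " +
            "'streaming-source.monitor-interval'='1 min') */"
    ).print();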

Leonard Xu wrote on Thu, Jul 23, 2020 at 10:28 PM:

> Hi,
>
> The filesystem connector supports streaming writes, but streaming reads
> are not yet supported, which is why the job stops once the file has been
> read through. Judging from the docs [1], streaming-read support is planned.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html
>
>
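
If continuous reading is needed before that lands, a possible stopgap is the DataStream file-monitoring source rather than the SQL filesystem connector. A sketch, with an illustrative path, and with the caveat that a modified file is re-read from the beginning:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    String path = "/data/csv/order.csv";  // illustrative path
    // PROCESS_CONTINUOUSLY re-scans the path every 10 seconds; a modified
    // file is re-processed from the beginning, not tailed from an offset.
    DataStream<String> lines = env.readFile(
            new TextInputFormat(new Path(path)), path,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

    lines.print();
    env.execute("csv-monitor");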
> > On Jul 23, 2020, at 22:05, Asahi Lee <978466...@qq.com> wrote:
> >
> > I'm reading a CSV file as a source with the filesystem connector in a
> > streaming environment. Why does my program stop as soon as it runs,
> > instead of waiting for appended writes to the file and continuing to
> > compute? Or is the filesystem connector only usable for batch jobs?
> >
> >
> >
> >
> > ------ Original message ------
> > From: "user-zh" <xbjt...@gmail.com>
> > Sent: Thursday, Jul 23, 2020, 9:55 AM
> > To: "user-zh" <user-zh@flink.apache.org>
> > Subject: Re: Flink 1.11 DDL SQL: error reading CSV after adding a PROCTIME() column
> >
> >
> >
> > Hi, Asahi
> >
> > This is a known bug [1]: the filesystem connector has a problem handling
> > computed columns. There is already a PR, and the fix will land in 1.11.2
> > and 1.12.
> >
> >
> > Best
> > Leonard Xu
> > [1] https://issues.apache.org/jira/browse/FLINK-18665
> >
> >  On Jul 23, 2020, at 00:07, Asahi Lee <978466...@qq.com> wrote:
> > 
> >  1. Program
> >
> >  StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> >  EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >  StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
> >
> >  String sourceTableDDL = "CREATE TABLE fs_table (" +
> >          "  user_id STRING," +
> >          "  order_amount DOUBLE," +
> >          "  dt TIMESTAMP(3)," +
> >          "  pt AS PROCTIME() " +
> >          " ) WITH (" +
> >          "  'connector'='filesystem'," +
> >          "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
> >          "  'format'='csv'" +
> >          " )";
> >
> >  bsTableEnv.executeSql(sourceTableDDL);
> >  bsTableEnv.executeSql("select * from fs_table").print();
> >  2. CSV file
> >  order.csv
> >  zhangsan,12.34,2020-08-03 12:23:50
> >  lisi,234.67,2020-08-03 12:25:50
> >  wangwu,57.6,2020-08-03 12:25:50
> >  zhaoliu,345,2020-08-03 12:28:50
> > 
> > 
> > 
> >  3. Error
> >  - Source: FileSystemTableSource(user_id, order_amount, dt, pt) ->
> >  Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt])
> >  -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched
> >  from RUNNING to FAILED.
> >  java.io.IOException: Failed to deserialize CSV row.
> >   at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
> >   at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
> >   at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
> >   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> >   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> >   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> >  Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
> >   at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
> >   at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
> >   at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
> >   ... 5 more
>
>
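
Until the FLINK-18665 fix ships in 1.11.2, one possible workaround (a sketch of my own, not taken from the PR) is to declare only the three physical CSV columns, so the arity check matches the file, and materialize processing time in the query instead:

    // Declare only the physical columns; 3 fields now match the CSV rows.
    String sourceTableDDL = "CREATE TABLE fs_table (" +
            "  user_id STRING," +
            "  order_amount DOUBLE," +
            "  dt TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector'='filesystem'," +
            "  'path'='/data/csv/order.csv'," +  // illustrative path
            "  'format'='csv'" +
            ")";
    bsTableEnv.executeSql(sourceTableDDL);

    // PROCTIME() is a built-in function and can be called in the query
    // itself, sidestepping the computed-column path in the connector.
    bsTableEnv.executeSql(
            "SELECT user_id, order_amount, dt, PROCTIME() AS pt FROM fs_table"
    ).print();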

