Re: Flink 1.11 Table API cannot process Avro

2020-07-12 Thread
It seems that you don't add additional dependencies.


org.apache.avro
avro
1.8.2


Lian Jiang  于2020年7月12日周日 下午1:08写道:

> i am using flink playground as the base:
>
> https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/pom.xml
>
> I observed "PhysicalLegacyTableSourceScan". Not sure whether this is
> related. Thanks. Regards!
>
> On Sat, Jul 11, 2020 at 3:43 PM Lian Jiang  wrote:
>
>> Thanks Jörn!
>>
>> I added the documented dependency in my pom.xml file:
>>
>> 
>>   org.apache.flink
>>   flink-avro
>>   1.11.0
>>
>> The newly generated jar does have:
>>
>> $ jar tf target//spend-report-1.0.0.jar | grep FileSystemFormatFactory
>> org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.class
>>
>> org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory$ParquetInputFormat.class
>>
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.class
>>
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory.class
>>
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroInputFormat.class
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory.class
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$1.class
>>
>> But still got the same error.  Anything else is missing? Thanks. Regards!
>>
>>
>> More detailed exception:
>> jobmanager_1  | Caused by:
>> org.apache.flink.client.program.ProgramInvocationException: The main method
>> caused an error: Could not find any factory for identifier 'avro' that
>> implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in
>> the classpath.
>> jobmanager_1  |
>> jobmanager_1  | Available factory identifiers are:
>> jobmanager_1  |
>> jobmanager_1  | csv
>> jobmanager_1  | json
>> jobmanager_1  | parquet
>> jobmanager_1  | at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | ... 10 more
>> jobmanager_1  | Caused by:
>> org.apache.flink.table.api.ValidationException: Could not find any factory
>> for identifier 'avro' that implements
>> 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
>> jobmanager_1  |
>> jobmanager_1  | Available factory identifiers are:
>> jobmanager_1  |
>> jobmanager_1  | csv
>> jobmanager_1  | json
>> jobmanager_1  | parquet
>> jobmanager_1  | at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory(FileSystemTableFactory.java:112)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.filesystem.FileSystemTableSource.getInputFormat(FileSystemTableSource.java:143)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.filesystem.FileSystemTableSource.getDataStream(FileSystemTableSource.java:127)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalLegacyTableSourceScan.getSourceTransformation(PhysicalLegacyTableSourceScan.scala:82)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:98)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1  | at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>> 

Re: flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?

2020-07-01 Thread
我们正准备开发这个功能,详情可以参考:https://issues.apache.org/jira/browse/FLINK-15221

夏帅  于2020年7月1日周三 下午3:13写道:

> 你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once
>
> Kafka011TableSink
>
>
> @Override
> protected SinkFunction createKafkaProducer(
>   String topic,
>   Properties properties,
>   SerializationSchema serializationSchema,
>   Optional> partitioner) {
>return new FlinkKafkaProducer011<>(
>   topic,
>   new KeyedSerializationSchemaWrapper<>(serializationSchema),
>   properties,
>   partitioner,
>   FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
>   5);
> }
> 如果想要修改配置的话,具体可以参考KafkaTableSourceSinkFactoryBase
>
> 参考:
> https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
> --
> 发件人:静谧雨寒 
> 发送时间:2020年7月1日(星期三) 14:33
> 收件人:user-zh 
> 主 题:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?
>
> flink sql CREATE TABLE kafka sink表,开启checkpoint后,如何配置sql
> sink表使用两阶事务提交,exactly-once一致性保证 ?
> 官档说法:
> Consistency guarantees: By default, a Kafka sink ingests data with
> at-least-once guarantees into a Kafka topic if the query is executed with
> checkpointing enabled.,
> CREATE TABLE 默认是 at-least-once
>
>


Re: kafka相关问题

2020-06-10 Thread
那你有没有尝试过修改connector中property中connector.startup-mode
设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。
另外,我想问一下 你的sql是一直运行的吗?
我给的limit方案是一个upersert流。

小学生 <201782...@qq.com> 于2020年6月10日周三 下午5:31写道:

> limit 没有用呀。有没有切实可行的方案呢,pyflink下。


Re: kafka相关问题

2020-06-10 Thread
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整.
如有错误欢迎指正

小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道:

> 您好,我是通过select * from
> table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from
> table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)


Re: kafka相关问题

2020-06-10 Thread
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
我个人猜可能有两种方案:
1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
2.定期向文件系统写入数据。


小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道:

> 各位大佬好,请教一个问题:
> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl,是否由于'update-mode' =
> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
>
>
> table_ddl = """
> CREATE TABLE table_ddl(
> trck_id VARCHAR
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal', 
> 'connector.topic' = 'w', 
> 'connector.startup-mode' = 'group-offsets',
> 'connector.properties.group.id' = 'trck_w',
> 'update-mode' = 'append',
> 'connector.properties.zookeeper.connect' = '*',
> 'connector.properties.bootstrap.servers' = '%#',
> 'format.type' = 'json',  
> 'format.derive-schema' = 'true'
> )
> """


Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-09 Thread
我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯  于2020年6月9日周二 下午9:26写道:

>
> 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
> 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
>
> 如有错误,欢迎补充回答。
>
> 陈赋赟  于2020年6月8日周一 上午11:53写道:
>
>> 原先sql任务是:
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>  1
>> FROM
>> A_source
>> ;
>> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>>
>>
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> CREATE TABLE C_source(...)
>> CREATE TABLE D_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>  1
>> FROM
>> A_source
>> ;
>>
>>
>> INSERT INTO C_sink
>> SELECT
>>  1
>> FROM
>> D_source
>> ;
>> 并基于Savepoint提交,结果显示
>>
>> Cannot map checkpoint/savepoint state for operator
>> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
>> is not available in the new program.
>> If you want to allow to skip this, you can set the
>> --allowNonRestoredState option on the CLI.
>>
>>
>> 想请教一下底层是因为什么原因导致了opertor匹配不上?
>
>


Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-09 Thread
可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

如有错误,欢迎补充回答。

陈赋赟  于2020年6月8日周一 上午11:53写道:

> 原先sql任务是:
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> INSERT INTO B_sink
> SELECT
>  1
> FROM
> A_source
> ;
> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>
>
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> CREATE TABLE C_source(...)
> CREATE TABLE D_sink (...)
> INSERT INTO B_sink
> SELECT
>  1
> FROM
> A_source
> ;
>
>
> INSERT INTO C_sink
> SELECT
>  1
> FROM
> D_source
> ;
> 并基于Savepoint提交,结果显示
>
> Cannot map checkpoint/savepoint state for operator
> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
> is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
>
>
> 想请教一下底层是因为什么原因导致了opertor匹配不上?