Re: Flink 1.11 Table API cannot process Avro
It seems that you don't add additional dependencies. org.apache.avro avro 1.8.2 Lian Jiang 于2020年7月12日周日 下午1:08写道: > i am using flink playground as the base: > > https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/pom.xml > > I observed "PhysicalLegacyTableSourceScan". Not sure whether this is > related. Thanks. Regards! > > On Sat, Jul 11, 2020 at 3:43 PM Lian Jiang wrote: > >> Thanks Jörn! >> >> I added the documented dependency in my pom.xml file: >> >> >> org.apache.flink >> flink-avro >> 1.11.0 >> >> The newly generated jar does have: >> >> $ jar tf target//spend-report-1.0.0.jar | grep FileSystemFormatFactory >> org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.class >> >> org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory$ParquetInputFormat.class >> >> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.class >> >> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory.class >> >> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroInputFormat.class >> org/apache/flink/formats/avro/AvroFileSystemFormatFactory.class >> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$1.class >> >> But still got the same error. Anything else is missing? Thanks. Regards! >> >> >> More detailed exception: >> jobmanager_1 | Caused by: >> org.apache.flink.client.program.ProgramInvocationException: The main method >> caused an error: Could not find any factory for identifier 'avro' that >> implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in >> the classpath. >> jobmanager_1 | >> jobmanager_1 | Available factory identifiers are: >> jobmanager_1 | >> jobmanager_1 | csv >> jobmanager_1 | json >> jobmanager_1 | parquet >> jobmanager_1 | at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) >> ~[flink-dist_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) >> ~[flink-dist_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) >> ~[flink-dist_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) >> ~[flink-dist_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | ... 10 more >> jobmanager_1 | Caused by: >> org.apache.flink.table.api.ValidationException: Could not find any factory >> for identifier 'avro' that implements >> 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath. >> jobmanager_1 | >> jobmanager_1 | Available factory identifiers are: >> jobmanager_1 | >> jobmanager_1 | csv >> jobmanager_1 | json >> jobmanager_1 | parquet >> jobmanager_1 | at >> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory(FileSystemTableFactory.java:112) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.filesystem.FileSystemTableSource.getInputFormat(FileSystemTableSource.java:143) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.filesystem.FileSystemTableSource.getDataStream(FileSystemTableSource.java:127) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.planner.plan.nodes.physical.PhysicalLegacyTableSourceScan.getSourceTransformation(PhysicalLegacyTableSourceScan.scala:82) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:98) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63) >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> jobmanager_1 | at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) >>
Re: flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?
我们正准备开发这个功能,详情可以参考:https://issues.apache.org/jira/browse/FLINK-15221 夏帅 于2020年7月1日周三 下午3:13写道: > 你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once > > Kafka011TableSink > > > @Override > protected SinkFunction createKafkaProducer( > String topic, > Properties properties, > SerializationSchema serializationSchema, > Optional> partitioner) { >return new FlinkKafkaProducer011<>( > topic, > new KeyedSerializationSchemaWrapper<>(serializationSchema), > properties, > partitioner, > FlinkKafkaProducer011.Semantic.EXACTLY_ONCE, > 5); > } > 如果想要修改配置的话,具体可以参考KafkaTableSourceSinkFactoryBase > > 参考: > https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/ > -- > 发件人:静谧雨寒 > 发送时间:2020年7月1日(星期三) 14:33 > 收件人:user-zh > 主 题:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once? > > flink sql CREATE TABLE kafka sink表,开启checkpoint后,如何配置sql > sink表使用两阶事务提交,exactly-once一致性保证 ? > 官档说法: > Consistency guarantees: By default, a Kafka sink ingests data with > at-least-once guarantees into a Kafka topic if the query is executed with > checkpointing enabled., > CREATE TABLE 默认是 at-least-once > >
Re: kafka相关问题
那你有没有尝试过修改connector中property中connector.startup-mode 设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。 另外,我想问一下 你的sql是一直运行的吗? 我给的limit方案是一个upersert流。 小学生 <201782...@qq.com> 于2020年6月10日周三 下午5:31写道: > limit 没有用呀。有没有切实可行的方案呢,pyflink下。
Re: kafka相关问题
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整. 如有错误欢迎指正 小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道: > 您好,我是通过select * from > table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from > table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)
Re: kafka相关问题
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗? 我个人猜可能有两种方案: 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始; 2.定期向文件系统写入数据。 小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道: > 各位大佬好,请教一个问题: > 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl,是否由于'update-mode' = > 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。 > > > table_ddl = """ > CREATE TABLE table_ddl( > trck_id VARCHAR > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'w', > 'connector.startup-mode' = 'group-offsets', > 'connector.properties.group.id' = 'trck_w', > 'update-mode' = 'append', > 'connector.properties.zookeeper.connect' = '*', > 'connector.properties.bootstrap.servers' = '%#', > 'format.type' = 'json', > 'format.derive-schema' = 'true' > ) > """
Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题
我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id, 因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。 注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。 方盛凯 于2020年6月9日周二 下午9:26写道: > > 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 > 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html > > 如有错误,欢迎补充回答。 > > 陈赋赟 于2020年6月8日周一 上午11:53写道: > >> 原先sql任务是: >> CREATE TABLE A_source(...) >> CREATE TABLE B_sink (...) >> INSERT INTO B_sink >> SELECT >> 1 >> FROM >> A_source >> ; >> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 >> >> >> CREATE TABLE A_source(...) >> CREATE TABLE B_sink (...) >> CREATE TABLE C_source(...) >> CREATE TABLE D_sink (...) >> INSERT INTO B_sink >> SELECT >> 1 >> FROM >> A_source >> ; >> >> >> INSERT INTO C_sink >> SELECT >> 1 >> FROM >> D_source >> ; >> 并基于Savepoint提交,结果显示 >> >> Cannot map checkpoint/savepoint state for operator >> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator >> is not available in the new program. >> If you want to allow to skip this, you can set the >> --allowNonRestoredState option on the CLI. >> >> >> 想请教一下底层是因为什么原因导致了opertor匹配不上? > >
Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题
可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html 如有错误,欢迎补充回答。 陈赋赟 于2020年6月8日周一 上午11:53写道: > 原先sql任务是: > CREATE TABLE A_source(...) > CREATE TABLE B_sink (...) > INSERT INTO B_sink > SELECT > 1 > FROM > A_source > ; > 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 > > > CREATE TABLE A_source(...) > CREATE TABLE B_sink (...) > CREATE TABLE C_source(...) > CREATE TABLE D_sink (...) > INSERT INTO B_sink > SELECT > 1 > FROM > A_source > ; > > > INSERT INTO C_sink > SELECT > 1 > FROM > D_source > ; > 并基于Savepoint提交,结果显示 > > Cannot map checkpoint/savepoint state for operator > 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator > is not available in the new program. > If you want to allow to skip this, you can set the --allowNonRestoredState > option on the CLI. > > > 想请教一下底层是因为什么原因导致了opertor匹配不上?