Flink Weekly | 每周社区动态更新 - 2020/07/30
大家好,本文为 Flink Weekly 的第二十四期,由王松整理,李本超Review。 本期主要内容包括:近期社区开发进展、邮件问题答疑、Flink 最新社区动态及技术文章推荐等。 社区开发进展 Release [releases] Flink 1.11.1 正式发布! 具体信息参考: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-11-1-released-td43335.html Vote [vote] 伍翀发起Refactor Descriptor API to register connectors in Table API的投票 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-129-Refactor-Descriptor-API-to-register-connector-in-Table-API-td43420.html [vote] Shuiqiang Chen发起支持 Python DataStream API (Stateless part) 的投票 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-130-Support-for-Python-DataStream-API-Stateless-Part-td43424.html Discuss [connector] 李本超发起了关于对齐 InputFormat#nextRecord 返回为空值语义的讨论。目前还没有明确的相关 java doc,flink 中通常有三种处理方式: 1. 将 null 作为输入的结尾 2. 跳过 null 3. 假定 InputFormat#nextRecord 中的值不能为 null http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Align-the-semantic-of-returning-null-from-InputFormat-nextRecord-td43379.html [releases] Robert Metzger发起了关于发布 Flink 1.12 计划的讨论,并决定在9月底之前冻结master的功能。 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Planning-Flink-1-12-td43348.html#a43383 [connector] Israel Ekpo发起了关于在 DataStream、Table 和 SQL Connectors 中支持 Azure 平台的讨论,并列出了相关的issue,来跟踪这些 connectors 对 Azure 平台做出的贡献,目前在用户邮件列表中已经有大约50个 Azure 相关的主题,这也证明了 Flink 在Azure平台上的使用度 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-Azure-Platform-Support-in-DataStream-Table-and-SQL-Connectors-td43342.html [connector] Seth Wiesman 发起了关于使用 LIKE 子句创建的 DataGen 表中的时间戳处理问题,目前 DataGen 表只支持 FLINK SQL 的部分字段类型, 比如 TIMESTAMP(3) 就不支持,文档中建议是使用计算列创建事件时间属性。在 DataGen 表中使用 LIKE 子句时,如果物理表是 kafka 表就会报错。 Seth Wiesman 给出了两种解决方式: 1. 在datagen表中支持TIMESTAMP 2. 放宽 LIKE 子句的约束,允许使用计算列覆盖物理列 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Handling-of-Timestamp-in-DataGen-table-created-via-LIKE-td43433.html [release] Robert Metzger 发起了关于过时blockers和不稳定build的讨论,希望将此作为长期的讨论组,定期同步过时的blocker和不稳定的build,并列出了一些test http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-1-12-Stale-blockers-and-build-instabilities-td43477.html 用户问题 [sql] 刘首维 提问如果基于Flink1.11新的API去开发的话,如何获取到DataStream?并列举了几个使用场景: 1. 我们现在有某种特定的Kafka数据格式,1条Kafka数据 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段, 2. 用于处理这种情况,这样对用户是透明的。 3. 我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能 4. 调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的 5. 对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的 云邪进行了回答, 1. 场景1建议做在 DeserializationSchema。 2. 场景2建议封装在 SinkFunction。 3. 场景3社区有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。 4. 场景4可以通过引入类似 SupportsPartitioning 的接口实现。 并建了一个issue来跟进 [https://issues.apache.org/jira/browse/FLINK-18674] http://apache-flink.147419.n8.nabble.com/1-11Flink-SQL-API-td5261.html#a5275 [sql] Dream-底限 提问如何在eval()方法中传递Row类型? godfrey he、云邪和李本超进行了回答,可以参考[ https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide ]进行实现, 但是Dream-底限需要的是 Row[] 作为eval参数,目前并不支持,社区有 issue[ https://issues.apache.org/jira/browse/FLINK-17855] 正在跟进解决,针对 Dream-底限 打平array的具体场景, 本超提出参考 [ https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins] 的”Expanding arrays into a relation“部分使用flink内置方法解决 http://apache-flink.147419.n8.nabble.com/flink1-11-tablefunction-td5229.html [sql] junbaozhang 提出flink 1.11 executeSql查询kafka表print没有输出? godfrey he进行了回答,1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下, 都是exactly once语义,需要配置checkpoint才能得到结果。 http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-kafka-print-td5367.html#a5370 活动博客文章及其他 共享很重要 —— Flink SQL 中的 catalogs https://flink.apache.org/2020/07/23/catalogs.html Flink 1.11 SQL 使用攻略 https://mp.weixin.qq.com/s/BBRw3sR323d-jaxxONYknQ 高能预警!Apache Flink Meetup · 上海站返场啦 https://mp.weixin.qq.com/s/2k4os3FakPde8IGPtSvglA 你与30W奖金只差一个 Apache Flink 极客挑战赛的报名 https://mp.weixin.qq.com/s/IW6VKWVTrzO1lTDZxJfPXQ
Re: Re: flink on yarn日志问题
我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。 Yangze Guo 于2020年7月13日周一 下午5:03写道: > 1. > 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志 > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html > > Best, > Yangze Guo > > > On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote: > > > > 不好意思 怪我灭有描述清楚 > > 1 目前开启日志收集功能 > > 2 目前已是 per-job模式 > > 3 集群使用cdh flink.1.10 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2020-07-13 11:18:46,"Yangze Guo" 写道: > > >Hi, > > > > > >第一个问题,您可以尝试开启Yarn的日志收集功能[1] > > > > > >第二个问题,您可以尝试一下per-job mode [2][3] > > > > > >[1] > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files > > >[2] > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode > > >[3] > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn > > > > > > > > >Best, > > >Yangze Guo > > > > > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote: > > >> > > >> 请问一下两个问题 > > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看 > ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。 > > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在, > 有没有好的方式或者策略 , 可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉 > > >> >
Re: flink sql报错 Could not find any factory for identifier 'kafka'
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是 flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成 flink-sql-connector-kafka 后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下: org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} 添加provided后就没有问题了。 最后附上正确的pom文件 (如 Jingsong 所说,也可以把flink-sql-connector-kafka、flink-json这些都在pom文件中去掉,直接将jar报放入lib中): org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-json ${flink.version} org.apache.flink flink-sql-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink flink-core ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} Jingsong Li 于2020年7月13日周一 下午4:35写道: > Hi, > > 1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1] > > 2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java > spi) > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > Best, > Jingsong > > On Mon, Jul 13, 2020 at 4:04 PM 王松 wrote: > > > 你好本超, > > 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的 > > > > Benchao Li 于2020年7月13日周一 下午3:42写道: > > > > > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去; > > > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。 > > > 当然,直接粗暴的放到lib下,也是可以的。 > > > > > > Leonard Xu 于2020年7月13日周一 下午3:38写道: > > > > > > > Hi > > > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar > > > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 > > > > > > > > 祝好 > > > > > > > > > 在 2020年7月13日,15:28,王松 写道: > > > > > > > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > > > > > > > > > 我机器上flink/lib下jar包如下: > > > > > -rw-rw-r-- 1 hadoop hadoop117719 6月 30 12:41 > > flink-avro-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 > > flink-csv-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 > > > > flink-dist_2.11-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 > > flink-json-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > > > > > flink-shaded-zookeeper-3.4.14.jar > > > > > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > > > > > flink-table_2.11-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > > > > > flink-table-blink_2.11-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 > > > > log4j-1.2-api-2.12.1.jar > > > > > -rw-r--r-- 1 hadoop hadoop276771 7月 8 10:09 > > log4j-api-2.12.1.jar > > > > > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 > > log4j-core-2.12.1.jar > > > > > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > > > > > log4j-slf4j-impl-2.12.1.jar > > > > > > > > > > Leonard Xu 于2020年7月13日周一 下午3:05写道: > > > > > > > > > >> Hi, > > > > >> flink-connector-kafka_${scala.binary.version 和 > > > > >> flink-sql-connector-kafka_${scala.binary.version > > > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > > > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL > > > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > > > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > > > > >> > > > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是 > > > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > > > > >> > > > > >> 祝好 > > > > >> Leonard Xu > > > > >> > > > > >>> 在 2020年7月13日,14:42,王松 写道: > > > > >>> > > > > >>> @Leonard Xu, > > > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > > > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > > > > >>> > > > > >>&
Re: flink sql报错 Could not find any factory for identifier 'kafka'
你好本超, 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的 Benchao Li 于2020年7月13日周一 下午3:42写道: > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去; > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。 > 当然,直接粗暴的放到lib下,也是可以的。 > > Leonard Xu 于2020年7月13日周一 下午3:38写道: > > > Hi > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 > > > > 祝好 > > > > > 在 2020年7月13日,15:28,王松 写道: > > > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > > > > > 我机器上flink/lib下jar包如下: > > > -rw-rw-r-- 1 hadoop hadoop117719 6月 30 12:41 flink-avro-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 > > flink-dist_2.11-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 flink-json-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > > > flink-shaded-zookeeper-3.4.14.jar > > > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > > > flink-table_2.11-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > > > flink-table-blink_2.11-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 > > log4j-1.2-api-2.12.1.jar > > > -rw-r--r-- 1 hadoop hadoop276771 7月 8 10:09 log4j-api-2.12.1.jar > > > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 log4j-core-2.12.1.jar > > > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > > > log4j-slf4j-impl-2.12.1.jar > > > > > > Leonard Xu 于2020年7月13日周一 下午3:05写道: > > > > > >> Hi, > > >> flink-connector-kafka_${scala.binary.version 和 > > >> flink-sql-connector-kafka_${scala.binary.version > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > > >> > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是 > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > > >> > > >> 祝好 > > >> Leonard Xu > > >> > > >>> 在 2020年7月13日,14:42,王松 写道: > > >>> > > >>> @Leonard Xu, > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > > >>> > > >>> [1] > > >>> > > >> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > >>> = > > >>> > > >>> org.apache.flink > > >>> > > >>> > > >> > > > flink-sql-connector-kafka_${scala.binary.version} > > >>> ${flink.version} > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >> > > > > > >>> > > >>> > > >>> > > >>> > > >>> > > >> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> = > > >>> > > >>> Leonard Xu 于2020年7月13日周一 下午1:39写道: > > >>> > > >>>> Hi, 王松 > > >>>> > > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream > > connector > > >>>> > > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > > >>>> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > > >>>> > > >>>> > > >>>> 祝好, > > >>>> Leonard Xu > > >>>> [1] > > >>>> > > >> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > >>>> < > > >>>> > > >> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > >>>>> > > >>>>> > > >>>>> org.apache.flink > > >>>>> > > >>>>> > > flink-connector-kafka_${scala.binary.version} > > >>>>> ${flink.version} > > >>>>> > > >>>>> > > >>>>> org.apache.flink > > >>>>> > > >>>>> > > >>>> > > >> > > > flink-connector-kafka-0.11_${scala.binary.version} > > >>>>> ${flink.version} > > >>>>> > > >>>>> > > >>>>> org.apache.flink > > >>>>> > > >>>>> > > flink-connector-kafka_${scala.binary.version} > > >>>>> ${flink.version} > > >>>>> > > >>>>> > > >>>>> org.apache.flink > > >>>>> flink-core > > >>>>> ${flink.version} > > >>>>> > > >>>>> = > > >>>> > > >>>> > > >> > > >> > > > > > > -- > > Best, > Benchao Li >
Re: flink sql报错 Could not find any factory for identifier 'kafka'
这样还是不行,我尝试flink-connector-kafka-0.11_2.11-1.11.0.jar放到lib下,报了另外一个问题: Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase 另外,我是用 bin/flink run -yid xxx xxx.jar 的方式提交任务的,报错是直接在终端报错,没有提交到flink jobmanager上。 Leonard Xu 于2020年7月13日周一 下午3:38写道: > Hi > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 > > 祝好 > > > 在 2020年7月13日,15:28,王松 写道: > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > > > 我机器上flink/lib下jar包如下: > > -rw-rw-r-- 1 hadoop hadoop117719 6月 30 12:41 flink-avro-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 > flink-dist_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 flink-json-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > > flink-shaded-zookeeper-3.4.14.jar > > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > > flink-table_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > > flink-table-blink_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 > log4j-1.2-api-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop276771 7月 8 10:09 log4j-api-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 log4j-core-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > > log4j-slf4j-impl-2.12.1.jar > > > > Leonard Xu 于2020年7月13日周一 下午3:05写道: > > > >> Hi, > >> flink-connector-kafka_${scala.binary.version 和 > >> flink-sql-connector-kafka_${scala.binary.version > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > >> > >> [1] 中的话是有SQL Client JAR 的下载链接,就是 > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > >> > >> 祝好 > >> Leonard Xu > >> > >>> 在 2020年7月13日,14:42,王松 写道: > >>> > >>> @Leonard Xu, > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > >>> > >>> [1] > >>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>> = > >>> > >>> org.apache.flink > >>> > >>> > >> > flink-sql-connector-kafka_${scala.binary.version} > >>> ${flink.version} > >>> > >>> > >>> > >>> > >>> > >>> > >> > > >>> > >>> > >>> > >>> > >>> > >> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> = > >>> > >>> Leonard Xu 于2020年7月13日周一 下午1:39写道: > >>> > >>>> Hi, 王松 > >>>> > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream > connector > >>>> > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > >>>> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > >>>> > >>>> > >>>> 祝好, > >>>> Leonard Xu > >>>> [1] > >>>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>>> < > >>>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>>>> > >>>>> > >>>>> org.apache.flink > >>>>> > >>>>> > flink-connector-kafka_${scala.binary.version} > >>>>> ${flink.version} > >>>>> > >>>>> > >>>>> org.apache.flink > >>>>> > >>>>> > >>>> > >> > flink-connector-kafka-0.11_${scala.binary.version} > >>>>> ${flink.version} > >>>>> > >>>>> > >>>>> org.apache.flink > >>>>> > >>>>> > flink-connector-kafka_${scala.binary.version} > >>>>> ${flink.version} > >>>>> > >>>>> > >>>>> org.apache.flink > >>>>> flink-core > >>>>> ${flink.version} > >>>>> > >>>>> = > >>>> > >>>> > >> > >> > >
Re: flink sql报错 Could not find any factory for identifier 'kafka'
您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 我机器上flink/lib下jar包如下: -rw-rw-r-- 1 hadoop hadoop117719 6月 30 12:41 flink-avro-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 flink-dist_2.11-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 flink-json-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 flink-shaded-zookeeper-3.4.14.jar -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 flink-table_2.11-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 flink-table-blink_2.11-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 log4j-1.2-api-2.12.1.jar -rw-r--r-- 1 hadoop hadoop276771 7月 8 10:09 log4j-api-2.12.1.jar -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 log4j-core-2.12.1.jar -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 log4j-slf4j-impl-2.12.1.jar Leonard Xu 于2020年7月13日周一 下午3:05写道: > Hi, > flink-connector-kafka_${scala.binary.version 和 > flink-sql-connector-kafka_${scala.binary.version > 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > 后者的话主要是对前者做了shade处理,方便用户在 SQL > Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > > [1] 中的话是有SQL Client JAR 的下载链接,就是 > flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > > 祝好 > Leonard Xu > > > 在 2020年7月13日,14:42,王松 写道: > > > > @Leonard Xu, > > 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > > 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > > > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > = > > > >org.apache.flink > > > > > flink-sql-connector-kafka_${scala.binary.version} > >${flink.version} > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > = > > > > Leonard Xu 于2020年7月13日周一 下午1:39写道: > > > >> Hi, 王松 > >> > >> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector > >> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > >> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > >> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > >> > >> > >> 祝好, > >> Leonard Xu > >> [1] > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >> < > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>> > >>> > >>> org.apache.flink > >>> > >>> flink-connector-kafka_${scala.binary.version} > >>> ${flink.version} > >>> > >>> > >>> org.apache.flink > >>> > >>> > >> > flink-connector-kafka-0.11_${scala.binary.version} > >>> ${flink.version} > >>> > >>> > >>> org.apache.flink > >>> > >>> flink-connector-kafka_${scala.binary.version} > >>> ${flink.version} > >>> > >>> > >>> org.apache.flink > >>> flink-core > >>> ${flink.version} > >>> > >>> = > >> > >> > >
Re: flink sql报错 Could not find any factory for identifier 'kafka'
@Leonard Xu, 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html = org.apache.flink flink-sql-connector-kafka_${scala.binary.version} ${flink.version} = Leonard Xu 于2020年7月13日周一 下午1:39写道: > Hi, 王松 > > 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > datastream connector 同时引用是会冲突的,请根据你的需要使用。 > > > 祝好, > Leonard Xu > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > < > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > > > > >org.apache.flink > > > > flink-connector-kafka_${scala.binary.version} > >${flink.version} > > > > > >org.apache.flink > > > > > flink-connector-kafka-0.11_${scala.binary.version} > >${flink.version} > > > > > >org.apache.flink > > > > flink-connector-kafka_${scala.binary.version} > >${flink.version} > > > > > >org.apache.flink > >flink-core > >${flink.version} > > > > = > >
flink sql报错 Could not find any factory for identifier 'kafka'
各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 请问是什么原因导致的呢? 代码如下: - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings); tenv.executeSql("CREATE TABLE test_table (\n" + " id INT,\n" + " name STRING,\n" + " age INT,\n" + " create_at TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'test_json',\n" + " 'properties.bootstrap.servers' = 'localhost:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'latest-offset'\n" + ")"); Table table = tenv.sqlQuery("select * from test_table"); tenv.toRetractStream(table, Row.class).print(); env.execute("flink 1.11.0 demo"); - pom 文件如下: = 2.11 1.11.0 org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-table-runtime-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-json ${flink.version} org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-kafka-0.11_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink flink-core ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-table-common ${flink.version} =
Re: flink基于yarn的HA次数无效,以及HA拉起的任务是否可以重用state
hi, muchen 1. yarn.application-attempts 这个参数与另外一个参数有关系:yarn.application-attempt-failures-validity-interval,大概意思是需要在设置的这个interval内失败重试多少次,才认为flink job是失败的,如果超过这个interval,就会重新开始计数。打个比方,yarn.application-attempts: 2,yarn.application-attempt-failures-validity-interval = 1(默认值,10s),只有在10s内 flink job 失败重启2次才会真正的失败。 2. 如果配置了checkpoint是会重用上次任务失败的state。 这是我个人的理解,有疑问大家一起讨论 MuChen <9329...@qq.com> 于2020年7月1日周三 下午7:50写道: > hi,all: > > 我根据这篇博客https://blog.csdn.net/cndotaci/article/details/106870413 > 的介绍,配置了flink基于yarn的高可用,测试时发现配置的任务失败重试2次没有生效,我测试到第6次时,任务仍然能够被yarn拉起。 > > 请问各位大佬 > > 1. 下面配置中的重试次数为什么没有生效? > > 2. 通过HA拉起的任务,是否可以重用上次任务失败时的state? > > flink版本:1.10.0 > > flink-conf.yaml配置: > $ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost > jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m > taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1 > parallelism.default: 1 high-availability: zookeeper > high-availability.storageDir: hdfs:///flink/ha/ > high-availability.zookeeper.quorum: > uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1 > state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir: > hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60 > state.backend.incremental: true jobmanager.execution.failover-strategy: > region jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/ > historyserver.web.port: 8082 historyserver.archive.fs.dir: > hdfs:///flink/flink-jobs/ historyserver.archive.fs.refresh-interval: 1 > # HA重试次数 yarn.application-attempts: 2 > ssh到jm节点,手动kill任务的操作日志: > [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 > YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner > 18026 YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager > [root@uhadoop-op3raf-task48 ~]# kill -9 18026 [root@uhadoop-op3raf-task48 > ~]# jps 34785 YarnTaskExecutorRunner 16853 -- process information > unavailable 17527 PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412 > YarnJobClusterEntrypoint 39599 NodeManager [root@uhadoop-op3raf-task48 > ~]# kill -9 20412 [root@uhadoop-op3raf-task48 ~]# jps 34785 > YarnTaskExecutorRunner 21926 YarnJobClusterEntrypoint 23207 Jps 17527 > PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager > [root@uhadoop-op3raf-task48 ~]# kill -9 21926 [root@uhadoop-op3raf-task48 > ~]# jps 34785 YarnTaskExecutorRunner 23318 YarnJobClusterEntrypoint 26279 > Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager > [root@uhadoop-op3raf-task48 ~]# kill -9 23318
Re: flink SQL如何将秒转换为timestamp
可以试试这样写: TO_TIMESTAMP(FROM_UNIXTIME(itime, '-MM-dd HH:mm:ss')) zilong xiao 于2020年6月30日周二 下午4:30写道: > 有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导 > > TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd > hh:mm:ss')) >
Re: Re: Re:Re: flink sql job 提交到yarn上报错
那你在命令行执行:hadoop classpath,有hadoop的classpath输出吗? Zhou Zach 于2020年6月16日周二 下午3:22写道: > > > > > > > 在/etc/profile下,目前只加了 > export HADOOP_CLASSPATH=`hadoop classpath` > 我是安装的CDH,没找到sbin这个文件。。 > > > > > > > > > > > > 在 2020-06-16 15:05:12,"王松" 写道: > >你配置HADOOP_HOME和HADOOP_CLASSPATH这两个环境变量了吗? > > > >export HADOOP_HOME=/usr/local/hadoop-2.7.2 > >export HADOOP_CLASSPATH=`hadoop classpath` > >export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH > > > >Zhou Zach 于2020年6月16日周二 下午2:53写道: > > > >> flink/lib/下的jar: > >> flink-connector-hive_2.11-1.10.0.jar > >> flink-dist_2.11-1.10.0.jar > >> flink-jdbc_2.11-1.10.0.jar > >> flink-json-1.10.0.jar > >> flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar > >> flink-sql-connector-kafka_2.11-1.10.0.jar > >> flink-table_2.11-1.10.0.jar > >> flink-table-blink_2.11-1.10.0.jar > >> hbase-client-2.1.0.jar > >> hbase-common-2.1.0.jar > >> hive-exec-2.1.1.jar > >> mysql-connector-java-5.1.49.jar > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-06-16 14:48:43,"Zhou Zach" 写道: > >> > > >> > > >> > > >> > > >> >high-availability.storageDir: hdfs:///flink/ha/ > >> >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181 > >> >state.backend: filesystem > >> >state.checkpoints.dir: > hdfs://nameservice1:8020//user/flink10/checkpoints > >> >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints > >> >high-availability.zookeeper.path.root: /flink > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> >在 2020-06-16 14:44:02,"王松" 写道: > >> >>你的配置文件中ha配置可以贴下吗 > >> >> > >> >>Zhou Zach 于2020年6月16日周二 下午1:49写道: > >> >> > >> >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: > Failed > >> to > >> >>> initialize the cluster entrypoint YarnJobClusterEntrypoint. > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119) > >> >>> > >> >>> Caused by: java.io.IOException: Could not create FileSystem for > highly > >> >>> available storage path > (hdfs:/flink/ha/application_1592215995564_0027) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207) > >> >>> > >> >>> at > >> >>> > >> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))
6.14号的meetup中讲的动态 Table 属性很清楚,附个链接: https://developer.aliyun.com/live/2894?accounttraceid=07deb589f50c4c1abbcbed103e534316qnxq ,大概在04:17:00左右。 Kurt Young 于2020年6月16日周二 下午12:12写道: > table hint的语法是紧跟在你query中访问某张表的时候,所以我理解并不会有 ”这个动态参数作用在哪张表“ 上的疑问吧? > > Best, > Kurt > > > On Tue, Jun 16, 2020 at 10:02 AM Yichao Yang <1048262...@qq.com> wrote: > > > Hi > > > > > > 1.2版本将会有like字句的支持,参考[1],不过也是通过定义一张表的方式,而不是直接在query内指定。 > > > > > 个人理解在query内指定其实会涉及到很多因素,假设涉及到多张表的时候,涉及到同key属性时,你在query内指定的属性到底是赋予给哪张表的?这个其实是比较模糊的。 > > > > > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table > > > > > > Best, > > Yichao Yang > > > > > > > > > > -- 原始邮件 -- > > 发件人: "Kurt Young" > 发送时间: 2020年6月16日(星期二) 上午9:53 > > 收件人: "user-zh" > > > 主题: Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性)) > > > > > > > > 就是你DDL定义表的时候的WITH参数,有时候有个别参数写的不对或者需要调整,可以在query里直接修改,而不用重新定义一张新表。 > > > > Best, > > Kurt > > > > > > On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com> wrote: > > > > > 动态 Table 属性是指什么?可以举一个列子吗。 >
Re: Re:Re: flink sql job 提交到yarn上报错
你配置HADOOP_HOME和HADOOP_CLASSPATH这两个环境变量了吗? export HADOOP_HOME=/usr/local/hadoop-2.7.2 export HADOOP_CLASSPATH=`hadoop classpath` export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH Zhou Zach 于2020年6月16日周二 下午2:53写道: > flink/lib/下的jar: > flink-connector-hive_2.11-1.10.0.jar > flink-dist_2.11-1.10.0.jar > flink-jdbc_2.11-1.10.0.jar > flink-json-1.10.0.jar > flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar > flink-sql-connector-kafka_2.11-1.10.0.jar > flink-table_2.11-1.10.0.jar > flink-table-blink_2.11-1.10.0.jar > hbase-client-2.1.0.jar > hbase-common-2.1.0.jar > hive-exec-2.1.1.jar > mysql-connector-java-5.1.49.jar > > > > > > > > > > > > > > > > > > 在 2020-06-16 14:48:43,"Zhou Zach" 写道: > > > > > > > > > >high-availability.storageDir: hdfs:///flink/ha/ > >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181 > >state.backend: filesystem > >state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints > >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints > >high-availability.zookeeper.path.root: /flink > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >在 2020-06-16 14:44:02,"王松" 写道: > >>你的配置文件中ha配置可以贴下吗 > >> > >>Zhou Zach 于2020年6月16日周二 下午1:49写道: > >> > >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed > to > >>> initialize the cluster entrypoint YarnJobClusterEntrypoint. > >>> > >>> at > >>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187) > >>> > >>> at > >>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518) > >>> > >>> at > >>> > org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119) > >>> > >>> Caused by: java.io.IOException: Could not create FileSystem for highly > >>> available storage path (hdfs:/flink/ha/application_1592215995564_0027) > >>> > >>> at > >>> > org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103) > >>> > >>> at > >>> > org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89) > >>> > >>> at > >>> > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125) > >>> > >>> at > >>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305) > >>> > >>> at > >>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263) > >>> > >>> at > >>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207) > >>> > >>> at > >>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169) > >>> > >>> at java.security.AccessController.doPrivileged(Native Method) > >>> > >>> at javax.security.auth.Subject.doAs(Subject.java:422) > >>> > >>> at > >>> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746) > >>> > >>> at > >>> > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > >>> > >>> at > >>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168) > >>> > >>> ... 2 more > >>> > >>> Caused by: > org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: > >>> Could not find a file system implementation for scheme 'hdfs'. The > scheme > >>> is not directly supported by Flink and no Hadoop file system to support > >>> this scheme could be loaded. > >>> > >>> at > >>> > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450) > >>> > >>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362) > >>> > >>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298) > >>> > >>> at >
Re: flink sql job 提交到yarn上报错
你的配置文件中ha配置可以贴下吗 Zhou Zach 于2020年6月16日周二 下午1:49写道: > org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to > initialize the cluster entrypoint YarnJobClusterEntrypoint. > > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187) > > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518) > > at > org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119) > > Caused by: java.io.IOException: Could not create FileSystem for highly > available storage path (hdfs:/flink/ha/application_1592215995564_0027) > > at > org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103) > > at > org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89) > > at > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125) > > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305) > > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263) > > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207) > > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169) > > at java.security.AccessController.doPrivileged(Native Method) > > at javax.security.auth.Subject.doAs(Subject.java:422) > > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746) > > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168) > > ... 2 more > > Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: > Could not find a file system implementation for scheme 'hdfs'. The scheme > is not directly supported by Flink and no Hadoop file system to support > this scheme could be loaded. > > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450) > > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362) > > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298) > > at > org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100) > > ... 13 more > > Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: > Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in > the classpath, or some classes are missing from the classpath. > > at > org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184) > > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446) > > ... 16 more > > Caused by: java.lang.VerifyError: Bad return type > > Exception Details: > > Location: > > > org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage; > @160: areturn > > Reason: > > Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0]) > is not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method > signature) > > Current Frame: > > bci: @160 > > flags: { } > > locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String', > 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' } > stack: { 'org/apache/hadoop/fs/ContentSummary' } > > > > 在本地intellij idea中可以正常运行,flink job上订阅kafka,sink到mysql和hbase,集群flink lib目录下, > > > >
flink 执行一段时间后报错:Requesting TaskManager's path for query services failed.
flink 版本:1.9.0 大家好,flink任务在执行一段时间后,出现报错:Requesting TaskManager's path for query services failed,akka.pattern.AskTimeoutException,请问这个原因是什么呢?详细错误输出如下: 2020-06-15 02:12:07,434 WARN org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl - Requesting TaskManager's path for query services failed. java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1311898795]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) at java.lang.Thread.run(Thread.java:748) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1311898795]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) ... 9 more 2020-06-15 02:12:12,603 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter -> Map (2/32) (689c6e191be63afea5182581e2875d4d) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,604 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter -> Map (4/32) (d2f0fbf94e3d7cddc0f20080f94a1692) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,604 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter -> Map (3/32) (28c8833ebbc14377bc74e7a389b4220b) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,604 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter -> Map (5/32) (585854caaef14df77b084f92eb5fb4a2) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,604 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter -> Map (15/32) (fc699114e90473fda9c717b5e0d4b716) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,605 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter -> Map (22/32) (e3569b88e95af3d06e751ade5cdde34c) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,605 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter -> Map (31/32) (a653c6cbb83c04b6c3f36a3ba80e3979) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,605 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Filter -> Map (1/6) (89af922e79d370e2df9fb4a9629ad100) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,606 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Filter -> Map (2/6) (7006987f6159a65598fe11237490a666) switched from CANCELING to CANCELED. 2020-06-15 02:12:12,606 INFO