Flink Weekly | 每周社区动态更新 - 2020/07/30

2020-07-29 文章
大家好,本文为 Flink Weekly 的第二十四期,由王松整理,李本超Review。

本期主要内容包括:近期社区开发进展、邮件问题答疑、Flink 最新社区动态及技术文章推荐等。


社区开发进展 Release

[releases] Flink 1.11.1 正式发布!

具体信息参考:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-11-1-released-td43335.html
Vote

[vote] 伍翀发起Refactor Descriptor API to register connectors in Table API的投票

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-129-Refactor-Descriptor-API-to-register-connector-in-Table-API-td43420.html

[vote] Shuiqiang Chen发起支持 Python DataStream API (Stateless part) 的投票

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-130-Support-for-Python-DataStream-API-Stateless-Part-td43424.html
Discuss

[connector] 李本超发起了关于对齐 InputFormat#nextRecord 返回为空值语义的讨论。目前还没有明确的相关 java
doc,flink 中通常有三种处理方式:

   1.

   将 null 作为输入的结尾
   2.

   跳过 null
   3.

   假定 InputFormat#nextRecord 中的值不能为 null

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Align-the-semantic-of-returning-null-from-InputFormat-nextRecord-td43379.html

[releases] Robert Metzger发起了关于发布 Flink 1.12 计划的讨论,并决定在9月底之前冻结master的功能。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Planning-Flink-1-12-td43348.html#a43383

[connector] Israel Ekpo发起了关于在 DataStream、Table 和 SQL Connectors 中支持 Azure
平台的讨论,并列出了相关的issue,来跟踪这些 connectors

对 Azure 平台做出的贡献,目前在用户邮件列表中已经有大约50个 Azure 相关的主题,这也证明了 Flink 在Azure平台上的使用度

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-Azure-Platform-Support-in-DataStream-Table-and-SQL-Connectors-td43342.html

[connector] Seth Wiesman 发起了关于使用 LIKE 子句创建的 DataGen 表中的时间戳处理问题,目前 DataGen
表只支持 FLINK SQL 的部分字段类型,

比如 TIMESTAMP(3) 就不支持,文档中建议是使用计算列创建事件时间属性。在 DataGen 表中使用 LIKE 子句时,如果物理表是
kafka 表就会报错。

Seth Wiesman 给出了两种解决方式:

   1.

   在datagen表中支持TIMESTAMP
   2.

   放宽 LIKE 子句的约束,允许使用计算列覆盖物理列

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Handling-of-Timestamp-in-DataGen-table-created-via-LIKE-td43433.html

[release] Robert Metzger
发起了关于过时blockers和不稳定build的讨论,希望将此作为长期的讨论组,定期同步过时的blocker和不稳定的build,并列出了一些test

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-1-12-Stale-blockers-and-build-instabilities-td43477.html
用户问题

[sql] 刘首维 提问如果基于Flink1.11新的API去开发的话,如何获取到DataStream?并列举了几个使用场景:

   1.

   我们现在有某种特定的Kafka数据格式,1条Kafka数据
   会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,
   2.

   用于处理这种情况,这样对用户是透明的。
   3.

   我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
   4.

   调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
   5.

   对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的

云邪进行了回答,

   1.

   场景1建议做在 DeserializationSchema。
   2.

   场景2建议封装在 SinkFunction。
   3.

   场景3社区有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。
   4.

   场景4可以通过引入类似 SupportsPartitioning 的接口实现。

并建了一个issue来跟进 [https://issues.apache.org/jira/browse/FLINK-18674]

http://apache-flink.147419.n8.nabble.com/1-11Flink-SQL-API-td5261.html#a5275

[sql] Dream-底限 提问如何在eval()方法中传递Row类型?

godfrey he、云邪和李本超进行了回答,可以参考[
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
]进行实现,

但是Dream-底限需要的是 Row[] 作为eval参数,目前并不支持,社区有 issue[
https://issues.apache.org/jira/browse/FLINK-17855] 正在跟进解决,针对 Dream-底限
打平array的具体场景,

本超提出参考 [
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins]
的”Expanding arrays into a relation“部分使用flink内置方法解决

http://apache-flink.147419.n8.nabble.com/flink1-11-tablefunction-td5229.html

[sql] junbaozhang 提出flink 1.11 executeSql查询kafka表print没有输出?

godfrey he进行了回答,1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下,

都是exactly once语义,需要配置checkpoint才能得到结果。

http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-kafka-print-td5367.html#a5370
活动博客文章及其他

共享很重要 —— Flink SQL 中的 catalogs

https://flink.apache.org/2020/07/23/catalogs.html

Flink 1.11 SQL 使用攻略

https://mp.weixin.qq.com/s/BBRw3sR323d-jaxxONYknQ

高能预警!Apache Flink Meetup · 上海站返场啦

https://mp.weixin.qq.com/s/2k4os3FakPde8IGPtSvglA

你与30W奖金只差一个 Apache Flink 极客挑战赛的报名

https://mp.weixin.qq.com/s/IW6VKWVTrzO1lTDZxJfPXQ


Re: Re: flink on yarn日志问题

2020-07-13 文章
我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。

Yangze Guo  于2020年7月13日周一 下午5:03写道:

> 1.
> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
>
> Best,
> Yangze Guo
>
>
> On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
> >
> > 不好意思  怪我灭有描述清楚
> > 1 目前开启日志收集功能
> > 2 目前已是 per-job模式
> > 3 集群使用cdh flink.1.10
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> > >Hi,
> > >
> > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > >
> > >第二个问题,您可以尝试一下per-job mode [2][3]
> > >
> > >[1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > >[2]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > >[3]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > >
> > >
> > >Best,
> > >Yangze Guo
> > >
> > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> > >>
> > >> 请问一下两个问题
> > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> > >>
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是
flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成
flink-sql-connector-kafka
后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下:

org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}

添加provided后就没有问题了。

最后附上正确的pom文件 (如 Jingsong
所说,也可以把flink-sql-connector-kafka、flink-json这些都在pom文件中去掉,直接将jar报放入lib中):




org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}



org.apache.flink
flink-json
${flink.version}


org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink
flink-core
${flink.version}


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}




Jingsong Li  于2020年7月13日周一 下午4:35写道:

> Hi,
>
> 1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1]
>
> 2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java
> spi)
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> Best,
> Jingsong
>
> On Mon, Jul 13, 2020 at 4:04 PM 王松  wrote:
>
> > 你好本超,
> > 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的
> >
> > Benchao Li  于2020年7月13日周一 下午3:42写道:
> >
> > > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> > > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> > > 当然,直接粗暴的放到lib下,也是可以的。
> > >
> > > Leonard Xu  于2020年7月13日周一 下午3:38写道:
> > >
> > > > Hi
> > > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> > > >
> > > > 祝好
> > > >
> > > > > 在 2020年7月13日,15:28,王松  写道:
> > > > >
> > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > > > >
> > > > > 我机器上flink/lib下jar包如下:
> > > > > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41
> > flink-avro-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09
> > flink-csv-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > > > flink-dist_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09
> > flink-json-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > > > flink-shaded-zookeeper-3.4.14.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > > > flink-table_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > > > flink-table-blink_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> > > > log4j-1.2-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09
> > log4j-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09
> > log4j-core-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > > > > log4j-slf4j-impl-2.12.1.jar
> > > > >
> > > > > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> > > > >
> > > > >> Hi,
> > > > >> flink-connector-kafka_${scala.binary.version 和
> > > > >> flink-sql-connector-kafka_${scala.binary.version
> > > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > > > >>
> > > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > > > >>
> > > > >> 祝好
> > > > >> Leonard Xu
> > > > >>
> > > > >>> 在 2020年7月13日,14:42,王松  写道:
> > > > >>>
> > > > >>> @Leonard Xu,
> > > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > > > >>>
> > > > >>&

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章
你好本超,
是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的

Benchao Li  于2020年7月13日周一 下午3:42写道:

> 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> 当然,直接粗暴的放到lib下,也是可以的。
>
> Leonard Xu  于2020年7月13日周一 下午3:38写道:
>
> > Hi
> > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> >
> > 祝好
> >
> > > 在 2020年7月13日,15:28,王松  写道:
> > >
> > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > >
> > > 我机器上flink/lib下jar包如下:
> > > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41 flink-avro-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09 flink-csv-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > flink-dist_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09 flink-json-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > flink-shaded-zookeeper-3.4.14.jar
> > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > flink-table_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > flink-table-blink_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> > log4j-1.2-api-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09 log4j-api-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > > log4j-slf4j-impl-2.12.1.jar
> > >
> > > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> > >
> > >> Hi,
> > >> flink-connector-kafka_${scala.binary.version 和
> > >> flink-sql-connector-kafka_${scala.binary.version
> > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > >>
> > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > >>
> > >> 祝好
> > >> Leonard Xu
> > >>
> > >>> 在 2020年7月13日,14:42,王松  写道:
> > >>>
> > >>> @Leonard Xu,
> > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > >>>
> > >>> [1]
> > >>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >>> =
> > >>> 
> > >>>   org.apache.flink
> > >>>
> > >>>
> > >>
> >
> flink-sql-connector-kafka_${scala.binary.version}
> > >>>   ${flink.version}
> > >>>   
> > >>>
> > >>>
> > >>>   
> > >>>
> > >>>
> > >>
> >
> 
> > >>>   
> > >>>   
> > >>>   
> > >>>   
> > >>>
> > >> 
> > >>>   
> > >>>   
> > >>>   
> > >>>
> > >>>   
> > >>>   
> > >>>   
> > >>>   
> > >>>   
> > >>> =
> > >>>
> > >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> > >>>
> > >>>> Hi, 王松
> > >>>>
> > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> > connector
> > >>>>
> > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> > >>>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> > >>>>
> > >>>>
> > >>>> 祝好,
> > >>>> Leonard Xu
> > >>>> [1]
> > >>>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >>>> <
> > >>>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >>>>>
> > >>>>>  
> > >>>>>  org.apache.flink
> > >>>>>
> > >>>>>
> > flink-connector-kafka_${scala.binary.version}
> > >>>>>  ${flink.version}
> > >>>>>  
> > >>>>>  
> > >>>>>  org.apache.flink
> > >>>>>
> > >>>>>
> > >>>>
> > >>
> >
> flink-connector-kafka-0.11_${scala.binary.version}
> > >>>>>  ${flink.version}
> > >>>>>  
> > >>>>>  
> > >>>>>  org.apache.flink
> > >>>>>
> > >>>>>
> > flink-connector-kafka_${scala.binary.version}
> > >>>>>  ${flink.version}
> > >>>>>  
> > >>>>>  
> > >>>>>  org.apache.flink
> > >>>>>  flink-core
> > >>>>>  ${flink.version}
> > >>>>>  
> > >>>>> =
> > >>>>
> > >>>>
> > >>
> > >>
> >
> >
>
> --
>
> Best,
> Benchao Li
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章
这样还是不行,我尝试flink-connector-kafka-0.11_2.11-1.11.0.jar放到lib下,报了另外一个问题:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase


另外,我是用 bin/flink run -yid xxx xxx.jar 的方式提交任务的,报错是直接在终端报错,没有提交到flink
jobmanager上。

Leonard Xu  于2020年7月13日周一 下午3:38写道:

> Hi
> 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
>
> 祝好
>
> > 在 2020年7月13日,15:28,王松  写道:
> >
> > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> >
> > 我机器上flink/lib下jar包如下:
> > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41 flink-avro-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09 flink-csv-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> flink-dist_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09 flink-json-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > flink-shaded-zookeeper-3.4.14.jar
> > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > flink-table_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > flink-table-blink_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> log4j-1.2-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09 log4j-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > log4j-slf4j-impl-2.12.1.jar
> >
> > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> >
> >> Hi,
> >> flink-connector-kafka_${scala.binary.version 和
> >> flink-sql-connector-kafka_${scala.binary.version
> >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> >>
> >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> >>
> >> 祝好
> >> Leonard Xu
> >>
> >>> 在 2020年7月13日,14:42,王松  写道:
> >>>
> >>> @Leonard Xu,
> >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> >>>
> >>> [1]
> >>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>> =
> >>> 
> >>>   org.apache.flink
> >>>
> >>>
> >>
> flink-sql-connector-kafka_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>
> >>>
> >>>   
> >>>
> >>>
> >>
> 
> >>>   
> >>>   
> >>>   
> >>>   
> >>>
> >> 
> >>>   
> >>>   
> >>>   
> >>>
> >>>   
> >>>   
> >>>   
> >>>   
> >>>   
> >>> =
> >>>
> >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> >>>
> >>>> Hi, 王松
> >>>>
> >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> connector
> >>>>
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> >>>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> >>>>
> >>>>
> >>>> 祝好,
> >>>> Leonard Xu
> >>>> [1]
> >>>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>> <
> >>>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>>>
> >>>>>  
> >>>>>  org.apache.flink
> >>>>>
> >>>>>
> flink-connector-kafka_${scala.binary.version}
> >>>>>  ${flink.version}
> >>>>>  
> >>>>>  
> >>>>>  org.apache.flink
> >>>>>
> >>>>>
> >>>>
> >>
> flink-connector-kafka-0.11_${scala.binary.version}
> >>>>>  ${flink.version}
> >>>>>  
> >>>>>  
> >>>>>  org.apache.flink
> >>>>>
> >>>>>
> flink-connector-kafka_${scala.binary.version}
> >>>>>  ${flink.version}
> >>>>>  
> >>>>>  
> >>>>>  org.apache.flink
> >>>>>  flink-core
> >>>>>  ${flink.version}
> >>>>>  
> >>>>> =
> >>>>
> >>>>
> >>
> >>
>
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章
您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。

我机器上flink/lib下jar包如下:
-rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41 flink-avro-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09 flink-csv-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09 flink-dist_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09 flink-json-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
flink-table_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
flink-table-blink_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09 log4j-api-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
log4j-slf4j-impl-2.12.1.jar

Leonard Xu  于2020年7月13日周一 下午3:05写道:

> Hi,
> flink-connector-kafka_${scala.binary.version 和
> flink-sql-connector-kafka_${scala.binary.version
> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
>
> [1] 中的话是有SQL Client JAR 的下载链接,就是
> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
>
> 祝好
> Leonard Xu
>
> > 在 2020年7月13日,14:42,王松  写道:
> >
> > @Leonard Xu,
> > 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > =
> >  
> >org.apache.flink
> >
> >
> flink-sql-connector-kafka_${scala.binary.version}
> >${flink.version}
> >
> >
> > 
> >
> >
> >
> 
> >
> >
> >
> >
> >
> 
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > =
> >
> > Leonard Xu  于2020年7月13日周一 下午1:39写道:
> >
> >> Hi, 王松
> >>
> >> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
> >> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> >> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> >> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> >>
> >>
> >> 祝好,
> >> Leonard Xu
> >> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >> <
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>
> >>>   
> >>>   org.apache.flink
> >>>
> >>> flink-connector-kafka_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>   
> >>>   org.apache.flink
> >>>
> >>>
> >>
> flink-connector-kafka-0.11_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>   
> >>>   org.apache.flink
> >>>
> >>> flink-connector-kafka_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>   
> >>>   org.apache.flink
> >>>   flink-core
> >>>   ${flink.version}
> >>>   
> >>> =
> >>
> >>
>
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章
@Leonard Xu,
非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
=
  
org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}
${flink.version}


 

















=

Leonard Xu  于2020年7月13日周一 下午1:39写道:

> Hi, 王松
>
> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
>
>
> 祝好,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >
> >
> >org.apache.flink
> >
> > flink-connector-kafka_${scala.binary.version}
> >${flink.version}
> >
> >
> >org.apache.flink
> >
> >
> flink-connector-kafka-0.11_${scala.binary.version}
> >${flink.version}
> >
> >
> >org.apache.flink
> >
> > flink-connector-kafka_${scala.binary.version}
> >${flink.version}
> >
> >
> >org.apache.flink
> >flink-core
> >${flink.version}
> >
> > =
>
>


flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章
各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.
请问是什么原因导致的呢?

代码如下:

-
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env,
settings);

tenv.executeSql("CREATE TABLE test_table (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT,\n" +
" create_at TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test_json',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'latest-offset'\n" +
")");
Table table = tenv.sqlQuery("select * from test_table");
tenv.toRetractStream(table, Row.class).print();
env.execute("flink 1.11.0 demo");
-

pom 文件如下:
=

2.11
1.11.0



org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}


org.apache.flink

flink-table-runtime-blink_${scala.binary.version}
${flink.version}


org.apache.flink
flink-json
${flink.version}


org.apache.flink

flink-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink

flink-connector-kafka-0.11_${scala.binary.version}
${flink.version}


org.apache.flink

flink-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink
flink-core
${flink.version}


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}


org.apache.flink
flink-table-common
${flink.version}


=


Re: flink基于yarn的HA次数无效,以及HA拉起的任务是否可以重用state

2020-07-01 文章
hi, muchen
1. yarn.application-attempts
这个参数与另外一个参数有关系:yarn.application-attempt-failures-validity-interval,大概意思是需要在设置的这个interval内失败重试多少次,才认为flink
job是失败的,如果超过这个interval,就会重新开始计数。打个比方,yarn.application-attempts:
2,yarn.application-attempt-failures-validity-interval =
1(默认值,10s),只有在10s内 flink job 失败重启2次才会真正的失败。
2. 如果配置了checkpoint是会重用上次任务失败的state。

这是我个人的理解,有疑问大家一起讨论

MuChen <9329...@qq.com> 于2020年7月1日周三 下午7:50写道:

> hi,all:
>
> 我根据这篇博客https://blog.csdn.net/cndotaci/article/details/106870413
> 的介绍,配置了flink基于yarn的高可用,测试时发现配置的任务失败重试2次没有生效,我测试到第6次时,任务仍然能够被yarn拉起。
>
> 请问各位大佬
>
> 1. 下面配置中的重试次数为什么没有生效?
>
> 2. 通过HA拉起的任务,是否可以重用上次任务失败时的state?
>
> flink版本:1.10.0
>
> flink-conf.yaml配置:
> $ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m
> taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1 high-availability: zookeeper
> high-availability.storageDir: hdfs:///flink/ha/
> high-availability.zookeeper.quorum:
> uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1
> state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir:
> hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60
> state.backend.incremental: true jobmanager.execution.failover-strategy:
> region jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/
> historyserver.web.port: 8082 historyserver.archive.fs.dir:
> hdfs:///flink/flink-jobs/ historyserver.archive.fs.refresh-interval: 1
> # HA重试次数 yarn.application-attempts: 2
> ssh到jm节点,手动kill任务的操作日志:
> [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853
> YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner
> 18026 YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 18026 [root@uhadoop-op3raf-task48
> ~]# jps 34785 YarnTaskExecutorRunner 16853 -- process information
> unavailable 17527 PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412
> YarnJobClusterEntrypoint 39599 NodeManager [root@uhadoop-op3raf-task48
> ~]# kill -9 20412 [root@uhadoop-op3raf-task48 ~]# jps 34785
> YarnTaskExecutorRunner 21926 YarnJobClusterEntrypoint 23207 Jps 17527
> PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 21926 [root@uhadoop-op3raf-task48
> ~]# jps 34785 YarnTaskExecutorRunner 23318 YarnJobClusterEntrypoint 26279
> Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 23318


Re: flink SQL如何将秒转换为timestamp

2020-06-30 文章
可以试试这样写:
TO_TIMESTAMP(FROM_UNIXTIME(itime, '-MM-dd HH:mm:ss'))

zilong xiao  于2020年6月30日周二 下午4:30写道:

> 有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导
>
> TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd
> hh:mm:ss'))
>


Re: Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章
那你在命令行执行:hadoop classpath,有hadoop的classpath输出吗?

Zhou Zach  于2020年6月16日周二 下午3:22写道:

>
>
>
>
>
>
> 在/etc/profile下,目前只加了
> export HADOOP_CLASSPATH=`hadoop classpath`
> 我是安装的CDH,没找到sbin这个文件。。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-16 15:05:12,"王松"  写道:
> >你配置HADOOP_HOME和HADOOP_CLASSPATH这两个环境变量了吗?
> >
> >export HADOOP_HOME=/usr/local/hadoop-2.7.2
> >export HADOOP_CLASSPATH=`hadoop classpath`
> >export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
> >
> >Zhou Zach  于2020年6月16日周二 下午2:53写道:
> >
> >> flink/lib/下的jar:
> >> flink-connector-hive_2.11-1.10.0.jar
> >> flink-dist_2.11-1.10.0.jar
> >> flink-jdbc_2.11-1.10.0.jar
> >> flink-json-1.10.0.jar
> >> flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
> >> flink-sql-connector-kafka_2.11-1.10.0.jar
> >> flink-table_2.11-1.10.0.jar
> >> flink-table-blink_2.11-1.10.0.jar
> >> hbase-client-2.1.0.jar
> >> hbase-common-2.1.0.jar
> >> hive-exec-2.1.1.jar
> >> mysql-connector-java-5.1.49.jar
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-06-16 14:48:43,"Zhou Zach"  写道:
> >> >
> >> >
> >> >
> >> >
> >> >high-availability.storageDir: hdfs:///flink/ha/
> >> >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> >> >state.backend: filesystem
> >> >state.checkpoints.dir:
> hdfs://nameservice1:8020//user/flink10/checkpoints
> >> >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
> >> >high-availability.zookeeper.path.root: /flink
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >在 2020-06-16 14:44:02,"王松"  写道:
> >> >>你的配置文件中ha配置可以贴下吗
> >> >>
> >> >>Zhou Zach  于2020年6月16日周二 下午1:49写道:
> >> >>
> >> >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException:
> Failed
> >> to
> >> >>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
> >> >>>
> >> >>> Caused by: java.io.IOException: Could not create FileSystem for
> highly
> >> >>> available storage path
> (hdfs:/flink/ha/application_1592215995564_0027)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(Clust

Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))

2020-06-16 文章
6.14号的meetup中讲的动态 Table 属性很清楚,附个链接:
https://developer.aliyun.com/live/2894?accounttraceid=07deb589f50c4c1abbcbed103e534316qnxq
,大概在04:17:00左右。

Kurt Young  于2020年6月16日周二 下午12:12写道:

> table hint的语法是紧跟在你query中访问某张表的时候,所以我理解并不会有 ”这个动态参数作用在哪张表“ 上的疑问吧?
>
> Best,
> Kurt
>
>
> On Tue, Jun 16, 2020 at 10:02 AM Yichao Yang <1048262...@qq.com> wrote:
>
> > Hi
> >
> >
> > 1.2版本将会有like字句的支持,参考[1],不过也是通过定义一张表的方式,而不是直接在query内指定。
> >
> >
> 个人理解在query内指定其实会涉及到很多因素,假设涉及到多张表的时候,涉及到同key属性时,你在query内指定的属性到底是赋予给哪张表的?这个其实是比较模糊的。
> >
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:"Kurt Young" > 发送时间:2020年6月16日(星期二) 上午9:53
> > 收件人:"user-zh" >
> > 主题:Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))
> >
> >
> >
> > 就是你DDL定义表的时候的WITH参数,有时候有个别参数写的不对或者需要调整,可以在query里直接修改,而不用重新定义一张新表。
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com wrote:
> >
> >  动态 Table 属性是指什么?可以举一个列子吗。
>


Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章
你配置HADOOP_HOME和HADOOP_CLASSPATH这两个环境变量了吗?

export HADOOP_HOME=/usr/local/hadoop-2.7.2
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

Zhou Zach  于2020年6月16日周二 下午2:53写道:

> flink/lib/下的jar:
> flink-connector-hive_2.11-1.10.0.jar
> flink-dist_2.11-1.10.0.jar
> flink-jdbc_2.11-1.10.0.jar
> flink-json-1.10.0.jar
> flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
> flink-sql-connector-kafka_2.11-1.10.0.jar
> flink-table_2.11-1.10.0.jar
> flink-table-blink_2.11-1.10.0.jar
> hbase-client-2.1.0.jar
> hbase-common-2.1.0.jar
> hive-exec-2.1.1.jar
> mysql-connector-java-5.1.49.jar
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-16 14:48:43,"Zhou Zach"  写道:
> >
> >
> >
> >
> >high-availability.storageDir: hdfs:///flink/ha/
> >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> >state.backend: filesystem
> >state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints
> >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
> >high-availability.zookeeper.path.root: /flink
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-06-16 14:44:02,"王松"  写道:
> >>你的配置文件中ha配置可以贴下吗
> >>
> >>Zhou Zach  于2020年6月16日周二 下午1:49写道:
> >>
> >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed
> to
> >>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
> >>>
> >>> at
> >>>
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
> >>>
> >>> Caused by: java.io.IOException: Could not create FileSystem for highly
> >>> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
> >>>
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>>
> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>
> >>> at
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
> >>>
> >>> ... 2 more
> >>>
> >>> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Could not find a file system implementation for scheme 'hdfs'. The
> scheme
> >>> is not directly supported by Flink and no Hadoop file system to support
> >>> this scheme could be loaded.
> >>>
> >>> at
> >>>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
> >>>
> >>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
> >>>
> >>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> >>>
> >>> at
> >>&g

Re: flink sql job 提交到yarn上报错

2020-06-16 文章
你的配置文件中ha配置可以贴下吗

Zhou Zach  于2020年6月16日周二 下午1:49写道:

> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
>
> Caused by: java.io.IOException: Could not create FileSystem for highly
> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
>
> at
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>
> at
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>
> ... 2 more
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded.
>
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>
> at
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>
> ... 13 more
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
> the classpath, or some classes are missing from the classpath.
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
>
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
>
> ... 16 more
>
> Caused by: java.lang.VerifyError: Bad return type
>
> Exception Details:
>
>   Location:
>
>
> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
> @160: areturn
>
>   Reason:
>
> Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0])
> is not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method
> signature)
>
>   Current Frame:
>
> bci: @160
>
> flags: { }
>
> locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
> 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
> stack: { 'org/apache/hadoop/fs/ContentSummary' }
>
>
>
> 在本地intellij idea中可以正常运行,flink job上订阅kafka,sink到mysql和hbase,集群flink lib目录下,
>
>
>
>


flink 执行一段时间后报错:Requesting TaskManager's path for query services failed.

2020-06-15 文章
flink 版本:1.9.0

大家好,flink任务在执行一段时间后,出现报错:Requesting TaskManager's path for query services
failed,akka.pattern.AskTimeoutException,请问这个原因是什么呢?详细错误输出如下:

2020-06-15 02:12:07,434 WARN
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl -
Requesting TaskManager's path for query services failed.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
Ask timed out on [Actor[akka://flink/user/dispatcher#-1311898795]] after
[1 ms]. Message of type
[org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
reason for `AskTimeoutException` is that the recipient actor didn't send a
reply.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
at akka.dispatch.OnComplete.internal(Future.scala:263)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-1311898795]] after [1 ms]. Message
of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
typical reason for `AskTimeoutException` is that the recipient actor didn't
send a reply.
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
... 9 more
2020-06-15 02:12:12,603 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter
-> Map (2/32) (689c6e191be63afea5182581e2875d4d) switched from CANCELING to
CANCELED.
2020-06-15 02:12:12,604 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter
-> Map (4/32) (d2f0fbf94e3d7cddc0f20080f94a1692) switched from CANCELING to
CANCELED.
2020-06-15 02:12:12,604 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter
-> Map (3/32) (28c8833ebbc14377bc74e7a389b4220b) switched from CANCELING to
CANCELED.
2020-06-15 02:12:12,604 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter
-> Map (5/32) (585854caaef14df77b084f92eb5fb4a2) switched from CANCELING to
CANCELED.
2020-06-15 02:12:12,604 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter
-> Map (15/32) (fc699114e90473fda9c717b5e0d4b716) switched from CANCELING
to CANCELED.
2020-06-15 02:12:12,605 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter
-> Map (22/32) (e3569b88e95af3d06e751ade5cdde34c) switched from CANCELING
to CANCELED.
2020-06-15 02:12:12,605 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Flat Map -> Filter
-> Map (31/32) (a653c6cbb83c04b6c3f36a3ba80e3979) switched from CANCELING
to CANCELED.
2020-06-15 02:12:12,605 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Filter ->
Map (1/6) (89af922e79d370e2df9fb4a9629ad100) switched from CANCELING to
CANCELED.
2020-06-15 02:12:12,606 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Filter ->
Map (2/6) (7006987f6159a65598fe11237490a666) switched from CANCELING to
CANCELED.
2020-06-15 02:12:12,606 INFO