Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 文章 Weihua Hu
Congratulations

Best,
Weihua


On Tue, Mar 19, 2024 at 10:56 AM Rodrigo Meneses  wrote:

> Congratulations
>
> On Mon, Mar 18, 2024 at 7:43 PM Yu Chen  wrote:
>
> > Congratulations!
> > Thanks to release managers and everyone involved!
> >
> > Best,
> > Yu Chen
> >
> >
> > > 2024年3月19日 01:01,Jeyhun Karimov  写道:
> > >
> > > Congrats!
> > > Thanks to release managers and everyone involved.
> > >
> > > Regards,
> > > Jeyhun
> > >
> > > On Mon, Mar 18, 2024 at 9:25 AM Lincoln Lee 
> > wrote:
> > >
> > >> The Apache Flink community is very happy to announce the release of
> > Apache
> > >> Flink 1.19.0, which is the first release for the Apache Flink 1.19
> > series.
> > >>
> > >> Apache Flink® is an open-source stream processing framework for
> > >> distributed, high-performing, always-available, and accurate data
> > streaming
> > >> applications.
> > >>
> > >> The release is available for download at:
> > >> https://flink.apache.org/downloads.html
> > >>
> > >> Please check out the release blog post for an overview of the
> > improvements
> > >> for this release:
> > >>
> > >>
> >
> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
> > >>
> > >> The full release notes are available in Jira:
> > >>
> > >>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
> > >>
> > >> We would like to thank all contributors of the Apache Flink community
> > who
> > >> made this release possible!
> > >>
> > >>
> > >> Best,
> > >> Yun, Jing, Martijn and Lincoln
> > >>
> >
> >
>


Re: K8s Application模式无法支持flink jar中Java动态编译?

2023-08-23 文章 Weihua Hu
Hi,

抱歉我对 JavaCompiler 不是非常了解,我想知道这些动态编译是运行在 UserJar 的 main 方法中吗?以及编译的产物是怎么传递给
Flink 的?

Best,
Weihua


On Tue, Aug 22, 2023 at 5:12 PM 周坤 <18679131...@163.com> wrote:

> 你好!
>
> 有一个关于flink K8S application模式运行的问题需要解答下;
>
> 原本由yarn per-job模式运行的flink需要切换到K8s application模式;
>
>
>
>
> 目前公司环境提供了一个通用的基础flink1.13镜像包;
>
> usrJar:自己实现一个flink任务, 该任务存在使用 javax.tools.JavaCompiler
> 动态加载数据库的java类,进行动态编译加载运行;
>
> 目前在切换运行的时候会报 需要动态编译的类的依赖找不到;
>
>
>
>
> 需要动态编译的class文件的依赖在 usrJar中都是存在的,但是启动时却报找不到依赖的类;
> 开始以为是 flink类加载机制导致:  classloader.resolve-order: parent-first , 增加该配置也无效;
> 后来发现将需要编译的类依赖放入到lib, 可以执行通过,但是如此违背了动态编译的初衷;
>
> 对此我感到很困惑? 是什么原因导致,期待能有回复。
>
>


Re: flink-job-history 任务太多页面卡死

2023-07-27 文章 Weihua Hu
Hi

Flink UI 需要加载所有的 Job 信息并在 UI 渲染,在作业比较多的时候很容易导致 UI 卡死。
不只在这个页面,在一些并发比较大的任务上打开 subtask 页面也很容易导致UI 卡死。

Flink UI 需要一个分页的功能来减少数据加载和 UI 渲染的压力

Best,
Weihua


On Fri, Jul 28, 2023 at 11:29 AM Shammon FY  wrote:

> Hi,
>
>
> 可以通过配置`jobstore.max-capacity`和`jobstore.expiration-time`控制保存的任务数,具体参数可以参考[1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options
>
> Best,
> Shammon FY
>
> On Fri, Jul 28, 2023 at 10:17 AM 阿华田  wrote:
>
> > 目前flink-job-history
> > 已经收录5000+任务,当点击全部任务查看时,job-history就会卡死无法访问,各位大佬有什么好的解决方式?
> > | |
> > 阿华田
> > |
> > |
> > a15733178...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
>


Re: (无主题)

2023-06-13 文章 Weihua Hu
>
> 这个状态变量是否需要用transient来修饰

The ValueState is initialized in the open() method of the rich function and should not take part in Java serialization, so marking it transient is recommended.
That said, user functions are only serialized and deserialized when the task is deployed, and at that point the ValueState field is still null, so leaving out the transient keyword does not have much practical impact either.

以及什么情况下flink代码中需要用transient来修饰变量,什么情况下不用transient来修饰

Once you understand that user functions are serialized and deserialized at task deployment time, this question becomes easy to answer: if a field is initialized inside the function's open() method, mark it transient to indicate that it does not need to be serialized.
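
For illustration, a minimal sketch of that pattern (the Event type and the counting logic are made up, only the transient field plus open() initialization matter here):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountPerKey extends KeyedProcessFunction<String, Event, Long> {

    // transient: the state handle is re-created in open() on every deployment,
    // so it never needs to be shipped as part of the serialized function instance
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Long> out) throws Exception {
        Long current = countState.value();          // null for the first element of a key
        long next = current == null ? 1L : current + 1;
        countState.update(next);
        out.collect(next);
    }
}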


Best,
Weihua


On Tue, Jun 13, 2023 at 1:10 PM Paul <18751805...@163.com> wrote:

> 在flink处理函数中定义一个状态变量,比如private ValueState
> vs;这个状态变量是否需要用transient来修饰,为什么呢?以及什么情况下flink代码中需要用transient来修饰变量,什么情况下不用transient来修饰?请大家指教
>
>
>


Re: flink 输出异常数据

2023-05-29 文章 Weihua Hu
Hi,

你使用的数据源是什么呢?Kafka 吗?用的是 FlinkSQL 还是 DataStream API 呢?

方便把异常栈贴一下吗

Best,
Weihua


On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:

>
> 各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: Flink rocksDB疑似内存泄露,导致被Linux kernel killed

2023-04-21 文章 Weihua Hu
Hi,

你作业运行在 YARN 还是 Kubernetes 上?可以先关注下文档里的 Glibc 泄露问题

Best,
Weihua


On Fri, Apr 21, 2023 at 6:04 PM Guo Thompson  wrote:

> Flink
> Job是基于sql的,Flink版本为1.13.3,state用rocksDB存,发现会存在内存泄露的情况,作业运行一段时间后,会被linux内核kill掉,求助,如何解决?
> 网上
> http://www.whitewood.me/2021/01/02/%E8%AF%A6%E8%A7%A3-Flink-%E5%AE%B9%E5%99%A8%E5%8C%96%E7%8E%AF%E5%A2%83%E4%B8%8B%E7%9A%84-OOM-Killed/
> 讲很可能就是rocksDB的内存没法回收导致。
>
> 1、分配 tm的30G内存,jvm堆内的远远没有使用完。
> [image: 8f47b109-a04b-4bc1-8f64-fed21c58838d.jpeg]
> 2、从linux上查看内存使用,实际使用内存 44.4G,远远超出设置的30G
> [image: image.png]
> 3、dump下tm的jvm内存,实际不到2G(dump会触发full gc)
> [image: image.png]
>


Re: kafka实例重启对flink作业的影响

2023-04-20 文章 Weihua Hu
The Flink Kafka connector supports dynamic partition discovery; see the official documentation [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#dynamic-partition-discovery
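
A minimal sketch with the KafkaSource builder (the broker address, topic, group id and the 60-second interval are all placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // check for newly added partitions every 60 seconds;
        // in older Flink versions discovery is off unless this property is set
        .setProperty("partition.discovery.interval.ms", "60000")
        .build();

With this, topic expansion or rebalancing adds the new partitions to the running job automatically instead of requiring a restart.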


Best,
Weihua


On Thu, Apr 20, 2023 at 3:43 PM casel.chen  wrote:

>
> 实际工作中会遇到kafka版本升级或者kafka扩容(横向或纵向),数据重平衡等情况,想问一下发生这些情况下对线上运行的flink作业会有什么影响?flink作业能感知topic分区发生变化吗?要如何应对以减少对flink作业消费端的影响?


Re: 流数据转化为json

2023-04-17 文章 Weihua Hu
Hi,

Which Flink version are you using? I would suggest following the official Flink Kafka connector documentation [1] directly.
To convert the records to JSON you can use the flink-json format [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/json/
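
As a rough DataStream sketch (assuming Flink 1.15+ with the KafkaSink API and Jackson on the classpath; InPutInfo needs getters so Jackson can serialize it, and the broker/topic names are placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.fasterxml.jackson.databind.ObjectMapper;

DataStream<String> jsonStream = outPutInfoStream.map(new RichMapFunction<InPutInfo, String>() {
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();               // created once per subtask
    }

    @Override
    public String map(InPutInfo value) throws Exception {
        return mapper.writeValueAsString(value);   // POJO -> JSON string
    }
});

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();

jsonStream.sinkTo(sink);

If gson/fastjson produced empty output, the usual cause is that the POJO has no public getters; the same applies to Jackson here.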

Best,
Weihua


On Fri, Apr 14, 2023 at 7:17 PM 小昌同学  wrote:

> 你好,请问一下上游的数据是
> SingleOutputStreamOperator outPutInfoStream =
> keyedStream.process(new KeyStreamFunc());
> 数据样式为:InPutInfo[phone='123456',workId='001']
> 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
> 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: flink sink web ui显示为Sink: Unnamed

2023-04-14 文章 Weihua Hu
Yes, you can call .name("xxx") on the operator to give it a name.
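
For example (the sink function and the name are placeholders):

stream.addSink(new JdbcMysqlSinkFunction())   // your existing MySQL sink
      .name("mysql-order-sink");              // shown instead of "Sink: Unnamed" in the web UI

The same .name(...) call also works after sinkTo(...) when using the new sink API.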

Best,
Weihua


On Fri, Apr 14, 2023 at 4:27 PM 小昌同学  wrote:

> 我将流式数据输出到mysql,查看flink 自带的web ui界面,有一个sink节点显示为Sink: Unnamed
> ,这个针对sink节点可以命名嘛
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: Re: PartitionNotFoundException

2023-04-09 文章 Weihua Hu
Hi,

可以提供下 JobManager 和相关 TaskManager 的日志吗?

一般来说 PartitionNotFoundException 只在作业启动建立链接的时候才会出现,
根据你的描述,应该是一个消费 Kafka 的流式任务,不太应该在运行一周后
出现 PartitionNotFoundException

可以检查下是否存在其他异常

Best,
Weihua


On Mon, Apr 10, 2023 at 9:51 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> taskmanager.network.max-num-tcp-connections: 4
>
> 这两参数已经调整了的,connections  之前只是由1改为2   但运行一周后又出现了 PartitionNotFoundException
>
>
>
> From: Shammon FY
> Date: 2023-04-10 09:46
> To: user-zh
> Subject: Re: Re: PartitionNotFoundException
> 像上面提到的,流式作业可以设置taskmanager.network.tcp-connection.enable-reuse-across-jobs:
> false,一般作业影响不会有影响
>
> Best,
> Shammon FY
>
> On Mon, Apr 10, 2023 at 9:27 AM zhan...@eastcom-sw.com <
> zhan...@eastcom-sw.com> wrote:
>
> > hi, 上周调整这两参数后,正常运行了近一个星期后 又重现了[PartitionNotFoundException]...
> >
> > taskmanager.network.max-num-tcp-connections  只是调整为2,可能是太小了 今天我改为4 再看看
> > 或者 将flink版本升级到 1.17 是否可修复该问题?
> >
> > From: yidan zhao
> > Date: 2023-04-03 10:45
> > To: user-zh
> > Subject: Re: PartitionNotFoundException
> > 设置 taskmanager.network.tcp-connection.enable-reuse-across-jobs 为
> > false,设置 taskmanager.network.max-num-tcp-connections 大点。
> > 之前有个bug导致这个问题我记得,不知道1.16修复没有。
> >
> > zhan...@eastcom-sw.com  于2023年4月3日周一 10:08写道:
> > >
> > >
> > > hi, 最近从1.14升级到1.16后,kafka消费不定时会出现 [org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > Partition *** not found.]
> > > 然后不停自动重启job再继续抛出该异常后 不断重启,直到手动cancel任务后 再启动才恢复正常消费
> > >
> > > 在1.14集群中从未出现的问题,升到1.16后才出现,请问是否有配置可以优化或避免该异常?
> >
>


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 文章 Weihua Hu
Congratulations!

Best,
Weihua


On Mon, Mar 27, 2023 at 9:02 PM yuxia  wrote:

> congratulations!
>
> Best regards,
> Yuxia
>
>
> 发件人: "Andrew Otto" 
> 收件人: "Matthias Pohl" 
> 抄送: "Jing Ge" , "Leonard Xu" , "Yu
> Li" , "dev" , "User" <
> u...@flink.apache.org>, "user-zh" 
> 发送时间: 星期一, 2023年 3 月 27日 下午 8:57:50
> 主题: Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache
> Paimon(incubating)
>
> Exciting!
>
> If this ends up working well, Wikimedia Foundation would love to try it
> out!
>
> On Mon, Mar 27, 2023 at 8:39 AM Matthias Pohl via user <u...@flink.apache.org> wrote:
>
>
>
> Congratulations and good luck with pushing the project forward.
>
> On Mon, Mar 27, 2023 at 2:35 PM Jing Ge via user <u...@flink.apache.org> wrote:
>
>
> Congrats!
> Best regards,
> Jing
>
> On Mon, Mar 27, 2023 at 2:32 PM Leonard Xu <xbjt...@gmail.com> wrote:
>
>
> Congratulations!
>
> Best,
> Leonard
>
>
>
> On Mar 27, 2023, at 5:23 PM, Yu Li <car...@gmail.com> wrote:
>
> Dear Flinkers,
>
>
>
>
> As you may have noticed, we are pleased to announce that Flink Table Store
> has joined the Apache Incubator as a separate project called Apache
> Paimon(incubating) [1] [2] [3]. The new project still aims at building a
> streaming data lake platform for high-speed data ingestion, change data
> tracking and efficient real-time analytics, with the vision of supporting a
> larger ecosystem and establishing a vibrant and neutral open source
> community.
>
>
>
>
> We would like to thank everyone for their great support and efforts for
> the Flink Table Store project, and warmly welcome everyone to join the
> development and activities of the new project. Apache Flink will continue
> to be one of the first-class citizens supported by Paimon, and we believe
> that the Flink and Paimon communities will maintain close cooperation.
>
>
>
>
> 亲爱的Flinkers,
>
>
>
>
> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache 孵化器独立孵化 [1] [2]
> [3]。新项目的名字是 Apache
> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>
>
>
>
>
> 在这里我们要感谢大家对 Flink Table Store 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink
> 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信 Flink 和 Paimon 社区将继续保持密切合作。
>
>
>
>
> Best Regards,
> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>
> 致礼,
> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>
>
>
>
> [1] https://paimon.apache.org/
> [2] https://github.com/apache/incubator-paimon
> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>
>
>
>
>
>
>
>
>
>


Re: flink sql作业监控指标operator name和task name超长导致prometheus OOM问题

2023-03-24 文章 Weihua Hu
Hi,

现在不会过滤指标,可以尝试修改 PrometheusReporter 将不需要的 label 过滤掉

https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java#L104
需要注意这里的 key 的格式是   这种

Best,
Weihua


On Fri, Mar 24, 2023 at 2:47 PM casel.chen  wrote:

> 使用prometheus监控flink
> sql作业,发现没一会儿工夫就将prometheus内存(30GB)占满了,查了一下是因为作业指标名称过长导致的,像flink
> sql作业这种operator name和task name默认是根据sql内容拼装的,一旦sql出现的列名很多就会导致指标名称过长。
> 请问这种情况Flink社区有什么建议?prometheus抓取的时候能够过滤掉吗?只保留operator_id和task_id。
> 要是自己想将现有拼装名称修改成哈希值的话应该改哪个类呢?谢谢!


Re: 我上报的一个sql bug没人处理怎么办?

2023-03-21 文章 Weihua Hu
Hi

我使用同样的 SQL 没有复现该问题,你可以提供下复现的办法吗?

Best,
Weihua


On Wed, Mar 22, 2023 at 10:28 AM Jeff  wrote:

> bug地址:
> https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>
>
> bug详细内容:
> the values of map are truncated by the CASE WHEN function.
> // sql
> create table test (a map) with ('connector'='print');
> insert into test  select * from (values(case when true then
> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> end));
>
> the result:
>
> +I[{test=123}]
>
> We hope the value of result is '123456789', but I get '123', the length is
> limited by 'abc'.


Re: Flink实时计算平台在k8s上以Application模式启动作业如何实时同步作业状态到平台?

2023-03-21 文章 Weihua Hu
Hi

我们内部最初版本是通过 cluster-id 来唯一标识一个 application,同时认为流式任务是长时间运行的,不应该主动退出。如果该
cluster-id 在 Kubernetes 中查询不到,说明作业已经异常退出了,此时标记作业为异常。后续我们开发了特殊的 operator +
crd 来管理 pod 生命周期,防止 pod 快速退出。

另外,作业状态还可以通过启用 history server[1] 来查看

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/historyserver/

Best,
Weihua


On Wed, Mar 22, 2023 at 9:42 AM Shammon FY  wrote:

> Hi
>
> 你可以在你的提交平台启动后台任务定期去向k8s查询作业状态,Flink也在设计支持作业状态汇报[1],目前正在讨论中
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Status+Listener
>
> Best,
> Shammon FY
>
>
> On Wed, Mar 22, 2023 at 8:54 AM casel.chen  wrote:
>
> >
> Flink实时计算平台在k8s上以Application模式启动作业如何实时同步作业状态到平台?作业一旦crash失败就会被k8s回收到相关的pod,没法通过web
> > url去获取作业状态,有什么别的办法吗?通过metrics? 如果是的话具体是哪一个metric值呢?
>


Re: 无法设置任务名

2023-03-15 文章 Weihua Hu
Hi,

UI 显示的任务名是什么呢?

Best,
Weihua


On Wed, Mar 15, 2023 at 8:02 PM wei_yuze  wrote:

> 您好!
>
>
>
>
> 我在使用flink1.16.0。在通过这个方式设置了任务名:
> streamExecutionEnvironment.execute("jobName")
> 但是web UI 中并不显示出设置的任务名。请问哪位大佬能答疑一下,感谢!


Re: flink k8s 部署启动报错

2023-03-13 文章 Weihua Hu
_DIRTY.json

看下以这个结尾的文件,内容应该是一个 json,如果不是标准 json 说明数据已经异常了,可以尝试删除



Best,
Weihua


On Tue, Mar 14, 2023 at 11:23 AM Jason_H  wrote:

> 您好,
> 我找到了我的ha目录,请教一下,怎么确定哪些数据是脏数据,可以允许删除的,这个有什么办法可以确定吗,我看到的都是些系统数据
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |
> ---- 回复的原邮件 
> | 发件人 | Weihua Hu |
> | 发送日期 | 2023年3月14日 10:39 |
> | 收件人 |  |
> | 主题 | Re: flink k8s 部署启动报错 |
> Hi,
>
> 看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。
> 可以参考文档[1],检查相关的 HA 路径,清理下异常数据
>
> 另外问一下,之前是通过同名的 cluster-id 启动过 Flink 集群吗?
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path
>
> Best,
> Weihua
>
>
> On Tue, Mar 14, 2023 at 9:58 AM Jason_H  wrote:
>
> hi,大家好
> 请教一个问题,我在k8s上部署的flink集群,启动不来,报如下的错误,大家有遇到过吗
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
> of globally-terminated jobs from JobResultStore
> at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
> Source) ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
> [?:?]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
> Source) [?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source) [?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source) [?:?]
> at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve
> JobResults of globally-terminated jobs from JobResultStore
> at
>
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:192)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
> ~[flink-dist-1.15.2.jar:1.15.2]
> ... 4 more
> Caused by:
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
> at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream);
> line: 1, column: 0]
> at
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
>
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
> ~[flink-dist-1.15.2.jar:1.15.2]
> ... 4 more
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |
>


Re: flink k8s 部署启动报错

2023-03-13 文章 Weihua Hu
Hi,

看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。
可以参考文档[1],检查相关的 HA 路径,清理下异常数据

另外问一下,之前是通过同名的 cluster-id 启动过 Flink 集群吗?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua


On Tue, Mar 14, 2023 at 9:58 AM Jason_H  wrote:

> hi,大家好
> 请教一个问题,我在k8s上部署的flink集群,启动不来,报如下的错误,大家有遇到过吗
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
> of globally-terminated jobs from JobResultStore
> at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
> Source) ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
> [?:?]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
> Source) [?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source) [?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source) [?:?]
> at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve
> JobResults of globally-terminated jobs from JobResultStore
> at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:192)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
> ~[flink-dist-1.15.2.jar:1.15.2]
> ... 4 more
> Caused by:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
>  at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream);
> line: 1, column: 0]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
> ~[flink-dist-1.15.2.jar:1.15.2]
> at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
> ~[flink-dist-1.15.2.jar:1.15.2]
> ... 4 more
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |


Re: Re: flink on yarn关于yarn尝试重启flink job问题咨询

2023-03-13 文章 Weihua Hu
图片看不到,可以找一个图床上传图片,在邮件列表中贴一下链接。

YARN 拉起 AM 还受 "yarn.application-attempt-failures-validity-interval"[1]
控制,在这个时间内达到指定次数才会退出。

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval

Best,
Weihua


On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:

> 图片在附件
> 但是实际却是超过了10次。。
>
>
>
>
>
>
> 在 2023-03-13 15:39:39,"Weihua Hu"  写道:
> >Hi,
> >
> >图片看不到了
> >
> >按照这个配置,YARN 应该只会拉起 10 次 JobManager。
> >
> >Best,
> >Weihua
> >
> >
> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
> >
> >> flink1.10版本,flink配置如下
> >> yarn.application-attempts = 10  (yarn尝试启动flink job的次数为10)
> >> 正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图
> >> 请问appattempt_1678102326043_0006_000409
> >> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
> >> 每个序号不是代表一次尝试么
> >>
>
>


Re: flink on yarn关于yarn尝试重启flink job问题咨询

2023-03-13 文章 Weihua Hu
Hi,

图片看不到了

按照这个配置,YARN 应该只会拉起 10 次 JobManager。

Best,
Weihua


On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:

> flink1.10版本,flink配置如下
> yarn.application-attempts = 10  (yarn尝试启动flink job的次数为10)
> 正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图
> 请问appattempt_1678102326043_0006_000409
> 
> 每个序号不是代表一次尝试么
>


Re: Re: Re: flink on yarn 异常停电问题咨询

2023-03-09 文章 Weihua Hu
Hi

Generally speaking, a power failure of the YARN cluster alone should not affect checkpoints that have already completed (only the last, in-flight checkpoint may have failed while writing to HDFS).

Do you have more detailed JobManager logs? It would help to first confirm how many completedCheckpoints Flink found during recovery and which one it finally tried to restore from.

You can also, as Yanfei suggested, point the job at one of the older checkpoints and restore it as if it were a savepoint.
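
For reference, a retained checkpoint directory can be passed to the CLI the same way as a savepoint, roughly like this (the path is a placeholder; use one of the chk-x directories under your checkpoint dir that is still intact):

./bin/flink run -s hdfs:///flink/checkpoints/<job-id>/chk-42 -c com.example.MainClass job.jar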


Best,
Weihua


On Fri, Mar 10, 2023 at 10:38 AM guanyq  wrote:

> 没有开启增量chk
> 文件损坏是看了启动日志,启动日志尝试从10个chk启动,但是都因为以下块损坏启动失败了
> 错误日志为:
>
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-10 10:26:11,"Yanfei Lei"  写道:
> >Hi 可以尝试去flink配置的checkpoint dir下面去找一找历史chk-x文件夹,如果能找到历史的chk-x,可以尝试手工指定
> chk重启[1]。
> >
> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
>
> >请问作业开启增量checkpoint了吗?在开启了增量的情况下,如果是比较早的一个checkpoint的文件损坏了,会影响后续基于它进行增量的checkpoint。
> >
> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
> >是观察到checkpoint dir下面没有文件吗?
> >
> >[1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
> >
> >guanyq  于2023年3月10日周五 08:58写道:
> >>
> >> 目前也想着用savepoint处理异常停电的问题
> >> 但是我这面还有个疑问:
> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
> >> 就很奇怪是不是10个checkpoint都没落盘导致的。
> >> 想问下:
> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2023-03-10 08:47:11,"Shammon FY"  写道:
> >> >Hi
> >> >
> >> >我觉得Flink
> >>
> >作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
> >> >
> >> >Best,
> >> >Shammon
> >> >
> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
> >> >
> >> >> 前提
> >> >> 1.flink配置了高可用
> >> >> 2.flink配置checkpoint数为10
> >> >> 3.yarn集群配置了任务恢复
> >> >> 疑问
> >> >> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
> >> >>
> >> >>
> >> >>
> >> >>
> >
> >
> >
> >--
> >Best,
> >Yanfei
>


Re: Flink异步Hbase导致Too many open files异常

2023-03-08 文章 Weihua Hu
Hi,

Looking at the code, the job does leak HBaseClient resources on failover.

Override the close() method in HbaseDimensionAsyncFunc and release the HBaseClient there.
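
A rough sketch of what that could look like (a fragment of the async function, assuming it extends RichAsyncFunction and creates the asynchbase HBaseClient in open(); the ZooKeeper quorum is a placeholder):

private transient HBaseClient client;

@Override
public void open(Configuration parameters) {
    client = new HBaseClient("zk1:2181,zk2:2181");   // placeholder quorum
}

@Override
public void close() throws Exception {
    if (client != null) {
        // shutdown() flushes pending RPCs and releases the netty selectors /
        // file descriptors; join() waits for it to finish
        client.shutdown().join();
        client = null;
    }
    super.close();
}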

Best,
Weihua


On Wed, Mar 8, 2023 at 4:19 PM aiden <18765295...@163.com> wrote:

> Hi
>   我在使用Async Hbase时频繁遇到too many open file异常,程序自动重启后会立即报错,具体报错日志如下:
> 2023-03-08 16:15:39
> org.jboss.netty.channel.ChannelException: Failed to create a selector.
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.(AbstractNioSelector.java:100)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.(AbstractNioWorker.java:52)
> at org.jboss.netty.channel.socket.nio.NioWorker.(NioWorker.java:45)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.(NioWorkerPool.java:39)
> at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> at org.hbase.async.HBaseClient.(HBaseClient.java:507)
> at org.hbase.async.HBaseClient.(HBaseClient.java:496)
> at
> com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Too many open files
> at sun.nio.ch.IOUtil.makePipe(Native Method)
> at sun.nio.ch.EPollSelectorImpl.(EPollSelectorImpl.java:65)
> at sun.nio.ch
> .EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> at java.nio.channels.Selector.open(Selector.java:227)
> at
> org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> ... 25 more
>   对当前程序使用文件描述符数量进行监控,发现当程序抛出如下错误自动重启后,程序使用文件描述符数量激增。错误日志如下
> java.io.IOException: Could not perform checkpoint 5 for operator async
> wait operator (2/9)#0.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> 

Re: flink on K8S(operator) 如何获取 Accumulator

2023-03-06 文章 Weihua Hu
Hi,

按照你的描述,我猜测你使用的是 Application 模式吧?这种模式下 user code 会在 JobManager 侧执行,Job
执行结束后会直接 shutdown cluster。

可以尝试使用 session mode[1] 部署 cluster

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode

Best,
Weihua


On Mon, Mar 6, 2023 at 8:54 PM wangwei  wrote:

>
> Hi,大佬们
>
> 如何在任务结束后获取Accumulator 数据?
> 参考代码:(但是无法获取)
> ableResult execute = statementSet.execute();
> Optional jobClient = execute.getJobClient();
> jobClient.get().getAccumulators().get()
>
> PS: 最初的需求是: 对任务同步的数据量做统计。希望在批任务结束后,准确的获取Accumulator 中值,但是在K8S 中无法获取?
>
> 大佬求助!!先磕为敬
>


Re: Flink内存问题

2023-03-02 文章 Weihua Hu
Hi,

针对问题 2, 可以增加下列环境变量来排除 Glibc 的问题,详情可以参考[1]

containerized.master.env.MALLOC_ARENA_MAX: 1

containerized.taskmanager.env.MALLOC_ARENA_MAX: 1

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_trouble/

Best,
Weihua


On Thu, Mar 2, 2023 at 8:10 PM 吴先生 <15951914...@163.com> wrote:

> Hi,
> 目前分析问题应该在堆外,大概率是managed和overhead这两部分,这两部分的内存分配比例都是默认配置,通过网上的相关资料来看有两种解决方案:
> 1、调大managed和overhead这两块的内存比例,
> 问题:调整多大合适?是否调整之后还会持续增长
> 2、还有另一种说法是glibc内存分配器有个64M的问题引起(这里可有深入研究),替换为jemalloc可避免
> 问题:有具体的知道方案吗
>
>
> | |
> 吴先生
> |
> |
> 15951914...@163.com
> |
>  回复的原邮件 
> | 发件人 | Shammon FY |
> | 发送日期 | 2023年3月2日 19:24 |
> | 收件人 |  |
> | 主题 | Re: Flink内存问题 |
> Hi
>
>
> 如果有搜集metrics,可以根据metrics查看一下是哪部分内存上涨导致container被kill掉;然后将上涨比较快的container内存dump一下,查看具体是哪些对象占用内存比较多
>
> Best,
> Shammon
>
>
> On Thu, Mar 2, 2023 at 7:14 PM 吴先生 <15951914...@163.com> wrote:
>
> Hi,
> Flink版本:1.12
> 部署模式:on yarn per-job
> 开发方式:DataStream Api
> 状态后端:RocksDB
> Job逻辑为一个15分钟的窗口计算,任务在运行一段时间后会出现内存使用超限,container被yarn
> kill的现象,目前有不少任务都会存在类似问题:
> Closing TaskExecutor connection container_e02_1654567136606_1034_01_12
> because: [2023-03-02 08:12:44.794]Container
> [pid=11455,containerID=container_e02_1654567136606_1034_01_12] is
> running 745472B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB
> of 8 GB physical memory used; 10.0 GB of 40 GB virtual memory used. Killing
> container.
> 请问:
> 该如何排查及优化
>
>
> | |
> 吴先生
> |
> |
> 15951914...@163.com
> |
>


Re: 退订

2023-02-27 文章 Weihua Hu
退订请发送邮件到 user-zh-unsubscr...@flink.apache.org

Best,
Weihua


On Tue, Feb 28, 2023 at 12:13 AM zhangjunjie 
wrote:

> 退订
>
>
>


Re: 【Windowing TVF】 GROUP BY window_start, window_end 没有输出

2023-02-27 文章 Weihua Hu
Hi,
可以详细描述下你的使用 case 吗?用的 SQL 语句是什么样子的


Best,
Weihua


On Mon, Feb 27, 2023 at 12:51 PM wei_yuze  wrote:

> 您好!
>
>
>
>
> 我在使用Windowing table-valued functions (Windowing TVFs) 的时候,GROUP BY 中一旦加上
> window_start, window_end 就没有输出,但也不报错。请问有哪位大佬知道是什么原因吗?
>
> Lucas


Re: flink taskmanger重启失败的问题

2023-02-23 文章 Weihua Hu
从 region 改为 full 会扩容单个 Task 故障的影响范围,可以参考社区文档:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/task_failure_recovery/

Best,
Weihua


On Fri, Feb 24, 2023 at 2:12 PM 唐世伟  wrote:

> 谢谢回复,我看日志已经超出来yarn保存的期限被删了。另外Failover从region改为full。是不是能避免这个问题啊?
>
> > 2023年2月23日 上午11:36,Weihua Hu  写道:
> >
> > Hi,
> >
> > 在 Cancel 其他 task 时会先将 task 状态置为 cancelling,这时 task 失败是不会二次触发 Failover 的。
> > 可以检查下是不是作业划分了多个 region,多个 region 的异常是统一计数的。
> >
> > 或者可以贴一下日志吗?
> >
> >
> > Best,
> > Weihua
> >
> >
> > On Thu, Feb 23, 2023 at 11:16 AM 唐世伟  wrote:
> >
> >> 我们有一个flink任务,同时写10几张doris表,每次doris出问题的时候任务就挂,flink的重启策略没有效果。
> >> flink的重启配置如下:
> >> restart-strategy: failure-rate
> >> restart-strategy.failure-rate.delay: 60 s
> >> restart-strategy.failure-rate.failure-rate-interval: 10 min
> >> restart-strategy.failure-rate.max-failures-per-interval: 3
> >>
> >> 这边看了一下任务日志逻辑,发现任务写doris失败的时候,进入了重启流程,然后尝试cancel其他的operator。而每次cancel
> >>
> operator的时候都会触发当前operator的checkpoint。但是由于存在其他大量写doris表的算子。在执行checkpoint都会尝试flush数据到doris,导致再次报错cancel失败。而每次失败都会计入尝试重启次数,最后导致超过重启上限次数,任务直接挂了。请问这个是不是不太合理?理论上说,执行失败就失败了,没必要计入重启失败次数。最后导致重启失败。这个有办法调整吗?
>
>


Re: flink taskmanger重启失败的问题

2023-02-22 文章 Weihua Hu
Hi,

在 Cancel 其他 task 时会先将 task 状态置为 cancelling,这时 task 失败是不会二次触发 Failover 的。
可以检查下是不是作业划分了多个 region,多个 region 的异常是统一计数的。

或者可以贴一下日志吗?


Best,
Weihua


On Thu, Feb 23, 2023 at 11:16 AM 唐世伟  wrote:

> 我们有一个flink任务,同时写10几张doris表,每次doris出问题的时候任务就挂,flink的重启策略没有效果。
> flink的重启配置如下:
> restart-strategy: failure-rate
> restart-strategy.failure-rate.delay: 60 s
> restart-strategy.failure-rate.failure-rate-interval: 10 min
> restart-strategy.failure-rate.max-failures-per-interval: 3
>
> 这边看了一下任务日志逻辑,发现任务写doris失败的时候,进入了重启流程,然后尝试cancel其他的operator。而每次cancel
> operator的时候都会触发当前operator的checkpoint。但是由于存在其他大量写doris表的算子。在执行checkpoint都会尝试flush数据到doris,导致再次报错cancel失败。而每次失败都会计入尝试重启次数,最后导致超过重启上限次数,任务直接挂了。请问这个是不是不太合理?理论上说,执行失败就失败了,没必要计入重启失败次数。最后导致重启失败。这个有办法调整吗?


Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 Weihua Hu
If you want to guarantee that what gets written to MySQL is always the latest event per key, you need a Top-1 (deduplication) step inside Flink that orders by event time, see [1]. Note that this requires state, so you may need to configure an appropriate state TTL [2] to keep the state from growing unbounded.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/topn/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-state-ttl
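
A minimal sketch of such a deduplication query (table and column names are made up for illustration):

SELECT order_id, status, update_time
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY update_time DESC) AS rn
    FROM orders_changelog
) WHERE rn = 1;

Because the ORDER BY uses the business/event-time column, the Top-1 result per key follows the change order of the record rather than the order in which the changelog happened to arrive.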

Best,
Weihua


On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:

>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>
>
> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >
> >Best,
> >Shengkai
> >
> >[1]
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >
> >Shammon FY  于2023年2月20日周一 08:41写道:
> >
> >> Hi
> >>
> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >>
> >> Best,
> >> Shammon
> >>
> >>
> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >>
> >> > Hi,
> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
> into
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> >
> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> > >
> >> > >
> >> > >请问:
> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> > >我理解flink
> >> >
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> > >
> >> >
> >>
>


Re: Whether Flink SQL window operations support "Allow Lateness and SideOutput"?

2023-02-21 文章 Weihua Hu
Hi,

Maybe you can use CURRENT_WATERMARK()[1]  to handle some late data.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
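
For instance, a rough sketch that routes rows which are already behind the watermark (and would therefore be dropped by an event-time window) into a separate sink table; table and column names are illustrative:

INSERT INTO late_events_sink
SELECT *
FROM events
WHERE CURRENT_WATERMARK(ts) IS NOT NULL
  AND ts <= CURRENT_WATERMARK(ts);

As far as I know there is no direct equivalent of allowedLateness for SQL windows yet, so the usual workaround is a larger out-of-orderness in the watermark definition plus a side channel like the one above.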


Best,
Weihua


On Tue, Feb 21, 2023 at 1:46 PM wang <24248...@163.com> wrote:

> Hi dear engineers,
>
>   One question as title: Whether Flink SQL window operations support
> "Allow Lateness and SideOutput"?
>
>   Just as supported in Datastream api (allowedLateness
> and sideOutputLateData) like:
>
> SingleOutputStreamOperator<> sumStream = dataStream.keyBy()
> .timeWindow()
>.
> allowedLateness(Time.minutes(1))
>.
> sideOutputLateData(outputTag)
>.sum();
>
> Thanks && Regards,
> Hunk
>
>


Re: 广播流与非广播流 数据先后问题

2023-02-20 文章 Weihua Hu
Hi,

可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html

Best,
Weihua


On Tue, Feb 21, 2023 at 12:17 PM 知而不惑  wrote:

> 有收到我的问题吗
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
>
>  发送时间:2023年2月21日(星期二) 上午9:37
> 收件人:"user-zh"
> 主题:广播流与非广播流 数据先后问题
>
>
>
> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction List FileEventOuterClass.FileEventgt;.ReadOnlyContext ctx,
> Collector  try {
>  ReadOnlyBroadcastState List ctx.getBroadcastState(ruleDescriptor);
>
>  List sensitiveRules = broadcastState.get(null);
>  if
> (CollectionUtils.isEmpty(sensitiveRules)) {
>  return;
>  }
>  
>  } catch (Exception e) {
> 
> log.error("SensitiveDataClassify err:", e);
>  }
> }
> public static void main(String[] args) throws Exception {
>  StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>  env.setParallelism(1);
>
>  MapStateDescriptor List  new
> MapStateDescriptor ListTypeInfo
>  // 广播流
>  BroadcastStream broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
>  DataStreamSource env.socketTextStream("localhost", 11451);
> 
> SingleOutputStreamOperator localhost.map((MapFunction value -gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
> 
> SingleOutputStreamOperator streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
>  streamOperator.print("qqq");
>  env.execute();
>
> }


Re: 广播流与非广播流 数据先后问题

2023-02-20 文章 Weihua Hu
Hi,

可以把具体的报错信息贴一下,另外代码中没有看到使用 listState 缓存元素的部分

Best,
Weihua


On Tue, Feb 21, 2023 at 9:38 AM 知而不惑  wrote:

> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction List ctx, Collector try {
> ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(ruleDescriptor);
>
> List if (CollectionUtils.isEmpty(sensitiveRules)) {
> return;
> }
> 
> } catch (Exception e) {
> log.error("SensitiveDataClassify err:", e);
> }
> }
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> MapStateDescriptor new MapStateDescriptor<("ruleBroadcastState", Types.VOID,
> new ListTypeInfo<(SensitiveRule.class));
>
> // 广播流
> BroadcastStream sensitiveRule.broadcast(ruleDescriptor);
>
> DataStreamSource env.socketTextStream("localhost", 11451);
> SingleOutputStreamOperator localhost.map((MapFunction -
> FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
> SingleOutputStreamOperator streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
> streamOperator.print("qqq");
> env.execute();
>
> }


Re: Re: flink canal json格式忽略不识别的type

2023-02-19 文章 Weihua Hu
Hi,
You can try the format's ignore-parse-errors option to skip records that fail to parse: json.ignore-parse-errors for the plain json format [1], or the equivalent canal-json.ignore-parse-errors when the topic is read with the canal-json format. Note that this option ignores all parse errors, not only the unrecognized "type" values.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#json-ignore-parse-errors
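
A minimal DDL sketch for the canal-json case (topic, broker, group id and columns are placeholders):

CREATE TABLE oms_parcels_src (
  id             STRING,
  trackingnumber STRING,
  weight         DECIMAL(19, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'oms_parcels_canal',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'flink-consumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json',
  -- skip records the deserializer cannot parse, e.g. the INIT_DDL events
  'canal-json.ignore-parse-errors' = 'true'
);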

Best,
Weihua


On Mon, Feb 20, 2023 at 10:14 AM casel.chen  wrote:

> 日志中就是报这个 "type":"INIT_DDL" 不能识别呀,然后作业就退出了
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-02-20 09:58:56,"Shengkai Fang"  写道:
> >Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?
> >
> >Best,
> >Shengkai
> >
> >casel.chen  于2023年2月9日周四 12:03写道:
> >
> >> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
> >> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal
> >> json格式解析时直接忽略不识别的type,例如
> >> 例1:
> >>
> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
> >> TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby`
> >> varchar(255) DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT
> >> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp
> NOT
> >> NULL DEFAULT '-00-00 00:00:00',  `updatedby` varchar(255) DEFAULT
> >> NULL,  `account` varchar(255) DEFAULT NULL,  `batch` varchar(255)
> DEFAULT
> >> NULL,  `client` varchar(255) DEFAULT NULL,  `command` varchar(255)
> DEFAULT
> >> NULL,  `container` varchar(255) DEFAULT NULL,  `items` mediumtext,
> >> `trackingnumber` varchar(255) NOT NULL,  `transporter` varchar(255)
> DEFAULT
> >> NULL,  `weight` decimal(19,2) NOT NULL,  `zipcode` varchar(255) DEFAULT
> >> NULL,  `ld3` varchar(255) DEFAULT NULL,  `destination_code` varchar(255)
> >> DEFAULT NULL,  PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB
> DEFAULT
> >> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
> >>
> >>
> >> 例2:
> >> {
> >> "action":"ALTER",
> >> "before":[],
> >> "bid":0,
> >> "data":[],
> >> "db":"db_test",
> >> "dbValType":{
> >> "col1":"varchar(22)",
> >> "col2":"varchar(22)",
> >> "col_pk":"varchar(22)"
> >> },
> >> "ddl":true,
> >> "entryType":"ROWDATA",
> >> "execTs":1669789188000,
> >> "jdbcType":{
> >> "col1":12,
> >> "col2":12,
> >> "col_pk":12
> >> },
> >> "pks":[],
> >> "schema":"db_test",
> >> "sendTs":1669789189533,
> >> "sql":"alter table table_test add col2 varchar(22) null",
> >> "table":"table_test",
> >> "tableChanges":{
> >> "table":{
> >> "columns":[
> >> {
> >> "jdbcType":12, // jdbc 类型。
> >> "name":"col1",// 字段名称。
> >> "position":0,  // 字段的顺序。
> >> "typeExpression":"varchar(22)", // 类型描述。
> >> "typeName":"varchar" // 类型名称。
> >> },
> >> {
> >> "jdbcType":12,
> >> "name":"col2",
> >> "position":1,
> >> "typeExpression":"varchar(22)",
> >> "typeName":"varchar"
> >> },
> >> {
> >> "jdbcType":12,
> >> "name":"col_pk",
> >> "position":2,
> >> "typeExpression":"varchar(22)",
> >> "typeName":"varchar"
> >> }
> >> ],
> >> "primaryKeyColumnNames":["col_pk"] // 主键名列表。
> >> },
> >> "type":"ALTER"
> >> }
> >> }
>


Re: Re: Flink程序内存Dump不了

2023-02-19 文章 Weihua Hu
Hi,

You can also increase the heartbeat timeout (heartbeat.timeout) [1] and then try to dump the memory again.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options
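
For example, in flink-conf.yaml (the 10-minute value is only an illustration; pick something comfortably longer than the expected stop-the-world pause of the dump):

heartbeat.timeout: 600000   # ms, default is 50000
heartbeat.interval: 60000   # ms, default is 10000; keep it well below the timeout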

Best,
Weihua


On Mon, Feb 20, 2023 at 1:58 PM lxk  wrote:

> 我尝试调整了参数,具体数值如下
>
>
> akka.ask.timeout: 900s
>
>
>
> 但还是报同样的错
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-02-17 17:32:51,"Guo Thompson"  写道:
> >可能是jm 和 tm之间的心跳时间太短了, dump的过程会stop the world,tm就不响应jm的heartbeat了;
> >
> >lxk  于2023年2月14日周二 14:32写道:
> >
> >> Flink version:1.16
> >> java version: jdk1.8.0_251
> >> 问题:最近上线的Flink程序,频繁young
> >>
> gc,几秒一次,在调整了新生代大小之后,还是没有解决,目前整个jvm堆大小是3.57g。因此想通过程序内存情况来分析哪里问题有问题,我们通过yarn上的applicationId,使用ps
> >> -ef|grep 1666758697316_2639108,找到对应的pid,最后执行 jmap -dump:format
> >> b,file=user.dump 26326
> >>
> 命令生成dump文件,但我们测试了很多个程序,只要一开始dump,都会对线上程序产生影响,程序的container会莫名的死掉,然后程序重启。具体执行命令后的报错如下:
> >> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
> >> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png
> >> 不知道大家有没有遇见这个问题,是我们使用的姿势不对,还是目前使用的版本有什么问题,希望大家能够给出一些建议和看法。
>


Re: Kafka 数据源无法实现基于事件时间的窗口聚合

2023-02-07 文章 Weihua Hu
Hi,

The problem is most likely that the Kafka source runs with several parallel subtasks but receives little data (or the topic has fewer partitions than the source parallelism), so not every source subtask consumes data and emits a watermark. The downstream aggregation then cannot align the watermarks and never triggers the window.
You can try one of the following:
1. set the source parallelism to 1, or
2. enable idleness handling on the watermark strategy, see [#1] and the sketch below.

The fromElements source is forced to run with parallelism 1, which is why that variant works.

[#1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
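
A rough sketch of option 2, based on the code in your mail (the 30-second idle timeout is only an example; parseTime stands for the same SimpleDateFormat parsing you already do in your timestamp assigner):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

SingleOutputStreamOperator<Event2> watermarkedStream = kafkaSource
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // subtasks that receive no data for 30s are marked idle and
                        // stop holding back the watermark of the downstream window
                        .withIdleness(Duration.ofSeconds(30))
                        .withTimestampAssigner((event, recordTs) -> parseTime(event.getTime())));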


Best,
Weihua


On Tue, Feb 7, 2023 at 1:31 PM wei_yuze  wrote:

> 您好!
>
>
>
>
> 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource
> 和 kafkaSource
> 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。
>
>
>
>
> public class WindowReduceTest2 {  public static void
> main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>
> // 使用fromElement数据源
> DataStreamSource env.fromElements(
> new
> Event2("Alice", "./home", "2023-02-04 17:10:11"),
> new Event2("Bob",
> "./cart", "2023-02-04 17:10:12"),
> new
> Event2("Alice", "./home", "2023-02-04 17:10:13"),
> new
> Event2("Alice", "./home", "2023-02-04 17:10:15"),
> new Event2("Cary",
> "./home", "2023-02-04 17:10:16"),
> new Event2("Cary",
> "./home", "2023-02-04 17:10:16")
> );
>
>
> // 使用Kafka数据源
> JsonDeserializationSchema jsonFormat = new JsonDeserializationSchema<(Event2.class);
> KafkaSource KafkaSource.
> .setBootstrapServers(Config.KAFKA_BROKERS)
>
> .setTopics(Config.KAFKA_TOPIC)
>
> .setGroupId("my-group")
>
> .setStartingOffsets(OffsetsInitializer.earliest())
>
> .setValueOnlyDeserializer(jsonFormat)
> .build();
> DataStreamSource env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
> kafkaSource.print();
>
>
> // 生成watermark,从数据中提取时间作为事件时间
> SingleOutputStreamOperator watermarkedStream =
> kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.
> .withTimestampAssigner(new SerializableTimestampAssigner  
> @Override
>  
> public long extractTimestamp(Event2 element, long recordTimestamp) {
>  
>   SimpleDateFormat simpleDateFormat = new
> SimpleDateFormat("-MM-dd HH:mm:ss");
>  
>   Date date = null;
>  
>   try {
>  
> date =
> simpleDateFormat.parse(element.getTime());
>  
>   } catch (ParseException e) {
>  
> throw new RuntimeException(e);
>  
>   }
>  
>   long time = date.getTime();
>  
>   System.out.println(time);
>  
>   return time;
>   }
> }));
>
>
> // 窗口聚合
> watermarkedStream.map(new MapFunction Tuple2  
> @Override
>  
> public Tuple2  
>   // 将数据转换成二元组,方便计算
>  
>   return Tuple2.of(value.getUser(), 1L);
>   }
> })
> .keyBy(r -
> r.f0)
> // 设置滚动事件时间窗口
>
> .window(TumblingEventTimeWindows.of(Time.seconds(5)))
> .reduce(new
> ReduceFunction  
> @Override
>  
> public Tuple2 Tuple2  
>   // 定义累加规则,窗口闭合时,向下游发送累加结果
>  
>   return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>   }
> })
> .print("Aggregated
> stream");
>
>
> env.execute();
>   }
> }
>
>
>
>
>
>
> 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows
> ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。
>
>
>
> 感谢您花时间查看这个问题!
> Lucas


Re: idea构建flink源码失败

2023-02-06 文章 Weihua Hu
Hi,

JDK 8 has been deprecated for building Flink since 1.15, so please try compiling with JDK 11 instead. Also, how exactly are you building it in IDEA? You could try building from the command line first; if that also fails, please share more of the error log.

For building from source, see the documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
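
As a first sanity check you can build once from the command line with the command from that guide (run in the repository root; -Dfast skips the QA plugins):

mvn clean install -DskipTests -Dfast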

Best,
Weihua


On Sat, Feb 4, 2023 at 6:06 PM tiger <2372554...@qq.com.invalid> wrote:

> hi,
>
>
> 各位大佬好,在idea构建flink源码失败,把几乎所有scala版本,sbt版本都下载下来,一一测试都失败了。
>
> 环境如下:
>
> 操作系统:Ubuntu22.04
>
> idea:2022.3.2
>
> jdk:
>
>   java version "1.8.0_191"
>  Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
>
> scala:
>
> 2.12.0,2.12.14(都下载下来了.
> 2.12.1,2.12.2,2.12.3,2.12.4,2.13.3)
>
>
> sbt:
>
>  sbt-1.3.6 (  都下载下:sbt-1.1.4  sbt-1.2.0 , sbt-1.4.0 sbt-1.5.5
> sbt-1.6.1  sbt-1.7.2  sbt-1.8.1)
>
>
> mvn:
>
> 3.8.7,3.2.5
>
>
> 目前的异常是:
>
> scalac: Error: assertion failed:
>(class DataStream,iterate$default$2)
>   while compiling:
>
> /java-source/flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
>  during phase: typer
>   library version: version 2.12.14
>  compiler version: version 2.12.14
>reconstructed args: -nobootcp -classpath
>
> /jdk/development/jre/lib/charsets.jar:/jdk/development/jre/lib/deploy.jar:/jdk/development/jre/lib/ext/cldrdata.jar:/jdk/development/jre/lib/ext/dnsns.jar:/jdk/development/jre/lib/ext/jaccess.jar:/jdk/development/jre/lib/ext/jfxrt.jar:/jdk/development/jre/lib/ext/localedata.jar:/jdk/development/jre/lib/ext/nashorn.jar:/jdk/development/jre/lib/ext/sunec.jar:/jdk/development/jre/lib/ext/sunjce_provider.jar:/jdk/development/jre/lib/ext/sunpkcs11.jar:/jdk/development/jre/lib/ext/zipfs.jar:/jdk/development/jre/lib/javaws.jar:/jdk/development/jre/lib/jce.jar:/jdk/development/jre/lib/jfr.jar:/jdk/development/jre/lib/jfxswt.jar:/jdk/development/jre/lib/jsse.jar:/jdk/development/jre/lib/management-agent.jar:/jdk/development/jre/lib/plugin.jar:/jdk/development/jre/lib/resources.jar:/jdk/development/jre/lib/rt.jar:/java-source/flink/flink-streaming-scala/target/classes:/java-source/flink/flink-streaming-java/target/classes:/java-source/flink/flink-core/target/classes:/java-source/flink/flink-connectors/flink-file-sink-common/target/classes:/java-source/flink/flink-runtime/target/classes:/java-source/flink/flink-java/target/classes:/rely/maven/repository/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar:/rely/maven/repository/org/apache/flink/flink-shaded-guava/30.1.1-jre-15.0/flink-shaded-guava-30.1.1-jre-15.0.jar:/rely/maven/repository/org/apache/commons/commons-math3/3.6.1/commons-math3-3.6.1.jar:/java-source/flink/flink-scala/target/classes:/rely/maven/repository/org/apache/flink/flink-shaded-asm-9/9.2-15.0/flink-shaded-asm-9-9.2-15.0.jar:/rely/maven/repository/com/twitter/chill_2.12/0.7.6/chill_2.12-0.7.6.jar:/rely/maven/repository/org/scala-lang/scala-reflect/2.12.7/scala-reflect-2.12.7.jar:/rely/maven/repository/org/scala-lang/scala-library/2.12.7/scala-library-2.12.7.jar:/rely/maven/repository/org/scala-lang/scala-compiler/2.12.7/scala-compiler-2.12.7.jar:/rely/maven/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/java-source/flink/flink-annotations/target/classes:/java-source/flink/flink-metrics/flink-metrics-core/target/classes:/rely/maven/repository/org/apache/flink/flink-shaded-jackson/2.12.4-15.0/flink-shaded-jackson-2.12.4-15.0.jar:/rely/maven/repository/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar:/rely/maven/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/rely/maven/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/rely/maven/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/rely/maven/repository/org/apache/commons/commons-compress/1.21/commons-compress-1.21.jar:/java-source/flink/flink-rpc/flink-rpc-core/target/classes:/java-source/flink/flink-rpc/flink-rpc-akka-loader/target/classes:/java-source/flink/flink-queryable-state/flink-queryable-state-client-java/target/classes:/java-source/flink/flink-filesystems/flink-hadoop-fs/target/classes:/rely/maven/repository/commons-io/commons-io/2.11.0/commons-io-2.11.0.jar:/rely/maven/repository/org/apache/flink/flink-shaded-netty/
> 4.1.70.
> Final-15.0/flink-shaded-netty-4.1.70.Final-15.0.jar:/rely/maven/repository/org/apache/flink/flink-shaded-zookeeper-3/3.5.9-15.0/flink-shaded-zookeeper-3-3.5.9-15.0.jar:/rely/maven/repository/commons-cli/commons-cli/1.5.0/commons-cli-1.5.0.jar:/rely/maven/repository/org/javassist/javassist/3.24.0-GA/javassist-3.24.0-GA.jar:/rely/maven/repository/org/xerial/snappy/snappy-java/
> 

Re: 如何监控flink sql on native k8s作业是否过度申请资源?

2023-01-16 文章 Weihua Hu
Hi, casel

Flink 本身会采集 JVM 层面的资源使用量,详情可以参考官方文档
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#cpu

除此之外,可以看下 Kubernetes 是否部署了相关的资源监控服务,可以从 Pod 层面查看资源使用率。

Best,
Weihua


On Tue, Jan 17, 2023 at 11:41 AM casel.chen  wrote:

> 我们flink
> sql作业跑在k8s上,但发现k8s集群整体资源使用率并不高,例如请求内存占总内存89.28%,但实际使用内存占总内存只有66.38%。
> 现在想排查出哪些作业过度申请资源,有什么办法或直接的metrics可以监控flink sql作业实现k8s资源使用率么?谢谢!


Re: flink-kubernetes-operator部署问题

2023-01-09 文章 Weihua Hu
Hi,

我看到你的作业将宿主机的 /tmp/flink 挂载到了容器内,并使用该路径作为 ha 的根路径。Flink 会在该目录下创建子目录存在 cp、ha
相关的数据。这个报错一般是对应的 ha 目录没有创建成功,建议检查下 /tmp/flink 的目录权限。

如果是这个问题,我想Flink 应该更早的暴露目录权限异常,而不是等待后续校验路径是否存在时报错。


Best,
Weihua


On Mon, Jan 9, 2023 at 5:17 PM 圣 万  wrote:

> 您好:
>
> 我最近在尝试使用flink-kubernetes-operator来部署flink,在官方Github项目中发现了一些example,我在部署其中一个样例时发生了错误,还请您帮忙解答下,感谢!
> 项目地址:flink-kubernetes-operator/examples at main ·
> apache/flink-kubernetes-operator (github.com)<
> https://github.com/apache/flink-kubernetes-operator/tree/main/examples>
> 所使用样例:basic-checkpoint-ha.yaml<
> https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-checkpoint-ha.yaml
> >
> 内容如下:
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-checkpoint-ha-example
> spec:
>   image: flink:1.15
>   flinkVersion: v1_15
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> state.savepoints.dir: file:///flink-data/savepoints
> state.checkpoints.dir: file:///flink-data/checkpoints
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: file:///flink-data/ha
>   serviceAccount: flink
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   podTemplate:
> spec:
>   containers:
> - name: flink-main-container
>   volumeMounts:
>   - mountPath: /flink-data
> name: flink-volume
>   volumes:
>   - name: flink-volume
> hostPath:
>   # directory location on host
>   path: /tmp/flink
>   # this field is optional
>   type: Directory
>   job:
> jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> parallelism: 2
> upgradeMode: savepoint
> state: running
> savepointTriggerNonce: 0
>
>
> 报错内容如下:
> 2023-01-05 18:51:12,176 INFO
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> [] - Stopping SessionDispatcherLeaderProcess.
> 2023-01-05 18:51:12,185 INFO
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping
> DefaultJobGraphStore.
> 2023-01-05 18:51:12,191 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: java.lang.IllegalStateException:
> The base directory of the JobResultStore isn't accessible. No dirty
> JobResults can be restored.
>  at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> ~[?:1.8.0_352]
>  at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> [?:1.8.0_352]
>  at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> [?:1.8.0_352]
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_352]
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_352]
>  at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
> Caused by: java.lang.IllegalStateException: The base directory of the
> JobResultStore isn't accessible. No dirty JobResults can be restored.
>  at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ~[?:1.8.0_352]
>  ... 3 more
> 2023-01-05 18:51:12,211 INFO  org.apache.flink.runtime.blob.BlobServer
>  [] - Stopped BLOB server at 0.0.0.0:6124
> 2023-01-05 18:51:12,574 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Starting the resource manager.
> 2023-01-05 18:51:13,776 INFO
> 

Re: 请问cancel的任务能够恢复running状态吗?

2023-01-04 文章 Weihua Hu
Hi,

简单来说是不能,已经 cancel 的 job 状态不能恢复到 running 状态。用 savepoint 恢复的任务是新的 job。

这个问题的背景是什么呢?什么情况下需要将已经 cancel 的 job 恢复呢?


Best,
Weihua


On Fri, Dec 30, 2022 at 5:12 PM 陈佳豪  wrote:

> hi
> 我目前测试flink restapi
> 指定savepointpath来恢复任务发现会重新触发创建一个新的任务原有的任务还是cancel状态,请问有办法恢复原有cancel状态的任务为running吗?


Re: flink sql connector options如何支持Map数据类型?

2022-12-18 文章 Weihua Hu
Hi, 你可以尝试使用独立开源的 http connector

https://github.com/getindata/flink-http-connector

Best,
Weihua


On Sat, Dec 17, 2022 at 10:21 AM casel.chen  wrote:

> 我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector
> options中支持Map数据类型呢?


Re: Remote system has been silent for too long. (more than 48.0 hours)

2022-11-02 文章 Weihua Hu
Hi,
这种情况一般是这两个 TaskManager 出现故障断开连接了。可以再查看下之前的日志验证下。

Best,
Weihua


On Wed, Nov 2, 2022 at 9:41 AM casel.chen  wrote:

> 今天线上 Flink 1.13.2 作业遇到如下报错,请问是何原因,要如何解决?
> 作业内容是从kafka topic消费canal json数据写到另一个mysql库表
>
>
> 2022-09-17 19:40:03,088 ERROR akka.remote.Remoting
>  [] - Association to [akka.tcp://
> flink-metrics@172.19.193.15:34101] with UID [-633015504] irrecoverably
> failed. Quarantining address.
>
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
>
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:387)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> 2022-09-25 17:17:21,581 ERROR akka.remote.Remoting
>  [] - Association to [akka.tcp://
> flink-metrics@172.19.193.15:38805] with UID [1496738655] irrecoverably
> failed. Quarantining address.
>
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
>
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:387)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]


Re: Flink k8s operator高可用部署Flink Session Cluster,提交job遇到异常。

2022-10-27 文章 Weihua Hu
Hi, Young

你的分析是正确的。Flink kubernetes operator 是通过 rest service 来跟 Flink cluster
通信的,Kubernetes 会随机将发往 service 的请求路由到后端的多个 JM Pod
上。任务提交流程分为 uploadJar、runJob、deleteJar 三个 API,所以会在 operator 的日志里看到相关的错误。

也许你可以创建一个 jira issue 来跟进这个问题

Best,
Weihua


On Thu, Oct 27, 2022 at 6:51 PM Young Chen  wrote:

> 【问题描述】
>
> Flink k8s operator(v1.1.0)高可用部署了一个Flink Session Cluster(两个JobManager),
> 然后用SessionJob 部署一个例子job,job有时可以部署,有时部署不了。
>
> 可以看到容器中如下error日志。
>
>
>
> 【操作步骤】
>
> 部署Cluster
>
>
>
> apiVersion: flink.apache.org/v1beta1
>
> kind: FlinkDeployment
>
> metadata:
>
>   name: flink-cluster-1jm-checkpoint
>
> spec:
>
>   image: flink:1.15
>
>   flinkVersion: v1_15
>
>   flinkConfiguration:
>
> taskmanager.numberOfTaskSlots: "1"
>
> state.savepoints.dir:
> file:///flink-data/savepoints
>
> state.checkpoints.dir:
> file:///flink-data/checkpoints
>
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>
> high-availability.storageDir:
> file:///flink-data/ha
>
> state.checkpoints.num-retained: "10"
>
>   serviceAccount: flink
>
>   ingress:
>
> template: "{{name}}.{{namespace}}.k8s.rf.io"
>
>   jobManager:
>
> replicas: 2
>
>   podTemplate:
>
> spec:
>
>   nodeSelector:
>
> kubernetes.io/hostname: k8s17
>
>   containers:
>
> - name: flink-main-container
>
>   volumeMounts:
>
> - mountPath: /flink-data
>
>   name: flink-volume
>
>   volumes:
>
> - name: flink-volume
>
>   hostPath:
>
> # directory location on host
>
> path: /tmp/flink
>
> # this field is optional
>
> type: Directory
>
>
>
> 部署job:
>
>
>
> apiVersion: flink.apache.org/v1beta1
>
> kind: FlinkSessionJob
>
> metadata:
>
>   name: flink-job-1jm-checkpoint
>
> spec:
>
>   deploymentName: flink-cluster-1jm-checkpoint
>
>   job:
>
> jarURI:
> file:///opt/flink/examples/streaming/StateMachineExample.jar
> # 自己打的operator镜像包含了examples的jar
>
> entryClass:
> org.apache.flink.streaming.examples.statemachine.StateMachineExample
>
> parallelism: 1
>
> upgradeMode: savepoint
>
>
>
>
>
> 【相关日志】
>
>   1.  job部署成功可以运行的一次,operator日志:
>
> 2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService
> [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar:
> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
>
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.handler.RestHandlerException: File
> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist
> in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
>
> at
> org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
>
> at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
> Source)
>
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
> Source)
>
> at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source)
>
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
>
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
>
> at java.base/java.lang.Thread.run(Unknown Source)
>
> ]
>
> at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown
> Source
>
> 一个JobManager
> Pod中没有这个/tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar文件,而在另一个JM的Pod中,但这个JM应该不是Leader,因为看到刷出的checkpoint相关的日志在第一个JM中。
>
>
>
>
>
>   1.  job部署失败operator日志:
>
> 2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing
> ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint',
> namespace='flink'}, version: 120505701} failed.
>
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> org.apache.flink.util.FlinkRuntimeException:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error., 
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file
> /tmp/flink-web-69209c8b-6ed5-45f2-aa99-4bc41efb7983/flink-web-upload/d7df9d81-2cfb-4642-a450-e9080a30db12_StateMachineExample.jar
> does not exist
>
> at
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:172)
>
> at
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)
>
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:100)
>
> at
> 

Re: 使用flink-operator 配置中的 taskmanager.numOfSlots 取值存在问题?

2022-10-27 文章 Weihua Hu
Hi, LiTing

这是因为 flink-operator 默认配置中有以下两项默认值。

> taskmanager.numberOfTaskSlots: 2
> parallelism.default: 2

你可以在作业的 yaml 的 flinkConfiguration 中覆盖这两个默认配置


Best,
Weihua


On Thu, Oct 27, 2022 at 9:12 AM Jason_H  wrote:

> hi,Liting Liu
>
> 看你的设置,是将并行度设置为了2,那么作业在运行的时候,所用到的slot为2个,你可以尝试修改并行度,来验证是否这个参数决定了你的taskslot的数量。
> 可以参考一下:
> https://blog.csdn.net/sinat_38079265/article/details/108535909
>
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |
>  回复的原邮件 
> | 发件人 | Liting Liu (litiliu) |
> | 发送日期 | 2022年10月26日 13:19 |
> | 收件人 | user-zh |
> | 主题 | 使用flink-operator 配置中的 taskmanager.numOfSlots 取值存在问题? |
> hi:
> 我尝试使用flink-operator 1.2.0 用如下yaml 创建一个任务(该yaml
> 中并未设置taskmanager.numberOfTaskSlots).  遇到了一个问题。 可以稳定复现。
> 
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
> name: basic-example
> spec:
> image: flink:1.15
> flinkVersion: v1_15
> flinkConfiguration:
> serviceAccount: flink
> jobManager:
> resource:
> memory: "2048m"
> cpu: 1
> taskManager:
> resource:
> memory: "2048m"
> cpu: 1
> job:
> jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> parallelism: 2
> upgradeMode: stateless
> 
> 但是在生成的configMap (flink-config-basic-example)中,
> 发现“taskmanager.numberOfTaskSlots: 2”
> 不太理解,taskmanager.numberOfTaskSlots=2 是怎么被设置进去的(为什么是2?)。 感觉configMap
> 里不应该有这项配置才对,或者该配置项的值为1。
>
>
>
>
>


Re: 请教下flink源码分支和tag的命名

2022-08-26 文章 Weihua Hu
release 版本会对应到 tag 1.15.1 上,后续开发改动是在 release-1.15 分支上进行的。


Best,
Weihua


On Fri, Aug 26, 2022 at 10:10 AM yidan zhao  wrote:

> hi。想继续问下。我目前看官方的 tag 1.15.1 显示不属于任何分支,所以最终1.15.1下载的发布包对应是不是1.15.1的tag呢?
>
> 包括后续1.15.2的修改是基于哪个分支 patch 上去的。
>
> Lijie Wang  于2022年7月21日周四 14:01写道:
> >
> > Hi,
> > 1.15.1 应该是对应 tag release-1.15.1
> >
> > yidan zhao  于2022年7月21日周四 12:53写道:
> >
> > > 我目前看了下,有一定规律但也还是不完全懂。
> > > 比如我目前有部分公司内部用到的,希望基于1.15.1的release上加的话,我需要基于哪个分支?还是tag做更改呢?
> > > 哪个branch、or tag是对应官方download页面提供的下载链接的包中一模一样的源码呢,就是不包含新增开发但未发布代码的版本。
> > >
>


Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-26 文章 Weihua Hu
可以尝试升级到 2.5+

Best,
Weihua


On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg  wrote:

> 您好,集群版本是1.1.1,是挺低的,是这个原因吗,升级到多少合适呢
> 在 2022-08-25 18:31:06,"Weihua Hu"  写道:
> >kafka 集群的版本是什么呢?看起来是集群版本有点低了
> >
> >Best,
> >Weihua
> >
> >
> >On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg  wrote:
> >
> >> 大佬们好:
> >>
> >>
> 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4,
> >>
> >> 异常如下:
> >>
> >> 2022-08-25 10:42:44
> >>
> >> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted
> to write a non-default producerId at version 0
> >>
> >> 相关代码如下:
> >> Properties properties = new Properties();
> >> properties.put("bootstrap.servers",
> >> KafkaConstant.bootstrap_servers_01);
> >> properties.put("transaction.timeout.ms", 15 * 60 * 1000);
> >> FlinkKafkaProducer statsLogV2Producer = new
> >> FlinkKafkaProducer<>(
> >> KafkaConstant.topic_01,
> >> new MyKafkaSerializationSchema(KafkaConstant.topic_01),
> >> properties ,
> >> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
> >>
>


Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-25 文章 Weihua Hu
kafka 集群的版本是什么呢?看起来是集群版本有点低了

Best,
Weihua


On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg  wrote:

> 大佬们好:
>
> 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4,
>
> 异常如下:
>
> 2022-08-25 10:42:44
>
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
> write a non-default producerId at version 0
>
> 相关代码如下:
> Properties properties = new Properties();
> properties.put("bootstrap.servers",
> KafkaConstant.bootstrap_servers_01);
> properties.put("transaction.timeout.ms", 15 * 60 * 1000);
> FlinkKafkaProducer statsLogV2Producer = new
> FlinkKafkaProducer<>(
> KafkaConstant.topic_01,
> new MyKafkaSerializationSchema(KafkaConstant.topic_01),
> properties ,
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>


Re: HA模式,standalone集群,仅单个 JM 情况下任务异常。

2022-08-23 文章 Weihua Hu
PartitionNotFoundException 应该和你描述的那台 ip 显示为 127.0.0.1 的 TM 有关,其他 TM 节点连接不到这个节点。

用的什么版本呢?

配置文件是这样的吗?
master 文件中有一个 内网 IP: A
workers 文件中有多个内网 IP: A,B,C

Best,
Weihua


On Tue, Aug 23, 2022 at 7:37 PM yidan zhao  wrote:

>
> 如题,目前发现任务报错是:org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition
> c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
> not found
>
> ——
> 任务本身问题不大,也不是网络问题。 目前发现解决方法:
>
> 换成非单 JM 即可。
>
> 同时也发现一个可能原因,或另一个明显现象:
>
> 从web ui的Taskmanager界面可以发现,执行 start-cluster 脚本的机器A(同时也是 JM ,即配置到
> masters 文件的唯一机器),该机器对应的tm的resource id中ip是127.0.0.1。其他机器都是显示的内网ip。
>
> 
> masters文件换2个以上机器后,没问题了,包括后一个现象,ip也都是正常的。
>


Re: flink sql支持监听单个文件内容变化吗?

2022-08-19 文章 Weihua Hu
Hi,
不支持监听单个文件的变化,但是可以监听某个目录下文件的新增。

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#source
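
下面是一个大致的示意(基于 Java Table API,表名、字段、路径都是假设的,'source.monitor-interval' 等选项名请以你所用版本的 filesystem connector 文档为准):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DirWatchSqlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // filesystem connector 会按 source.monitor-interval 周期性扫描目录,发现新增文件后继续读取
        tEnv.executeSql(
                "CREATE TABLE file_source (" +
                "  line STRING" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'file:///tmp/watch-dir'," +
                "  'format' = 'csv'," +
                "  'source.monitor-interval' = '10s'" +
                ")");

        // 持续输出目录中(包括后续新增文件里)的记录
        tEnv.executeSql("SELECT * FROM file_source").print();
    }
}

如果单行数据本身包含分隔符,format 相关选项需要再按你的实际数据调整。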

Best,
Weihua


On Fri, Aug 19, 2022 at 11:56 AM casel.chen  wrote:

> flink sql支持监听单个文件内容变化吗?文件中每一行是一条记录,对外输出的模式可以全量或者变量。


Re: akka.framesize配置问题

2022-08-19 文章 Weihua Hu
Hi,

看这个报错没有影响 Flink 任务的运行,不太像是 Flink 内部的通信。可以检查下是否有外部非预期的 API 请求(可能是安全的定期扫描?)

Best,
Weihua


On Fri, Aug 19, 2022 at 3:31 PM 杨扬  wrote:

> 各位大佬好!
> 最近将升级flink至1.14.2版本后出现附件图片中告警,每天固定时间告警几次。
> 经过初步排查属于akka.framesize设置问题,默认值太小需要调大,但是感觉需要调大的过多了,想请教下直接调教至200M以上是否合理?
> PS:使用 flink on yarn 模式,application模式启动。
> --
> 杨扬
> 银联数据服务有限公司 研究院
> 电话:021-60269751
> 邮箱:yangya...@cupdata.com
>
>
>
>
>


Re: 关于flink读取csv文件问题

2022-08-09 文章 Weihua Hu
Hi,

 CSVInputFormat 默认没有考虑引号,会直接按照 ',' 分割单行。可以尝试在解析 csv 时指定双引号

csvInputFormat.enableQuotedStringParsing('"');
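
一个大致可运行的草稿(DataSet API,其中 Record 这个 POJO、CSV 路径都只是示意,需要换成你自己的类型和文件):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;

public class CsvQuotedFieldExample {

    // 假设的 POJO,字段名需要和 CSV 列一一对应
    public static class Record {
        public String id;
        public String title;
        public String price;
        public Record() {}
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        PojoTypeInfo<Record> typeInfo =
                (PojoTypeInfo<Record>) TypeExtractor.createTypeInfo(Record.class);
        PojoCsvInputFormat<Record> format =
                new PojoCsvInputFormat<>(new Path("file:///tmp/input.csv"), typeInfo);

        // 关键一步:声明字段可能被双引号包裹,引号内的逗号就不会再被当作分隔符
        format.enableQuotedStringParsing('"');
        // 如果第一行是表头可以跳过
        // format.setSkipFirstLineAsHeader(true);

        env.createInput(format, typeInfo).print();
    }
}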


Best,
Weihua


On Wed, Aug 10, 2022 at 9:40 AM 胡凌瑞  wrote:

>
> 你好,我有一个CSV文件在附件中,我通过flink读取csv文件,已经按照CSV文件的格式创建了对应的POJO,然后使用PojoCsvInputFormat来解析他,这里是我的代码
> [image: image.png]
>
>
> 这里有个问题是,我的CSV文件的第二行的title里,双引号里有两个逗号,flink就无法正确读取这个csv文件并会exception,如果我把第二行里的逗号删掉,就可以正常运行,可以看看是不是因为csv文件解析导致的吗,或者说是我的使用方法不当。
>
> 十分感谢!
>


Re: Oracle CDC产生大量logminer日志

2022-08-09 文章 Weihua Hu
Hi Kyle,

你可以尝试启动 Flink cluster 前修改 conf/log4j.properties 的内容来过滤特定的日志

Best,
Weihua


On Tue, Aug 9, 2022 at 5:10 PM Kyle Zhang  wrote:

> Hi,Team
>   最近在使用cdc的方式获取oracle数据的时候,dba反馈产生了大量的logminer日志,有没有方式调整日志级别,或者有特定参数可以加上?
>
>
>
>
> Best
>


Re: Re:Does flink sql support UDTAGG

2022-08-08 文章 Weihua Hu
Hi, wang

Maybe you can take a look at
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function
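
A rough sketch of registering a user-defined function with CREATE FUNCTION from Java (the class name com.example.Top2 is only a placeholder for your own TableAggregateFunction implementation packaged with the job; whether a UDTAGG can be invoked directly inside a SQL query depends on the Flink version, so please double-check the docs for your release):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterFunctionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register the (hypothetical) table aggregate function by its fully qualified class name
        tEnv.executeSql("CREATE TEMPORARY FUNCTION top2 AS 'com.example.Top2'");

        // After registration it can be used e.g. via the Table API's flatAggregate;
        // direct invocation from plain SQL may be limited, please verify against the docs.
    }
}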

Best,
Weihua


On Mon, Aug 8, 2022 at 3:52 PM wang <24248...@163.com> wrote:

>
>
>
> Hi,
>
>
> Thanks for your response,  I guess what I need should be this one
> (UDTAGG):
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions
> As I want multiple rows as aggregate output. So my question: can we use
> UDTAGG in flink SQL?.  If so, is there some guide of how to use UDTAGG in
> flink SQL?  As there are only flink table api instructions of UDTAGG  in
> the page above.
>
>
>
>
> Thanks,
> Hunk
>
>
>
>
>
>
>
>
> At 2022-08-08 10:56:22, "Xuyang"  wrote:
> >Hi, what you want is UDAF? Please check whether this[1] is meet your
> requirement.
> >
> >[1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions
> >
> >
> >
> >在 2022-08-07 22:06:29,"wang" <24248...@163.com> 写道:
> >
> >Hi dear engineers,
> >
> >
> >One small question:  does flink sql support UDTAGG? (user-defined table
> aggregate function), seems only supported in flink table api? If not
> supported in flink sql, how can I define an aggregated udf which could
> output multiple rows to kafka.
> >
> >
> >Thanks for your help!
> >
> >
> >
> >
> >Regards,
> >Hunk
>


Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-27 文章 Weihua Hu
Hi, 你是怎么提交的任务呢?是提交到远端的 session cluster 上吗?有其他的相关日志吗?

Best,
Weihua


On Wed, Jul 27, 2022 at 5:36 PM yidan zhao  wrote:

> 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?
>
> yidan zhao  于2022年7月27日周三 17:34写道:
> >
> > 我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。
> >
> > yidan zhao  于2022年7月27日周三 10:40写道:
> > >
> > > pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
> > > 但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar +
> > > kafka-clients-2.8.1.jar 却报:
> > > py4j.protocol.Py4JError:
> > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer does
> > > not exist in the JVM
> > >
> > > Weihua Hu  于2022年7月26日周二 21:21写道:
> > > >
> > > > 最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer
> > > >
> > > > Best,
> > > > Weihua
> > > >
> > > >
> > > > On Tue, Jul 26, 2022 at 5:40 PM yidan zhao 
> wrote:
> > > >
> > > > > 如题,我看注释和文档。
> > > > > add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
> > > > >
>


Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-26 文章 Weihua Hu
最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer

Best,
Weihua


On Tue, Jul 26, 2022 at 5:40 PM yidan zhao  wrote:

> 如题,我看注释和文档。
> add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
>


Re: 如何获取Job启动时间

2022-07-25 文章 Weihua Hu
Hi,当前的确没有太多的打点和日志,按照我们的经验,需要在代码流程中插入一些日志和打点来辅助做基准测试
Best,
Weihua


On Fri, Jul 22, 2022 at 6:50 PM 邹璨  wrote:

> Hi,
> 有个问题想请教一下~
> 项目需要优化Job启动时间,并做基准测试。查阅资料后发现下面博客中做过类似的测试:
> https://flink.apache.org/2022/01/04/scheduler-performance-part-one.html
> 但不知道里面的时间是如何获取的,不知是否有对应的指标或日志,还是只能人为观测。
>
> 谢谢~
>
> 此电子邮件及其包含的信息将仅发送给上面列出的收件人,必须加以保护,并且可能包含法律或其他原因禁止披露的信息。
> 如果您不是此电子邮件的预期收件人,未经许可,您不得存储、复制、发送、分发或披露它。 禁止存储、复制、发送、分发或披露电子邮件的任何部分。
> 如果此电子邮件发送不正确,请立即联系 NAVER Security(dl_naversecur...@navercorp.com
> )。然后删除所有原件、副本和附件。谢谢您的合作。
> ​
> This email and the information contained in this email are intended solely
> for the recipient(s) addressed above and may contain information that is
> confidential and/or privileged or whose disclosure is prohibited by law or
> other reasons.
> If you are not the intended recipient of this email, please be advised
> that any unauthorized storage, duplication, dissemination, distribution or
> disclosure of all or part of this email is strictly prohibited.
> If you received this email in error, please immediately contact NAVER
> Security (dl_naversecur...@navercorp.com) and delete this email and any
> copies and attachments from your system. Thank you for your cooperation.​
>


Re: flink on yarn 作业挂掉反复重启

2022-07-25 文章 Weihua Hu
可以检查下是不是 JobManager 内存不足被 OOM kill 了,如果有更多的日志也可以贴出来

Best,
Weihua


On Mon, Jul 18, 2022 at 8:41 PM SmileSmile  wrote:

> hi,all
> 遇到这种场景,flink on yarn,并行度3000的场景下,作业包含了多个agg操作,作业recover from checkpoint
> 或者savepoint必现无法恢复的情况,作业反复重启
> jm报错org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> RECEIVED S
> IGNAL 15: SIGTERM. Shutting down as requested.
>
> 请问有什么好的排查思路吗
>
>
>
>
>


Re: Re: flink-hudi-hive

2022-07-12 文章 Weihua Hu
单从这个日志看不出一直在 Failover,相关任务反复初始化是指哪个任务呢?
看到了一些 akka 的连接异常,有可能是对应的 TM 异常退出了,可以再确认下 192.168.10.227:35961 这个是不是
TaskManager 地址,以及为什么退出

Best,
Weihua


On Tue, Jul 12, 2022 at 9:37 AM ynz...@163.com  wrote:

> 这是job managers所有日志:
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: execution.shutdown-on-attached-exit, false
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: pipeline.jars,
> file:/home/dataxc/opt/flink-1.14.4/opt/flink-python_2.11-1.14.4.jar
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: execution.checkpointing.min-pause, 8min
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: restart-strategy, failure-rate
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.memory.jvm-metaspace.size, 128m
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: state.checkpoints.dir, hdfs:///flink/checkpoints
> 2022-07-12 09:33:02,382 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> 2022-07-12 09:33:02,383 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@n103:35961]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@n103:35961]] Caused by:
> [java.net.ConnectException: Connection refused: n103/192.168.10.227:35961]
> 2022-07-12 09:33:02,399 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at
> akka://flink/user/rpc/resourcemanager_1 .
> 2022-07-12 09:33:02,405 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Starting the resource manager.
> 2022-07-12 09:33:02,479 INFO
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] -
> Failing over to rm2
> 2022-07-12 09:33:02,509 INFO
> org.apache.flink.yarn.YarnResourceManagerDriver  [] - Recovered
> 0 containers from previous attempts ([]).
> 2022-07-12 09:33:02,509 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
> 2022-07-12 09:33:02,514 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> 2022-07-12 09:33:02,515 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@n103:35961]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@n103:35961]] Caused by:
> [java.net.ConnectException: Connection refused: n103/192.168.10.227:35961]
> 2022-07-12 09:33:02,528 INFO  org.apache.hadoop.conf.Configuration
>  [] - resource-types.xml not found
> 2022-07-12 09:33:02,528 INFO
> org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to
> find 'resource-types.xml'.
> 2022-07-12 09:33:02,538 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2022-07-12 09:33:02,541 INFO
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper
> bound of the thread pool size is 500
> 2022-07-12 09:33:02,584 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> 2022-07-12 09:33:02,585 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@n103:35961]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@n103:35961]] Caused by:
> [java.net.ConnectException: Connection refused: n103/192.168.10.227:35961]
>
>
>
> best,
> ynz...@163.com
>
> From: Weihua Hu
> Date: 2022-07-11 19:46
> To: user-zh
> Subject: Re: flink-hudi-hive
> Hi,
> 任务反复初始化是指一直在 Failover 吗?在 JobManager.log 里可以看到作业 Failover 原因,搜索关键字; "to
> FAILED"
>
> Best,
> Weihua
>
>
> On Mon, Jul 11, 2022 at 2:46 PM ynz...@163.com  wrote:
>
> > Hi,
> > 我正在使用flink将数据写入hudi并同步至hive,将任务提交到yarn后,我从flink web
> > ui看到:相关任务反复初始化,task managers无任何信息。日志中也无明确错误提示 ;
> > 当我删除代码中sync_hive相关配置,并且不改变其他配置,数据能正常写入hudi ;
> > 我使用的hudi-0.11.1,flink-1.14.4,hadoop-3.3.1,hive-3.1.3 ;
> >
> >
> >
> > best,
> > ynz...@163.com
> >
>


Re: 请教:关于如何释放 Flink Job 中某个对象持有的资源

2022-07-12 文章 Weihua Hu
Hi,

不建议在 TM 内部多个 Task 间共享变量,每个 Task 单独使用自己的资源,在 RichFunction open 时初始化资源,close
时释放资源。否则容易导致资源泄露
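
一个最小化的草稿(EsClient 是你们自己封装的客户端类,这里的构造方式只是示意):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EsEnrichFunction extends RichMapFunction<String, String> {

    // transient:客户端不参与函数对象本身的序列化,在每个 Task 里单独创建
    private transient EsClient esClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 每个并行子任务各自初始化自己的客户端,不跨 Task 共享
        esClient = new EsClient(/* 你的 EsClientConfig,示意 */);
    }

    @Override
    public String map(String value) throws Exception {
        // 在这里使用 esClient 查询并组装结果,示意:原样返回
        return value;
    }

    @Override
    public void close() throws Exception {
        // 作业正常结束、cancel 或失败重启时都会回调 close,在这里释放资源
        if (esClient != null) {
            esClient.close();
        }
    }
}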

Best,
Weihua


On Tue, Jul 12, 2022 at 2:31 PM RS  wrote:

> Hi,
>
>
> 如果是访问ES的话,Flink里面自带ES的connector,你可以直接使用,或者参考源码,source和sink接口都有对应的方法
>
>
>
> 资源是否在一个线程里面,这个取决与你代码逻辑,如果在不同的线程或者进程的话,设计上,就不要用同一个EsClientHolder,各个不同阶段各自去new和close对象,
>
>
> Thanks
>
>
> 在 2022-07-12 12:35:31,"Bruce Zu"  写道:
> > Flink team好,
> > 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。
> >
> > 我是 Flink 的新用户。我搜索了很多,但没有找到相关文件。但我确信有一个标准的方法来解决它。
> >
> >我的Flink 应用程序中需要访问 Elasticsearch 服务器。我们使用从
> >org.elasticsearch.client.RestHighLevelClient 扩展而来的类 EsClient 来完成查询工作,
> >一旦不再使用它就需要调用它的`close`方法来释放资源。
> >
> >所以我需要找到合适的地方来确保资源总是可以被释放,即使在调用的某个地方发生了一些异常
> >
> >我现在能想到的是使用 `ThreadLocal` 将生成的 EsClient 对象保留在main class的 main 方法的开头,并且
> >在 main 方法结束时释放资源。
> >
> >类似这样的伪代码:
> >```java
> >公共类 EsClientHolder {
> >  private static final ThreadLocal local = new
> >InheritableThreadLocal<>();
> >
> >  public static final void createAndSetEsClient(EsClient esClient){
> >local.set(esClient);
> >  }
> >
> >  private static final createAndSetEsClientBy(EsClientConfig
> >esClientConfig){
> >EsClient instance = new EsClient(esClientConfig);
> >createAndSetEsClient(instance)  ;
> >  }
> >
> >   private static final   EsClient get() {
> >EsClient c = local.get();
> >if(c == null){
> >  throw new RuntimeException("确保在使用前创建并设置 EsClient 实例");
> >}
> >return c;
> >  }
> >
> >private static final  close()抛出 IOException {
> >EsClient o = local.get();
> >if(o!= null){
> >  o.close();
> >}
> >  }
> >
> >// 在 Fink 应用程序代码中的用法
> >   public class main class {
> >public static void main(String[] args) throws IOException {
> >  try {
> >property prop = null;
> >EsClientConfig configuration = getEsClientConfig(prop);
> >EsClientHolder.createAndSetEsClientBy(config);
> >   // …
> >   SomeClass.method1();
> >   other classes.method2();
> >   // ...
> >  } at last {
> >EsClientHolder.close();
> >  }
> >}
> >  }
> >
> >class SomeClass{
> >   public void. method 1(){
> >// 1. Use EsClient in any calling method of any other class:
> >EsClient esClient = EsClientHolder.get();
> >   // …
> >   }
> >}
> >class other class {
> >  public void method 2() {
> >  // 2. Use EsClient in any calling method of any forked child thread
> >new thread (
> >() -> {
> >  EsClient client = EsClientHolder.get();
> >  // …
> >})
> >. start();
> > // …
> >  }
> >}
> >
> >```
> >
> >我知道 TaskManager 是一个 Java JVM 进程,而 Task 是由一个 java Thread 执行的。
> >
> >但我不知道 Flink 如何创建作业图,以及 Flink 最终如何将任务分配给线程以及这些线程之间的关系。
> >
> >比如 Flink 把 SomeClass 的 method1 和 OtherClass 的 method2 分配到一个和运行 MainClass
> >的线程不一样的线程,
> >那么运行method1和mehod2的线程就没有办法拿到EsClient了。
> >这里我假设 MainClass 中的 main 方法将在一个线程中执行。如果不是,比如将 set() 和 close()
> 拆分为在不同的线程中运行,则就
> >没有办法释放资源。
> >
> >谢谢!
>


Re: flink web ui Checkpoints显示为空

2022-07-12 文章 Weihua Hu
Hi,
这种情况应该是作业没有开启 CheckPoint,可以检查下作业逻辑
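
一个简单的示意(参数只是举例;如果是纯 SQL 作业,也可以在 flink-conf.yaml 里配置 execution.checkpointing.interval 来开启):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 只有显式开启 checkpoint,Web UI 的 Checkpoints 页面才会有数据
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);

        env.fromElements("a", "b", "c").print();
        env.execute("checkpoint-demo");
    }
}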

Best,
Weihua


On Tue, Jul 12, 2022 at 3:30 PM 陈卓宇 <2572805...@qq.com.invalid> wrote:

> flink1.14.5
>
> flink web ui Checkpoints显示为:No Data


Re: sql-client java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat

2022-07-11 文章 Weihua Hu
Hi,

看起来 lib 目录下的文件权限不一样,flink-sql-parquet-1.15.0.jar 是 root 用户的,flink
进程是通过什么用户启动的呢?

Best,
Weihua


On Mon, Jul 11, 2022 at 7:36 PM jiangjiguang719 
wrote:

> hi,
>  我使用 sql-client 读取parquet文件,报错:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
>
>
> Flink版本:1.15.0/1.15.1
>
>
> SQL语句:
> Flink SQL> CREATE TABLE orders_parquet (
> >   int32_fieldINT,
> >   int32_field1   INT,
> >   int32_field2   INT
> > ) WITH (
> >  'connector' = 'filesystem',
> >  'path' = '/data/testdata/PARQUET-1_4',
> >  'format' = 'parquet'
> > );
> [INFO] Execute statement succeed.
>
>
> Flink SQL> select * from orders_parquet where int32_field > 100;
>
>
> Lib包如下:
>
>
> [root@icx20 flink-1.15.1]# ll lib/
> total 212528
> -rw-r--r--. 1 root root 62050 Jul 11 19:27 commons-logging-1.1.3.jar
> -rw-r--r--. 1 sae  sae 194416 Jun 22 02:51 flink-cep-1.15.1.jar
> -rw-r--r--. 1 sae  sae 484728 Jun 22 02:54
> flink-connector-files-1.15.1.jar
> -rw-r--r--. 1 sae  sae  95184 Jun 22 03:03 flink-csv-1.15.1.jar
> -rw-r--r--. 1 sae  sae  115818049 Jun 22 03:13 flink-dist-1.15.1.jar
> -rw-r--r--. 1 sae  sae 175487 Jun 22 03:05 flink-json-1.15.1.jar
> -rw-r--r--. 1 sae  sae   21041716 Jun 22 03:10 flink-scala_2.12-1.15.1.jar
> -rw-rw-r--. 1 sae  sae   10737871 May 12 22:45
> flink-shaded-zookeeper-3.5.9.jar
> -rw-r--r--. 1 root root   5381644 Jul 11 19:25 flink-sql-parquet-1.15.0.jar
> -rw-r--r--. 1 sae  sae   15262738 Jun 22 03:10
> flink-table-api-java-uber-1.15.1.jar
> -rw-r--r--. 1 sae  sae   36236261 Jun 22 03:10
> flink-table-planner-loader-1.15.1.jar
> -rw-r--r--. 1 sae  sae2996565 Jun 22 02:51
> flink-table-runtime-1.15.1.jar
> -rw-r--r--. 1 root root   2792264 Jul 11 19:28 guava-29.0-jre.jar
> -rw-r--r--. 1 root root   3990042 Jul 11 19:26 hadoop-common-2.8.5.jar
> -rw-rw-r--. 1 sae  sae 208006 May 12 22:15 log4j-1.2-api-2.17.1.jar
> -rw-rw-r--. 1 sae  sae 301872 May 12 22:15 log4j-api-2.17.1.jar
> -rw-rw-r--. 1 sae  sae1790452 May 12 22:15 log4j-core-2.17.1.jar
> -rw-rw-r--. 1 sae  sae  24279 May 12 22:15 log4j-slf4j-impl-2.17.1.jar


Re: flink-hudi-hive

2022-07-11 文章 Weihua Hu
Hi,
任务反复初始化是指一直在 Failover 吗?在 JobManager.log 里可以看到作业 Failover 原因,搜索关键字; "to
FAILED"

Best,
Weihua


On Mon, Jul 11, 2022 at 2:46 PM ynz...@163.com  wrote:

> Hi,
> 我正在使用flink将数据写入hudi并同步至hive,将任务提交到yarn后,我从flink web
> ui看到:相关任务反复初始化,task managers无任何信息。日志中也无明确错误提示 ;
> 当我删除代码中sync_hive相关配置,并且不改变其他配置,数据能正常写入hudi ;
> 我使用的hudi-0.11.1,flink-1.14.4,hadoop-3.3.1,hive-3.1.3 ;
>
>
>
> best,
> ynz...@163.com
>


Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 文章 Weihua Hu
Hi,

有更详细的日志吗?看起来是类加载冲突的,需要明确下是哪个类冲突了

Best,
Weihua


On Wed, Jul 6, 2022 at 1:53 PM RS  wrote:

> Hi,
>
>
> 通过sql-client执行flink sql,connector选择filesystem,会出现如下报错
> java.lang.ClassNotFoundException:
> org.apache.flink.table.planner.delegation.ParserFactory
>
>
> Flink SQL> CREATE TABLE t1 (
> > a STRING,
> > b INT
> > )WITH(
> > 'connector'='filesystem',
> > 'path'='/tmp/qwe',
> > 'format'='csv',
> > 'csv.ignore-parse-errors' = 'true',
> > 'csv.allow-comments' = 'true'
> > );
> [INFO] Execute statement succeed.
> Flink SQL> select * from t1;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.table.planner.delegation.ParserFactory
>
>
> 我测试了下,是因为我的lib目录下,有
> flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar,放lib下是因为还要其他任务需要读写hive
> 如果lib下没有flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar,则没有这个报错
>
>
> 请教下,这个问题如何解决呢?
>
>
> Thanks


Re: 请教下flink的提交方式

2022-07-04 文章 Weihua Hu
Hi,
根据你的描述,你应该使用的是 session cluster,并通过命令行提交作业,这种情况下的确只能在日志中看到 job id,并且这条日志的级别是 INFO。
可以尝试通过 RestAPI 提交任务[1],这种方式会返回 JobID。但是整体提交流程改动比较大,建议把 client 侧的日志级别调整成
INFO,不会打印非常多的日志

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
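
一个很粗略的调用示意(Java 11+ 的 HttpClient;REST 地址、jarId、入口类都是假设值,jar 需要先通过 /jars/upload 上传):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitJarViaRest {
    public static void main(String[] args) throws Exception {
        String restBase = "http://flink-jobmanager:8081";   // 假设的 REST 地址
        String jarId = "xxxx_myjob.jar";                     // /jars 列表接口里返回的 jar id
        String body = "{\"entryClass\":\"com.example.MyJob\",\"parallelism\":2}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // 提交成功时返回类似 {"jobid":"..."} 的 JSON,可以直接从中拿到 JobID
        System.out.println(response.statusCode() + " " + response.body());
    }
}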

Best,
Weihua


On Mon, Jul 4, 2022 at 7:05 PM Lijie Wang  wrote:

> Hi,
> 拿不到任务 id 是指 Flink job id 么?
> 另外你的部署方式是什么样子的? 如果是 session/perjob 这种在 client 端编译 job graph,你可以在 main 方法中打印
> job id 的
>
> Best,
> Lijie
>
> sherlock zw  于2022年7月4日周一 17:51写道:
>
> > 目前我需要去监控已经提交的flink任务,
> >
> 但是通过命令行方式提交的话拿不到任务id,只能通过INFO级别的日志过滤出来,但是我们的环境里面的日志界别是WARN,看不到任务id的日志输出,所以想问下除了命令行的方式提交任务还有其他方式吗,例如有和Spark类似的SparkLaunch一样的jar提交的方式吗?希望大佬指点下,谢谢。
> >
> >
> >
>


Re: Re: 来自潘明文的邮件

2022-06-28 文章 Weihua Hu
可以通过 Flink UI 查看两个 Sink 是否被 chain 在同一个 Task 内,不过可以简单认为两个 sink 是并行执行的。
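
一个简单的示意(用 print sink 代替 HBase sink;是否调用 disableChaining 取决于你是否想强制两个 sink 不被 chain 到同一个 Task 里):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class TwoSinksExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // 同一个流写两个 sink,两个 sink 之间没有先后顺序依赖
        stream.addSink(new PrintSinkFunction<>()).name("sink-1");
        stream.addSink(new PrintSinkFunction<>()).name("sink-2").disableChaining();

        env.execute("two-sinks-demo");
    }
}
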
Best,
Weihua


On Tue, Jun 28, 2022 at 6:30 PM 潘明文  wrote:

> HI 您好,
>
>
>   我的就是一个源同时写入HBASE SINK,和HBASE SINK.。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-06-27 23:54:33,"Weihua Hu"  写道:
> >Hi,
>图片看不到了,正常来说多个 Sink 算子之间是没有执行先后顺序保证的,是可以并行的。 但是如果多个 sink 被 operator chain
>优化在一起,单个 operator chain 内部数据是串行处理的
> >Best,
> >Weihua
> >
> >
> >On Fri, Jun 24, 2022 at 9:29 PM Lincoln Lee 
> wrote:
> >
> >> Hi,
> >>邮件中直接贴图片无法正常看到,可以发下文本
> >>
> >> Best,
> >> Lincoln Lee
> >>
> >>
> >> 潘明文  于2022年6月24日周五 16:36写道:
> >>
> >> > 你好,下面2个SINK 能够并发同时处理吗?还是要窜行,等第一个SINK 好了,才能第二个SINK.
> >> >
> >> >
> >>
>


Re: IDEA尝试编译最新flink源码报错

2022-06-28 文章 Weihua Hu
Hi,
图片还是挂了,可以尝试用一些图床工具,贴链接到邮件里。

Best,
Weihua


On Tue, Jun 28, 2022 at 5:21 PM Howie Yang  wrote:

> 刚才好像图片挂了
>
> 补充报错文字内容:
>
>
> flink\flink-rpc\flink-rpc-akka\src\main\java\org\apache\flink\runtime\rpc\akka\RobustActorSystem.java:92:56
>
> java: 无法将类 scala.Option中的方法 getOrElse应用到给定类型;
>
>   需要: scala.Function0
>
>   找到: super::unc[...]ndler
>
>   原因: 无法推断类型变量 B
>
> (参数不匹配; scala.Function0 不是函数接口
>
>   在 接口 scala.Function0 中找到多个非覆盖抽象方法)
>
>
>
>
>
> --
> EMAIL: haoyuyan...@126.com
> 2022年6月28日
>
>
> 在 2022-06-28 17:07:04,"Howie Yang"  写道:
>
> 使用IDEA根据官网最新的教程
> https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/
>  将源码下载下来编译,报以下错误
>
>
> Project Structure中版本
> scala-sdk-2.12.7
>
> 请问是什么原因呢?
>
>
>
> --
> EMAIL: haoyuyan...@126.com
> 2022年6月28日
>
>


Re: flink on k8s的application模式

2022-06-28 文章 Weihua Hu
Hi,
图片看不到了,是不是在Main 方法中调用了两次 env.execute 呢?可以提供下日志
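
补充一个示意:在 application 模式下,main 方法里每调用一次 env.execute() 就会提交一个独立的 job,所以如果 main 里有两次 execute,Web UI 的 Running Jobs 里就会出现两个(以下只是演示用的假设代码):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoExecuteExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();
        env.execute("job-1");   // 第一次 execute:提交第一个 job

        env.fromElements(4, 5, 6).print();
        env.execute("job-2");   // 第二次 execute:会再提交一个独立的 job
    }
}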

Best,
Weihua


On Tue, Jun 28, 2022 at 8:52 PM 陈卓宇 <2572805...@qq.com.invalid> wrote:

>
> flink版本:1.13.1
> 提交flink on k8s的application模式,提交完成发现webui的Running Job有两个,跟session模式非常像
> 截图在附件
>


Re: 来自潘明文的邮件

2022-06-27 文章 Weihua Hu
Hi,
图片看不到了,正常来说多个 Sink 算子之间是没有执行先后顺序保证的,是可以并行的。 但是如果多个 sink 被 operator chain
优化在一起,单个 operator chain 内部数据是串行处理的
Best,
Weihua


On Fri, Jun 24, 2022 at 9:29 PM Lincoln Lee  wrote:

> Hi,
>邮件中直接贴图片无法正常看到,可以发下文本
>
> Best,
> Lincoln Lee
>
>
> 潘明文  于2022年6月24日周五 16:36写道:
>
> > 你好,下面2个SINK 能够并发同时处理吗?还是要窜行,等第一个SINK 好了,才能第二个SINK.
> >
> >
>


Re: 任务 cancel 失败,个别 task 一直处于 CANCELING 状态

2022-06-27 文章 Weihua Hu
Hi,
Task 长时间 Cancel 失败(默认 180s)会触发 watchdog 导致 TaskManager 主动退出,并定时输出日志打印当前
Task 线程执行的 thread 信息(默认 30s 一次),可以检查下 TaskManager 的日志,找一下关键字

but is stuck in method:


Best,
Weihua


On Mon, Jun 27, 2022 at 6:45 PM Lijie Wang  wrote:

> Hi,
>
> 1. 建议贴下完整的 TM 日志和 jstack
> 2. 可以看下 GC 日志,看下 GC 是否正常
>
> Best,
> Lijie
>
> 李辉  于2022年6月27日周一 15:46写道:
>
> > 求助:如题,Flink 版本 1.13.2,作业部署在 k8s
> >
> > 1、概览:
> >
> >
> > 2、被 hang 住的TM 日志,之后没有其他日志了,也没有异常:
> >
> >
> >
> > 3、jstack 分析,没有发现 Block 状态的线程
> >
> >
> >
>


Re: flink 1.10.1 flinkui 取消任务 任务一直处于cancalling中 很长时间才取消掉

2022-06-16 文章 Weihua Hu
Hi,
建议看一下 JobManager 的日志,检查下再 Canceling 时作业是在什么状态。也检查下 Task 是否使用 UDF ,在 UDF
close 时是否有耗时的操作。

Best,
Weihua


On Thu, Jun 16, 2022 at 3:11 PM 沈保源 <757434...@qq.com.invalid> wrote:

> flink 1.10.1 flinkui 取消任务 任务一直处于cancalling中 很长时间才取消掉


Re: 关于PyFlink的开发环境问题

2022-06-16 文章 Weihua Hu
Hi,

看起来是依赖缺失问题,建议参考官方教程文档跑通一个简单的示例

Table API:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table_api_tutorial/
DataStreamAPI:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/datastream_tutorial/

Best,
Weihua


On Wed, Jun 15, 2022 at 8:35 PM Xingbo Huang  wrote:

> Hi,
>
> 你可以执行 pip install -r flink-python/dev/dev-requirements.txt 安装开发环境所需要的依赖
>
> Best,
> Xingbo
>
> 张 兴博  于2022年6月15日周三 10:20写道:
>
> > 您好:
> >我是一名学习使用pyflink的用户,我想在ubuntu20.04上开发pyflink,但是在运行代码的时候,报错为:
> >
> > Traceback (most recent call last):
> >   File "/root/.py", line 6, in 
> > s_env = StreamExecutionEnvironment.get_execution_environment()
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> > line 805, in get_execution_environment
> > return StreamExecutionEnvironment(j_stream_exection_environment)
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> > line 62, in __init__
> > self._open()
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> > line 973, in _open
> > startup_loopback_server()
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> > line 963, in startup_loopback_server
> > from pyflink.fn_execution.beam.beam_worker_pool_service import \
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/fn_execution/beam/beam_worker_pool_service.py",
> > line 31, in 
> > from apache_beam.options.pipeline_options import DebugOptions
> >   File "/usr/local/lib/python3.8/dist-packages/apache_beam/__init__.py",
> > line 96, in 
> > from apache_beam import io
> >   File
> > "/usr/local/lib/python3.8/dist-packages/apache_beam/io/__init__.py", line
> > 23, in 
> > from apache_beam.io.avroio import *
> >   File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/avroio.py",
> > line 63, in 
> > from apache_beam.io import filebasedsink
> >   File
> > "/usr/local/lib/python3.8/dist-packages/apache_beam/io/filebasedsink.py",
> > line 36, in 
> > from apache_beam.io import iobase
> >   File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/iobase.py",
> > line 57, in 
> > from apache_beam.transforms import Impulse
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/__init__.py",
> > line 25, in 
> > from apache_beam.transforms.external import *
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/external.py",
> > line 45, in 
> > from apache_beam.runners import pipeline_context
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/pipeline_context.py",
> > line 51, in 
> > from apache_beam.transforms import environments
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/environments.py",
> > line 54, in 
> > from apache_beam.runners.portability.sdk_container_builder import
> > SdkContainerImageBuilder
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/sdk_container_builder.py",
> > line 44, in 
> > from apache_beam.internal.gcp.auth import get_service_credentials
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/internal/gcp/auth.py",
> > line 28, in 
> > from oauth2client.client import GoogleCredentials
> >   File "/usr/local/lib/python3.8/dist-packages/oauth2client/client.py",
> > line 39, in 
> > from oauth2client import transport
> >   File
> "/usr/local/lib/python3.8/dist-packages/oauth2client/transport.py",
> > line 17, in 
> > import httplib2
> > ModuleNotFoundError: No module named 'httplib2'
> >
> > 通过查询发现在python新版中,httplib2已经不用了?采用的名字是http.client?
> > 我的python版本为3.8.10,jdk为openjdk 11.0.15(另一台为java 1.8)
> > 我想知道这是什么原因造成的呢?怎么能解决这个问题呢?
> >
> > 感谢您在百忙之中解答我的问题,万分感谢~!
> >
> > 发送自 Windows 11 版邮件应用
> >
> >
>


Re: flink k8s ha

2022-06-08 文章 Weihua Hu
Hi,
删除 deployment 会将关联到这个 Deployment 的 Pod、Service、flink-conf configmap 等删除。但是
HA 相关的 configmap 没有配置 owner reference,是不会被删除的。主要目的是集群重启时可以从之前的HA
状态中恢复。更多内容参考官方文档[1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
Best,
Weihua


On Wed, Jun 8, 2022 at 4:24 PM json <18042304...@163.com> wrote:

> configmap 如下
> sql-test--jobmanager-leader
> sql-test-resourcemanager-leader
> sql-test-restserver-leader
> sql-test-dispatcher-leader
>
>
>
> 在 2022-06-08 15:42:52,"json" <18042304...@163.com> 写道:
>
> flink1.13.6 on k8s application 模式,设置HA
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: oss
> 会在 k8s 上生成configmap
>
>
> 1. 但在 k8s 删除此任务的 deployment 后,为什么这些configmap还在?(任务都删了,这些ha应该不需要了吧)
> 2. 任务重新启动后,还是会去这些 configmap 读ha配置,这个逻辑也很奇怪,任务重启,为什么要去读之前HA信息
>
> 为什么会关注这个,因为碰到一个问题:
> 任务重启报错,找不到
> /high-availability.storageDir/task/completedCheckpointe5c125ad20ea 文件,
> 但oss 是有文件
> /high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903
> 导致我任务一直报错;删除 上面的configmap 才能正常运行
>
>
>
>
>
>


Re: flink webui stdout疑惑

2022-06-08 文章 Weihua Hu
“进入日志是有打印的”是指日志目录下的 taskmanager.out 里面有内容吗?
cluster 启动的时候指定日志文件了吗?

Best,
Weihua


On Wed, Jun 8, 2022 at 4:51 PM 陈卓宇 <2572805...@qq.com.invalid> wrote:

> 您好:
> flink版本:1.13.1
> 部署方式:on k8s
>
> 向flink集群提交的sql:
> CREATE TABLE datagen (
> f_sequence INT,
> f_random INT,
> f_random_str STRING,
> ts AS localtimestamp,
> WATERMARK FOR ts AS ts
>   ) WITH (
> 'connector' = 'datagen',
> -- optional options --
> 'rows-per-second'='5',
> 'fields.f_sequence.kind'='sequence',
> 'fields.f_sequence.start'='1',
> 'fields.f_sequence.end'='500',
> 'fields.f_random.min'='1',
> 'fields.f_random.max'='500',
> 'fields.f_random_str.length'='10'
>   );
>
>   CREATE TABLE print_table (
> f_sequence INT,
> f_random INT,
> f_random_str STRING
> ) WITH (
> 'connector' = 'print'
>   );
>
>   INSERT INTO print_table select f_sequence,f_random,f_random_str from
> datagen;
>
>
> 想请问一下在flink web ui的stdout上发现没有打印一片空白,进入日志是有打印的,这是什么原因导致的?
> 我如何解决,让ui的stdout能把内容打印出来


Re: flink rest 接口部分数据不返回,返回空,状态200正常。

2022-05-31 文章 Weihua Hu
Hi, yidan
/watermark 是通过 Flink 内部 metric 进行采集的,为了防止每次 api 请求都进行 metric query,Flink
在内部实现了缓存机制,真实 query 的间隔可以通过参数[1] 控制,默认是 10s。
在Flink 内部查询 metric 时,如果失败会保存空记录,体现到 API 上就是返回了空的 list,可以尝试开启 DEBUG
日志来确认是否是由于 query metric 失败导致的

[1] metrics.fetcher.update-interval

Best,
Weihua


On Fri, May 20, 2022 at 12:54 PM yidan zhao  wrote:

> 部分任务估计是原先看过ui图,打开后相关数据都能看,但是数字不变。比如其中一个任务的输入节点部分:Records Sent
> 504,685,253,这个数字就不变了(但任务实际是在处理数据的),看网络请求也的确固定一直返回这个数据。
> 纯粹转圈不出数据的任务是新提交的任务。
>
> 按照以往,我重启jm可能解决这个问题。
>
> yidan zhao  于2022年5月20日周五 12:05写道:
> >
> > web ui图:https://s3.bmp.ovh/imgs/2022/05/20/dd142de9be3a2c99.png
> > 网络视图:https://i.bmp.ovh/imgs/2022/05/20/f3c741b28bd208d4.png
> >
> > JM1(rest server leader) 异常日志:
> > WARN  2022-05-20 12:02:12,523
> > org.apache.flink.runtime.checkpoint.CheckpointsCleaner   - Could
> > not properly discard completed checkpoint 22259.
> > java.io.IOException: Directory
> >
> bos://flink-bucket/flink/default-checkpoints/bal_baiduid_ft_job/b03390c8295713fbd79f57f57a1e3bdb/chk-22259
> > is not empty.
> > at
> org.apache.hadoop.fs.bos.BaiduBosFileSystem.delete(BaiduBosFileSystem.java:209)
> > ~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
> > at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:160)
> > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:74)
> > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discard(CompletedCheckpoint.java:263)
> > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:60)
> > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$2(CheckpointsCleaner.java:85)
> > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > [?:1.8.0_251]
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > [?:1.8.0_251]
> > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> > INFO  2022-05-20 12:03:22,441
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator-
> > Triggering checkpoint 21979 (type=CHECKPOINT) @ 1653019401517 for job
> > 07950b109ab5c3a0ed8576673ab562f7.
> > INFO  2022-05-20 12:03:31,061
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator-
> > Completed checkpoint 21979 for job 07950b109ab5c3a0ed8576673ab562f7
> > (1785911977 bytes in 9066 ms).
> >
> >
> > 如上,我web-ui是开启的,所有是一直有请求刷的,不存在相关异常(当然本身从请求返回码200来看也不像是异常)。
> >
> > Shengkai Fang  于2022年5月20日周五 10:50写道:
> > >
> > > 你好,图挂了,应该是需要图床工具。
> > >
> > > 另外,能否贴一下相关的异常日志呢?
> > >
> > > Best,
> > > Shengkai
> > >
> > > yidan zhao  于2022年5月20日周五 10:28写道:
> > >
> > > > UI视图:[image: 1.png].
> > > >
> > > > 网络视图:
> > > > [image: image.png]
> > > >
> > > >
> > > > 补充部分集群部署信息:
> > > > (1)flink1.13,standalone集群,基于zk做的HA。3 jm,若干tm。
> > > > (2)jm的rest api开启了ssl,基于 nginx
> > > > 做了代理转发(但大概率不会是机制问题,因为不是百分百出现此问题,我集群其他任务都正常,都是运行一段时间后会出现)。
> > > >  猜测:是否可能和运行一段时间后,出现jm进程挂掉,任务recover更换,rest jm的leader变换有关呢?
> > > > 目前来看部分jm的日志偶尔存在ssl握手相关报错,但也挺奇怪。  注意:我web
> > > > ui打开,看着jm的日志,是不出日志的(我是基于zk拿到leader,看leader jm的日志)。我web
> > > > ui一直刷,理论上如果出错日志应该有相关报错,但实际没报错,报错和这个无关,都是ckpt吧啦的。
> > > >
>


Re: Flink异步IO使用问题

2022-05-31 文章 Weihua Hu
Hi,

我在 ide 中尝试没有复现该问题,是可以使用 List 的。你代码中的 goodsDetailPage 是如何定义的?
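
我本地验证用的大致是下面这样的最小示例(业务逻辑是假设的,只是为了说明 Async IO 的输出类型可以是 List):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncListExample {

    // 输出类型直接声明为 List<String>,仅为验证可行性的草稿
    public static class MyAsyncFunction extends RichAsyncFunction<String, List<String>> {
        @Override
        public void asyncInvoke(String input, ResultFuture<List<String>> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> Collections.singletonList("detail-of-" + input))
                    .thenAccept(list -> resultFuture.complete(Collections.singleton(list)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("1", "2", "3");

        DataStream<List<String>> result =
                AsyncDataStream.unorderedWait(source, new MyAsyncFunction(), 5, TimeUnit.SECONDS);

        result.print();
        env.execute("async-list-demo");
    }
}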

Best,
Weihua


On Thu, May 26, 2022 at 8:59 PM lxk7...@163.com  wrote:

> 重发下图
>
> https://sm.ms/image/12XQHAOZdYoqraC
> https://sm.ms/image/zJ2gfmxvSc85Xl7
>
>
>
> lxk7...@163.com
>
> 发件人: lxk7...@163.com
> 发送时间: 2022-05-26 20:54
> 收件人: user-zh
> 主题: Flink异步IO使用问题
>
> 我在程序里使用了异步IO,但是好像识别不了这个list类型的数据
>
> lxk7...@163.com
>


Re: s3p 如果在本地调试

2022-05-19 文章 Weihua Hu
Hi,
你是在 IDEA 中运行吗?我增加相关的 pom 依赖后在 wordcount 中可以正常运行,可以 idea maven reload project 试试

Best,
Weihua

> 2022年5月19日 下午4:05,z y xing  写道:
> 
> 各位好:
> 了解实际运行是要复制jar到plugin下,但是调试的话用该怎么初始化s3p这个文件系统了?
> 
> flink版本 1.14,win10
> 项目通过flink-quick-start创建,在pom中添加了如下依赖
> 
> 
>   org.apache.flink
>   flink-s3-fs-presto
>   ${flink.version}
> 
> 
> 初始代码类似如下:
> 
> Configuration fileSystemConf = new Configuration();
> 
> fileSystemConf.setBoolean("presto.s3.connection.ssl.enabled", false);
> fileSystemConf.setString("presto.s3.access-key", "minioadmin");
> fileSystemConf.setString("presto.s3.secret-key", "minioadmin");
> fileSystemConf.setString("presto.s3.endpoint", "http://127.0.0.1:9000;);
> 
> FileSystem.initialize(fileSystemConf);
> 
> Path path = new Path("s3p://test/");
> System.out.println(path.getFileSystem().exists(path));
> 
> 但是会抛出如下异常:
> Exception in thread "main"
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 's3p'. The scheme is directly
> supported by Flink through the following plugin: flink-s3-fs-presto. Please
> ensure that each plugin resides within its own subfolder within the plugins
> directory. See
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
> more information. If you want to use a Hadoop file system for that scheme,
> please add the scheme to the configuration fs.allowed-fallback-filesystems.
> For a full list of supported file systems, please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
> at org.example.StreamingJob.main(StreamingJob.java:58)
> 
> 但是神奇的是,我可以用s3a
> 初始化配置如下:
> 
> fileSystemConf.setBoolean("fs.s3a.connection.ssl.enabled", false);
> fileSystemConf.setString("fs.s3a.endpoint", "http://127.0.0.1:9000;);
> fileSystemConf.setString("fs.s3a.access.key", "minioadmin");
> fileSystemConf.setString("fs.s3a.secret.key", "minioadmin");
> fileSystemConf.setString("fs.s3a.path.style.access", "true");
> fileSystemConf.setString("fs.s3a.impl",
> "org.apache.hadoop.fs.s3a.S3AFileSystem");
> 
> 
> 谢谢!



Re: flink on k8s native开启ha后根据sp启动任务报错找不到job id 0000

2022-05-17 文章 Weihua Hu
Hi, shimin
用的哪个版本的 Flink?提交命令是什么呢?


Best,
Weihua

> 2022年5月17日 下午1:48,shimin huang  写道:
> 
> flink on native k8s根据savepoint停止任务后在根据savepoint启动任务报错找不到job
> 错误堆栈如下:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
> Flink job ()
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
> at
> com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException:
> Could not find Flink job ()
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596)
> at java.util.Optional.orElseGet(Optional.java:267)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.requestJobStatus(Dispatcher.java:590)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ... 4 common frames omitted
> 2022-05-17 13:43:28.676 [flink-akka.actor.default-dispatcher-4] WARN
> o.a.f.c.d.application.ApplicationDispatcherBootstrap  - Application failed
> unexpectedly:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
> Flink job ()
> at
> 

Re: CheckPoint Dir 路径下引发的一些问题

2020-06-04 文章 Weihua Hu
Hi, Px New

1. Checkpoint 保存数量可以通过参数: state.checkpoints.num-retained 来控制,默认是 1
2. _metadata 只是一些元数据,保存了 state 的句柄,其他文件是 state 数据,由各 Task 在触发 checkpoint 
的时候上传。反过来,在恢复 checkpoint 的时候 JM 读取 _metadata,将相应句柄下发到 Task,Task 通过远端 HDFS 拉取对应的 
state。


Best
Weihua Hu

> 2020年6月5日 13:36,Px New <15701181132mr@gmail.com> 写道:
> 
> Hi everyOne 有一个关于CheckPoint相关的一个问题:
> 1.我在项目中使用的状态后端为:Fsstatebackend
> 2.我在jobManager的log输出找到相应的job ID后 去对应的HDFS 找到了对应的chk目录
> 3.但我有两个疑问: 
> 3.1.没有设置 chk的存储数默认是多保留多少份呢(我这边看到保留了近20次的chk)?
> 3.2 当我点进具体的chk-id 后 发现有很多文件[见2图] 我清楚的是当任务发生异常后tesk 会从hdfs 将_metadata 
> 下载后进行任务恢复操作的,那其他的哪些文件是如何产生的?以及有什么作用呢?
> 期待回复:
> 
> 
> 



Re: flink1.10整合hbase测试遇到的问题

2020-06-04 文章 Weihua Hu
可以尝试把依赖包 shaded 到你的 jar 包里,保证依赖的完整

Best
Weihua Hu

> 2020年6月3日 22:52,liunaihua521  写道:
> 
> 
> 
> - 转发邮件信息 -
> 
> 发件人: liunaihua521  <mailto:liunaihua...@163.com>
> 发送日期: 2020年6月3日 22:18
> 发送至: user-zh-i...@flink.apache.org   
> <mailto:user-zh-i...@flink.apache.org>、 user-zh-...@flink.apache.org 
>  <mailto:user-zh-...@flink.apache.org>
> 主题: flink1.10整合hbase测试遇到的问题
> hi!
> 版本说明:
> flink版本1.10
> HBase版本2.2.4
> ZK版本3.6.1
> Hadoop版本2.10.0
> 
> 程序说明:
> 
> 程序是简单的实现RichSourceFunction和RichSinkFunction,读取和写入hbase,程序打包后上传standalone模式的集群.
> 
> 报错说明:
> 提交任务后,总是报如下错误(附件附文本):
> (报错截图,见附件图片)
> 或者
> (另一种报错的截图,见附件图片)
> 
> 尝试如下:
> 尝试一:
> flink的lib下有如下jar包:
> (lib 目录下 jar 包列表截图,见附件图片)
> 提交的jar包中发现没有下面连个类
> 执行后报错
> 
> 尝试二:
> 将guava-11.0.2.jar包移动到hadoop的lib下,再次执行依然报错
> 
> 尝试结果:
> 反复尝试都一致报错,求大神们指点,再此先谢谢了!
> 
> 



Re: 在yarn-session模式下怎么用rest api 触发savepoint并停止任务

2020-06-03 文章 Weihua Hu
HI, Junbao

 可以参考 API 文档检查一下 HTTP method 是否正确
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
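
这个接口需要用 POST,并且带 JSON body;一个粗略的调用示意(地址、jobId、savepoint 目录均为假设值):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopJobWithSavepoint {
    public static void main(String[] args) throws Exception {
        String restBase = "http://flink-jobmanager:8081";        // yarn-session 下一般要走 YARN proxy 暴露的地址
        String jobId = "0123456789abcdef0123456789abcdef";       // 假设的 job id
        String body = "{\"targetDirectory\":\"hdfs:///flink/savepoints\",\"drain\":false}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // 成功时返回 {"request-id":"..."},后续可以用它查询 savepoint 的触发状态
        System.out.println(response.statusCode() + " " + response.body());
    }
}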


Best
Weihua Hu

> 2020年6月1日 16:21,wind.fly@outlook.com 写道:
> 
> Hi,all:
> 本人当前用的flink版本1.10,通过yarn-session发布job,通过jobs/job/stop api 停止任务会报Unable to 
> load requested file,问一下在yarn-session模式下没有这个api吗?
> 
> Best,
> Junbao Zhang



Re: 任务假死

2020-04-27 文章 Weihua Hu
你配置的 jobmanager.execution.failover-strategy 是什么呢?如果是 region 的话,作业不会因为单个 Task 
失败而整体进入异常状态。
可以在 Web UI 进入作业拓扑查看单个 task 的状态


Best
Weihua Hu

> 2020年4月26日 11:43,yanggang_it_job  写道:
> 
> 感谢您的回复,这个问题和您刚才给我的场景有些相似,但还是有些许差异。
> 刚才试了几种方式,图片好像都无法访问。
> 下面我详细介绍下异常情况
> 1、我的任务是从三个kafka读取,然后通过onGroup实现left 
> join语义,然后定义了一个滑动窗口(600,10),最后通过一个CoGroupFunction进行处理具体的数据
> 2、异常出现在其中一个CoGruopFunction(Window(TumblingEventTimeWindows(60), 
> EventTimeTrigger, CoGroupWindowFunction) (15/200))报OOM,异常栈如下
>  java.lang.OutOfMemoryError: unable to create newnative thread
>at java.lang.Thread.start0(NativeMethod)
>at java.lang.Thread.start(Thread.java:717)
>at 
> java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
>at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1237)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
>at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:137)
>at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>at java.lang.Thread.run(Thread.java:748)
> 
> 3. Apart from this operator's vertex being FAILED, all other vertices are CANCELED, while the JobManager state is RUNNING.
> 
> 
> Normally when this error occurs, the JM finds a suitable machine and restarts the TM, or the job exits after several failed attempts.
> But right now the job state is RUNNING, and although it is RUNNING it no longer writes any data to the downstream storage.
> 
> 
> 
> 
> 
> 
> 
> thanks
> 
> 
> On 2020-04-26 11:01:04, "Zhefu PENG" wrote:
>> The image seems to be broken and cannot be viewed. Is your case similar to these two scenarios?
>> 
>> [1] http://apache-flink.147419.n8.nabble.com/flink-kafka-td2386.html
>> [2]  http://apache-flink.147419.n8.nabble.com/Flink-Kafka-td2390.html
>> On Sun, Apr 26, 2020 at 10:58 yanggang_it_job 
>> wrote:
>> 
>>> 1. Flink UI screenshot
>>> My job is started with -p 200 -ys 5, which means 40 TMs should be running. When it ran normally there were indeed 40 TMs, but now only 1 TM is running.
>>> Also, judging by the data sync timestamps, the job stopped writing data two days ago, yet its state still shows RUNNING.
>>> My understanding is that when a TM request fails, the job should exit and set its state to FINISHED, but the actual behavior is as described above.
>>> Why does this happen?
>>> 
>>> thanks
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 



Re: flink run -m yarn submission failure (Flink 1.9)

2020-04-16 Post by Weihua Hu
Hi, dlguanyq

Deployment took more than 60 seconds.
——
This log line means the application has already been submitted to YARN but the AM has not started; this step has little to do with per-job versus yarn-session mode.
You can retry a few more times with -yd; if it still does not succeed, you need to check the YARN logs.
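
If the AM never comes up, the YARN side usually explains why. As an example (the application id below is a placeholder), the aggregated logs and the application status can be checked with:

    yarn logs -applicationId application_1586000000000_0001
    yarn application -status application_1586000000000_0001

The ResourceManager UI also typically shows whether the AM container is stuck pending because the target queue is short of memory or vcores.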

Best
Weihua Hu

> On Apr 15, 2020, at 09:06, guanyq wrote:
> 
> I submit the job in per-job mode and do not use yarn-session. I still don't quite understand why submitting in per-job mode with this -yd parameter causes a problem.
> On 2020-04-15 08:52:11, "tison" wrote:
>> Simply put, the -yd parameter decides whether you submit the job in per-job mode:
>> 
>> with -yd: submit in per-job mode, i.e. start a new cluster
>> without -yd: submit to an existing Flink on YARN cluster
>> 
>> Which one do you actually need? Have you already started a Flink on YARN cluster with yarn-session?
>> 
>> Best,
>> tison.
>> 
>> 
>> On Wed, Apr 15, 2020 at 8:46 AM, guanyq wrote:
>> 
>>> The submission fails. From my testing it is related to the -yd parameter: after removing it the job can be submitted, but I don't know what -yd actually affects.
>>> At 2020-04-14 15:31:00, "guanyq"  wrote:
>>>> The submission fails even though YARN still has plenty of free resources. Why would it fail?
>>>> 
>>>> Submission script:
>>>> ./bin/flink run -m yarn-cluster \
>>>> -ynm TestDataProcess \
>>>> -yd \
>>>> -yn 2 \
>>>> -ytm 1024 \
>>>> -yjm 1024 \
>>>> -c com.data.processing.unconditionalacceptance.TestDataProcess \
>>>> ./tasks/UnconditionalAcceptanceDataProcess.jar \
>>>> 
>>>> 
>>>> YARN resources:
>>>> Apps Submitted: 239, Apps Pending: 0, Apps Running: 12, Apps Completed: 227,
>>>> Containers Running: 173, Memory Used: 334 GB, Memory Total: 1.42 TB,
>>>> Memory Reserved: 0 B, VCores Used: 173, VCores Total: 288, VCores Reserved: 0,
>>>> Active Nodes: 9, Decommissioned Nodes: 0, Lost Nodes: 0, Unhealthy Nodes: 0, Rebooted Nodes: 0
>>>> 
>>>> 
>>>> 
>>>> 2020-04-14 15:14:19,002 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:19,253 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:19,504 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:19,755 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:20,006 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:20,257 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:20,508 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:20,759 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:21,011 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:21,262 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:21,513 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:21,764 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
>>>> 2020-04-14 15:14:22,015 INFO

Re: Exception when submitting a job via run -m yarn-cluster with flink-1.10.0

2020-02-17 Post by Weihua Hu
Hi, amenhub

You presumably want to submit the job to YARN. This error should be caused by FlinkYarnSessionCli not being loaded correctly; these log lines are not the root cause of the failure. Could you share more of the log so we can take a look?
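
One common cause (an assumption here, since the full log is not shown) is that the client cannot find the Hadoop dependencies, so the YARN command line support never gets activated. Since Flink 1.10 no longer bundles Hadoop, the usual fix is to export the Hadoop classpath before submitting, for example:

    export HADOOP_CLASSPATH=`hadoop classpath`
    ./bin/flink run -m yarn-cluster -c <your.main.Class> your-job.jar

Putting a matching flink-shaded-hadoop uber jar under Flink's lib directory is another option.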


Best
Weihua Hu

> On Feb 18, 2020, at 10:56, amenhub wrote:
> 
> parseHostPortAddress