When are TaskManager slots released

2022-01-24 Posted by johnjlong
Hi all, I have a question.
When I proactively release a TM's connection by ResourceID, I find that the TM's corresponding slots are merely marked as free.
They are not truly released until the JobMaster cancels the entire ExecutionGraph, at which point it calls the cancel method, one by one, on the TM hosting each vertex's slot.
But by then the associated TM has already been closed, so the RPC times out (20s by default), and only after that is the slot released.


My question is: before calling TaskExecutor's cancelTask, why not check whether the TM is still alive? If it is not, go straight through the cancel flow instead of waiting for the RPC timeout before moving on.


Log screenshot attached:


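The behaviour the question asks about can be modelled as a liveness check in front of the cancel RPC. Below is a minimal, self-contained sketch of that idea; the class and method names are hypothetical and do not correspond to Flink's actual internals:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: models a cancel pass over the TaskManagers that
// skips the RPC for TMs already known to be dead, instead of letting the
// call run into the 20s RPC timeout. Names here are hypothetical.
public class LivenessGatedCancel {
    enum SlotState { ALLOCATED, RELEASED }

    static final class TaskManager {
        boolean alive;
        SlotState slot = SlotState.ALLOCATED;
        TaskManager(boolean alive) { this.alive = alive; }
    }

    /** Cancels the task on every TM; dead TMs release their slot immediately. */
    static int cancelAll(Map<String, TaskManager> tms) {
        int rpcCalls = 0;
        for (TaskManager tm : tms.values()) {
            if (tm.alive) {
                // TM still reachable: send the cancelTask RPC as usual.
                rpcCalls++;
            }
            // If the TM is already gone, no RPC is sent and no timeout is
            // waited for; the slot is released right away either way.
            tm.slot = SlotState.RELEASED;
        }
        return rpcCalls;
    }

    public static void main(String[] args) {
        Map<String, TaskManager> tms = new LinkedHashMap<>();
        tms.put("tm-1", new TaskManager(true));
        tms.put("tm-2", new TaskManager(false)); // already closed
        System.out.println(cancelAll(tms)); // only the live TM gets an RPC
        System.out.println(tms.get("tm-2").slot);
    }
}
```

In real Flink the equivalent signal would come from the heartbeat/连接 state the JobMaster already tracks; the sketch only shows why gating on it avoids the timeout.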


Re: Converting a database table schema to a Flink Schema

2022-01-24 Posted by Michael Ran
The Table API has a CatalogTable implementation for this.
At 2022-01-24 16:50:25, "WuKong" wrote:
>hi all:
>   Does anyone know of a tool, in the community or elsewhere, that automatically converts a database DDL statement (such as a MySQL CREATE TABLE) into a Flink
> Schema object (org.apache.flink.table.api.Schema)? Recommendations welcome.
>
>
>
>---
>Best,
>WuKong
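As a rough illustration of the conversion WuKong asks about, here is a self-contained toy mapper from a simple MySQL CREATE TABLE body to Flink SQL type names. It is not the CatalogTable path Michael mentions; the regex parsing and the type table are deliberate simplifications, and a real solution would use a proper SQL parser and build the result with `Schema.newBuilder()`:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: maps backtick-quoted columns of a simple MySQL DDL
// string to Flink SQL type names. Precision, nullability, keys, and
// comments are all ignored here.
public class MysqlToFlinkTypes {
    private static final Map<String, String> TYPE_MAP = new LinkedHashMap<>();
    static {
        TYPE_MAP.put("varchar", "STRING");
        TYPE_MAP.put("text", "STRING");
        TYPE_MAP.put("int", "INT");
        TYPE_MAP.put("bigint", "BIGINT");
        TYPE_MAP.put("double", "DOUBLE");
        TYPE_MAP.put("datetime", "TIMESTAMP(3)");
    }

    // Matches fragments like "`id` bigint" inside the CREATE TABLE body.
    private static final Pattern COLUMN = Pattern.compile("`(\\w+)`\\s+(\\w+)");

    /** Returns column name -> Flink type name for a simple DDL string. */
    public static Map<String, String> convert(String ddl) {
        Map<String, String> result = new LinkedHashMap<>();
        Matcher m = COLUMN.matcher(ddl);
        while (m.find()) {
            String mysqlType = m.group(2).toLowerCase();
            result.put(m.group(1), TYPE_MAP.getOrDefault(mysqlType, "STRING"));
        }
        return result;
    }

    public static void main(String[] args) {
        String ddl = "CREATE TABLE t (`id` bigint, `name` varchar(64), `ts` datetime)";
        System.out.println(convert(ddl));
        // prints {id=BIGINT, name=STRING, ts=TIMESTAMP(3)}
    }
}
```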


Re: Flink job submitted to the cluster fails with Java heap space after running for a while

2022-01-24 Posted by Caizhi Weng
Hi!

Is your state backend the heap state backend? If so, a running Flink
streaming job's state lives on the heap, and checkpoints are also held on the heap, which can indeed lead to an OOM. Try switching to a different state backend and see if that helps.
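One concrete alternative to the heap state backend is the embedded RocksDB backend, which keeps working state off-heap. A flink-conf.yaml sketch for Flink 1.13 (the checkpoint directory below is a placeholder):

```yaml
# Store state in RocksDB (off-heap) instead of on the JVM heap
state.backend: rocksdb
# Durable location for checkpoints (placeholder path)
state.checkpoints.dir: hdfs:///flink/checkpoints
# Optional: upload only changed SST files on each checkpoint
state.backend.incremental: true
```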

Liu Join wrote on Fri, Jan 21, 2022 at 13:20:

>
> I have replaced the 5s time window with a countWindowAll of 100 records. Concretely, I use an aggregate function to concatenate the records in the window into one SQL statement of the form: replace
> into table (a1,a2,a3,a4,..) values(…)
> That has not solved it.
> I cannot provide a heap dump for now.
> The taskmanager memory is allocated as follows:
> task heap: 2.76G, network: 343MB, JVM Metaspace: 256MB
>
>
> I am running two jobs in total and both hit this problem. A simple data-sync program I wrote earlier, which copies 500 tables from one MySQL database to another, did not fail, so I am not sure which direction to take here.
>
> While monitoring the job I noticed that the MySQL source fails first and then brings the whole job down. With checkpointing enabled, the MySQL source and everything before the window form one parallelism; that parallelism's checkpoint stays at 136MB from the start of the job to the end, while the other operators' checkpoints are under 1MB. Could that be part of the cause?
> Sent from Mail for Windows
>
> From: Caizhi Weng
> Sent: January 21, 2022 10:52
> To: flink中文邮件组
> Subject: Re: Flink job submitted to the cluster fails with Java heap space after running for a while
>
> Hi!
>
> The 5s window that concatenates SQL statements looks suspicious. How exactly is it implemented? Also, take a heap dump of the task manager
> and see what is occupying most of the heap.
>
> Liu Join wrote on Thu, Jan 20, 2022 at 13:28:
>
> > Environment:
> >
> > Flink 1.13.5, standalone cluster, jobmanager memory 2GB, taskmanager memory 4GB; the cluster has one jobmanager and two taskmanagers, each taskmanager with 2 slots.
> >
> >
> > The job reads data from 20,000 tables, one record per table per minute, and outputs the last record of each table every 10 minutes. The code uses map, filter, watermarks, and a 10-minute sliding window with reduce to get the last record. Because the sink is MySQL on a modest machine, the last records are concatenated into a batch insert statement before writing to MySQL; a 5s window is opened for concatenating the SQL statement.
> >
> > Error:
> > java.lang.OutOfMemoryError: Java heap space
> >
> > Symptoms:
> >
> >
> > The whole taskmanager's memory fills up; after the job fails and restarts, the taskmanager memory is still full, so the job fails again and eventually dies outright. Since the memory is never released, after a while the taskmanager process dies too.
> > Sent from Mail for Windows
> >
> >
>
>
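For reference, the batching Liu describes (concatenating a window's records into one multi-row statement) can be sketched outside Flink. The table and column names below are placeholders, and a real job should prefer JDBC `PreparedStatement` parameters with `executeBatch` over string concatenation, both for safety and to avoid building very large strings on the heap:

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch: build one multi-row REPLACE INTO statement from a
// batch of rows, as described in the thread. String-concatenated SQL is
// shown only to mirror the approach; parameterized batches are safer.
public class BatchReplaceSql {
    public static String build(String table, List<String> columns,
                               List<List<String>> rows) {
        String cols = String.join(",", columns);
        String values = rows.stream()
            .map(r -> "(" + r.stream()
                // escape single quotes to keep the statement well-formed
                .map(v -> "'" + v.replace("'", "''") + "'")
                .collect(Collectors.joining(",")) + ")")
            .collect(Collectors.joining(","));
        return "REPLACE INTO " + table + " (" + cols + ") VALUES " + values;
    }

    public static void main(String[] args) {
        System.out.println(build("t",
            List.of("a1", "a2"),
            List.of(List.of("1", "x"), List.of("2", "y"))));
        // prints REPLACE INTO t (a1,a2) VALUES ('1','x'),('2','y')
    }
}
```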


Re: Flink job fails and taskmanager memory is not released

2022-01-24 Posted by Caizhi Weng
Hi!

By "taskmanager memory keeps rising", do you mean the heap? Take a heap dump and see what is actually occupying the memory.

Liu Join wrote on Fri, Jan 21, 2022 at 15:21:

> Environment: Flink 1.13.5, standalone mode, taskmanager memory 4GB, two slots
>
>
> The job's data volume is small, under 10MB. While the job runs, taskmanager memory keeps rising until it errors out and the job restarts; because the memory is not released after the restart, the job fails for good and the taskmanager service dies as well.
>
> Sent from Mail for Windows
>
>


Re: Flink 1.13.2 NullPointerException when computing over Hive warehouse data

2022-01-24 Posted by Caizhi Weng
Hi!

This looks like a bug. Could you provide the Hive table's DDL and the query you ran, so that people can investigate the problem more easily?

Asahi Lee wrote on Mon, Jan 24, 2022 at 09:53:

> 2022-01-23 04:31:39,568 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  
> [] - Source:
> HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents -
> Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)])
> (1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from RUNNING to CANCELING.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  
> [] - Discarding the results produced by task execution
> 07b2cd514c6b6d85f79ab5b953971f82.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  
> [] - MultipleInput(readOrder=[0,0,1],
> members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND
> ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, dayssincelast, $f109, jobid0,
> reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:-
> Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1,
> numperstype2, numperstype3, numperstype4, numperstype5, numperstype6,
> currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n:
> +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 =
> reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, $f94, jobid0, reportno0,
> dayssincelast], build=[right])\n:  :- Calc(select=[jobid,
> reportno, reportdate, depthprogress, numperstype1, numperstype2,
> numperstype3, numperstype4, numperstype5, numperstype6, currentops,
> futureops, bigint(reportno) AS $f94])\n:  : +- [#3]
> Exchange(distribution=[hash[jobid]])\n:  +- [#2]
> Exchange(distribution=[hash[jobid]])\n+- [#1]
> Exchange(distribution=[hash[jobid]])\n]) - Calc(select=[jobid AS $f0,
> reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS
> $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2
> AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11,
> numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14])
> - HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5,
> $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4,
> $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate,
> MAX($f14) AS lasbopfunctiontestdate]) - Calc(select=[$f0 AS jobid, $f1
> AS reportno, string($f6) AS reportdate, bigint((nvl($f7, 0) + nvl($f8,
> 0)) + nvl($f9, 0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS
> pobcnt, $f2 AS dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5
> AS futureops, lasboptestdate, lasbopfunctiontestdate]) - Map -
> Sink: Unnamed (1/1) (3c555cbd6bf411a6111cf7eaab527d33) switched from
> CREATED to CANCELING.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  
> [] - MultipleInput(readOrder=[0,0,1],
> members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND
> ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, dayssincelast, $f109, jobid0,
> reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:-
> Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1,
> numperstype2, numperstype3, numperstype4, numperstype5, numperstype6,
> currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n:
> +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 =
> reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, $f94, jobid0, reportno0,
> dayssincelast], build=[right])\n:  :- Calc(select=[jobid,
> reportno, reportdate, depthprogress, numperstype1, numperstype2,
> numperstype3, numperstype4, numperstype5, numperstype6, currentops,
> futureops, bigint(reportno) AS $f94])\n:  : +- [#3]
> Exchange(distribution=[hash[jobid]])\n:  +- [#2]
> Exchange(distribution=[hash[jobid]])\n+- [#1]
> Exchange(distribution=[hash[jobid]])\n]) - Calc(select=[jobid AS $f0,
> reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS
> $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2
> AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11,
> numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14])
> - HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5,
> $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4,
> $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate,
> MAX($f14) AS lasbopfunctiontestdate]) - 

Converting a database table schema to a Flink Schema

2022-01-24 Posted by WuKong
hi all:
   Does anyone know of a tool, in the community or elsewhere, that automatically converts a database DDL statement (such as a MySQL CREATE TABLE) into a Flink
Schema object (org.apache.flink.table.api.Schema)? Recommendations welcome.



---
Best,
WuKong