回复: flink-checkpoint 问题

2024-01-11 文章
看现象是这样,谢了,我抽空看下这块源码


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月11日 16:33 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
看了下代码,这个问题有可能的原因是:
1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log
的,所以有概率是目录创建了,但是log没输出trigger
2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger
25548还没输出就退了。

版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。



On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote:

TM日志:
2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
and sending final execution state CANCELED to JobManager for task
ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0
e960208bbd95b1b219bafe4887b48392.
2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR
o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered
error while consuming partitions
java.nio.channels.ClosedChannelException: null
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)


JM日志,没有25548的触发记录:
2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347

回复: flink-checkpoint 问题

2024-01-10 文章
0b3 expired before completing.
2023-12-31 18:50:10.698 [flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global 
failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
 at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
 at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)




checkpoing路径下有:
25546:正常
25547:无
25548:有,路径下为空




任务人为从25548恢复时失败,抛出异常找不到_metadate文件


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Xuyang |
| 发送日期 | 2024年1月11日 14:55 |
| 收件人 |  |
| 主题 | Re:回复: flink-checkpoint 问题 |
Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。




--

Best!
Xuyang




在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道:

JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




回复: flink-checkpoint 问题

2024-01-10 文章
JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




flink-checkpoint 问题

2024-01-10 文章
Flink版本: 1.12
checkpoint配置:hdfs
现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的



回复:flink-metrics如何获取applicationid

2023-09-11 文章
请问好使吗,怎么使用的


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | allanqinjy |
| 发送日期 | 2023年8月30日 20:02 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | 回复:flink-metrics如何获取applicationid |
多谢了,明天改一下代码试试
 回复的原邮件 
| 发件人 | Feng Jin |
| 发送日期 | 2023年08月30日 19:42 |
| 收件人 | user-zh |
| 主题 | Re: flink-metrics如何获取applicationid |
hi,

可以尝试获取下 _APP_ID  这个 JVM 环境变量.
System.getenv(YarnConfigKeys.ENV_APP_ID);

https://github.com/apache/flink/blob/6c9bb3716a3a92f3b5326558c6238432c669556d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java#L28


Best,
Feng

On Wed, Aug 30, 2023 at 7:14 PM allanqinjy  wrote:

hi,
请教大家一个问题,就是在上报指标到prometheus时候,jobname会随机生成一个后缀,看源码也是new Abstract
ID(),有方法在这里获取本次上报的作业applicationid吗?


自定义trigger触发问题

2023-05-28 文章
自定义的trigger,实现满足maxcount或者到达窗口结束时间时输出结果;
问题:同一个窗口,在代码窗口结束时onProcessingTime会触发多次,理论上每个爽口只应该在到达窗口结束时间触发一次,是什么原因
主类代码片段:
SingleOutputStreamOperator> windowMap =

afterMap.timeWindowAll(Time.seconds(5))
.trigger(new CountAndProcessingTimeTrigger(
100))
.process(simpleConfig.getWindowFunction().newInstance())
触发器代码:


public class CountAndProcessingTimeTrigger extends Trigger {
private static final long serialVersionUID = 1L;
//窗口最大个数
private final long maxCount;
private final ReducingStateDescriptor stateDesc;
public CountAndProcessingTimeTrigger(long maxCount) {
this.stateDesc = new ReducingStateDescriptor<>("count_time", new 
CountAndProcessingTimeTrigger.Sum(),
LongSerializer.INSTANCE);
this.maxCount = maxCount;
}
/**
 * 元素添加
 *
 * @param o 元素
 * @param timestamp timestamp
 * @param window window
 * @param triggerContext triggerContext
 * @return TriggerResult
 * CONTINUE:表示啥都不做。
 * FIRE:表示触发计算,同时保留窗口中的数据
 * PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息。
 * FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。
 * @throws Exception Exception
 */
@Override
public TriggerResult onElement(Object o, long timestamp, TimeWindow window, 
TriggerContext triggerContext)
throws Exception {
triggerContext.registerProcessingTimeTimer(window.maxTimestamp());
ReducingState countState = 
triggerContext.getPartitionedState(stateDesc);
countState.add(1L);
if (countState.get() >= maxCount) {
log.info("countTrigger: {}", countState.get());
countState.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
/**
 * 窗口关闭
 *
 * @param timestamp timestamp
 * @param window window
 * @param triggerContext triggerContext
 * @return TriggerResult
 * @throws Exception Exception
 */
@Override
public TriggerResult onProcessingTime(long timestamp, TimeWindow window, 
TriggerContext triggerContext)
throws Exception {
ReducingState countState = 
triggerContext.getPartitionedState(stateDesc);
log.info("timeTrigger: {}, currentProcessingTime:{}", countState.get(),
window.maxTimestamp());
countState.clear();
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long timestamp, TimeWindow window, 
TriggerContext triggerContext)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public boolean canMerge() {
return false;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
ctx.mergePartitionedState(stateDesc);
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
/**
 * 窗口删除
 *
 * @param window window
 * @param triggerContext triggerContext
 * @throws Exception Exception
 */
@Override
public void clear(TimeWindow window, TriggerContext triggerContext) throws 
Exception {
triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
triggerContext.getPartitionedState(stateDesc).clear();
}
/**
 * 计数方法
 */
private static class Sum implements ReduceFunction {
private static final long serialVersionUID = 1L;
private Sum() {
}
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}


| |
吴先生
|
|
15951914...@163.com
|

回复: Flink-Sql Watermarkers问题

2023-03-13 文章
好的感谢,我关注下


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年3月13日 18:49 |
| 收件人 |  |
| 主题 | Re: Flink-Sql Watermarkers问题 |
Hi

目前sql只能在create table时指定,不过有新的扩展功能,相关FLIP正在讨论中,你可以关注一下
https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL

Best,
Shammon.FY

On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote:

hi,
我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线


| |
吴先生
|
|
15951914...@163.com
|


Flink-Sql Watermarkers问题

2023-03-13 文章
hi,
我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线


| |
吴先生
|
|
15951914...@163.com
|

回复: Flink内存问题

2023-03-02 文章
感谢,我看下


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Weihua Hu |
| 发送日期 | 2023年3月3日 10:37 |
| 收件人 |  |
| 主题 | Re: Flink内存问题 |
Hi,

针对问题 2, 可以增加下列环境变量来排除 Glibc 的问题,详情可以参考[1]

containerized.master.env.MALLOC_ARENA_MAX: 1

containerized.taskmanager.env.MALLOC_ARENA_MAX: 1

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_trouble/

Best,
Weihua


On Thu, Mar 2, 2023 at 8:10 PM 吴先生 <15951914...@163.com> wrote:

Hi,
目前分析问题应该在堆外,大概率是managed和overhead这两部分,这两部分的内存分配比例都是默认配置,通过网上的相关资料来看有两种解决方案:
1、调大managed和overhead这两块的内存比例,
问题:调整多大合适?是否调整之后还会持续增长
2、还有另一种说法是glibc内存分配器有个64M的问题引起(这里可有深入研究),替换为jemalloc可避免
问题:有具体的知道方案吗


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年3月2日 19:24 |
| 收件人 |  |
| 主题 | Re: Flink内存问题 |
Hi


如果有搜集metrics,可以根据metrics查看一下是哪部分内存上涨导致container被kill掉;然后将上涨比较快的container内存dump一下,查看具体是哪些对象占用内存比较多

Best,
Shammon


On Thu, Mar 2, 2023 at 7:14 PM 吴先生 <15951914...@163.com> wrote:

Hi,
Flink版本:1.12
部署模式:on yarn per-job
开发方式:DataStream Api
状态后端:RocksDB
Job逻辑为一个15分钟的窗口计算,任务在运行一段时间后会出现内存使用超限,container被yarn
kill的现象,目前有不少任务都会存在类似问题:
Closing TaskExecutor connection container_e02_1654567136606_1034_01_12
because: [2023-03-02 08:12:44.794]Container
[pid=11455,containerID=container_e02_1654567136606_1034_01_12] is
running 745472B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB
of 8 GB physical memory used; 10.0 GB of 40 GB virtual memory used. Killing
container.
请问:
该如何排查及优化


| |
吴先生
|
|
15951914...@163.com
|



回复: Flink内存问题

2023-03-02 文章
Hi,
目前分析问题应该在堆外,大概率是managed和overhead这两部分,这两部分的内存分配比例都是默认配置,通过网上的相关资料来看有两种解决方案:
1、调大managed和overhead这两块的内存比例,
问题:调整多大合适?是否调整之后还会持续增长
2、还有另一种说法是glibc内存分配器有个64M的问题引起(这里可有深入研究),替换为jemalloc可避免
问题:有具体的知道方案吗


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年3月2日 19:24 |
| 收件人 |  |
| 主题 | Re: Flink内存问题 |
Hi

如果有搜集metrics,可以根据metrics查看一下是哪部分内存上涨导致container被kill掉;然后将上涨比较快的container内存dump一下,查看具体是哪些对象占用内存比较多

Best,
Shammon


On Thu, Mar 2, 2023 at 7:14 PM 吴先生 <15951914...@163.com> wrote:

Hi,
Flink版本:1.12
部署模式:on yarn per-job
开发方式:DataStream Api
状态后端:RocksDB
Job逻辑为一个15分钟的窗口计算,任务在运行一段时间后会出现内存使用超限,container被yarn
kill的现象,目前有不少任务都会存在类似问题:
Closing TaskExecutor connection container_e02_1654567136606_1034_01_12
because: [2023-03-02 08:12:44.794]Container
[pid=11455,containerID=container_e02_1654567136606_1034_01_12] is
running 745472B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB
of 8 GB physical memory used; 10.0 GB of 40 GB virtual memory used. Killing
container.
请问:
该如何排查及优化


| |
吴先生
|
|
15951914...@163.com
|


Flink内存问题

2023-03-02 文章
Hi,
Flink版本:1.12
部署模式:on yarn per-job
开发方式:DataStream Api
状态后端:RocksDB
Job逻辑为一个15分钟的窗口计算,任务在运行一段时间后会出现内存使用超限,container被yarn kill的现象,目前有不少任务都会存在类似问题:
Closing TaskExecutor connection container_e02_1654567136606_1034_01_12 
because: [2023-03-02 08:12:44.794]Container 
[pid=11455,containerID=container_e02_1654567136606_1034_01_12] is running 
745472B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB of 8 GB 
physical memory used; 10.0 GB of 40 GB virtual memory used. Killing container.
请问:
该如何排查及优化


| |
吴先生
|
|
15951914...@163.com
|