回复: flink-checkpoint 问题
看现象是这样,谢了,我抽空看下这块源码 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Zakelly Lan | | 发送日期 | 2024年1月11日 16:33 | | 收件人 | | | 主题 | Re: flink-checkpoint 问题 | 看了下代码,这个问题有可能的原因是: 1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log 的,所以有概率是目录创建了,但是log没输出trigger 2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger 25548还没输出就退了。 版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。 On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote: TM日志: 2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0 e960208bbd95b1b219bafe4887b48392. 2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions java.nio.channels.ClosedChannelException: null at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232) at org.apache.flink.runtime.io .network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134) at org.apache.flink.runtime.io .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160) at org.apache.flink.runtime.io .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) JM日志,没有25548的触发记录: 2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347
回复: flink-checkpoint 问题
0b3 expired before completing. 2023-12-31 18:50:10.698 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure. org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90) at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) checkpoing路径下有: 25546:正常 25547:无 25548:有,路径下为空 任务人为从25548恢复时失败,抛出异常找不到_metadate文件 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Xuyang | | 发送日期 | 2024年1月11日 14:55 | | 收件人 | | | 主题 | Re:回复: flink-checkpoint 问题 | Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。 -- Best! Xuyang 在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道: JM中chk失败时间点日志,没有25548的触发记录: 自动recovery失败: TM日志: checkpoint文件路径,25548里面空的: | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Zakelly Lan | | 发送日期 | 2024年1月10日 18:20 | | 收件人 | | | 主题 | Re: flink-checkpoint 问题 | 你好, 方便的话贴一下jobmanager的log吧,应该有一些线索 On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote: Flink版本: 1.12 checkpoint配置:hdfs 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
回复: flink-checkpoint 问题
JM中chk失败时间点日志,没有25548的触发记录: 自动recovery失败: TM日志: checkpoint文件路径,25548里面空的: | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Zakelly Lan | | 发送日期 | 2024年1月10日 18:20 | | 收件人 | | | 主题 | Re: flink-checkpoint 问题 | 你好, 方便的话贴一下jobmanager的log吧,应该有一些线索 On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote: Flink版本: 1.12 checkpoint配置:hdfs 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
flink-checkpoint 问题
Flink版本: 1.12 checkpoint配置:hdfs 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
回复:flink-metrics如何获取applicationid
请问好使吗,怎么使用的 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | allanqinjy | | 发送日期 | 2023年8月30日 20:02 | | 收件人 | user-zh@flink.apache.org | | 主题 | 回复:flink-metrics如何获取applicationid | 多谢了,明天改一下代码试试 回复的原邮件 | 发件人 | Feng Jin | | 发送日期 | 2023年08月30日 19:42 | | 收件人 | user-zh | | 主题 | Re: flink-metrics如何获取applicationid | hi, 可以尝试获取下 _APP_ID 这个 JVM 环境变量. System.getenv(YarnConfigKeys.ENV_APP_ID); https://github.com/apache/flink/blob/6c9bb3716a3a92f3b5326558c6238432c669556d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java#L28 Best, Feng On Wed, Aug 30, 2023 at 7:14 PM allanqinjy wrote: hi, 请教大家一个问题,就是在上报指标到prometheus时候,jobname会随机生成一个后缀,看源码也是new Abstract ID(),有方法在这里获取本次上报的作业applicationid吗?
自定义trigger触发问题
自定义的trigger,实现满足maxcount或者到达窗口结束时间时输出结果; 问题:同一个窗口,在代码窗口结束时onProcessingTime会触发多次,理论上每个爽口只应该在到达窗口结束时间触发一次,是什么原因 主类代码片段: SingleOutputStreamOperator> windowMap = afterMap.timeWindowAll(Time.seconds(5)) .trigger(new CountAndProcessingTimeTrigger( 100)) .process(simpleConfig.getWindowFunction().newInstance()) 触发器代码: public class CountAndProcessingTimeTrigger extends Trigger { private static final long serialVersionUID = 1L; //窗口最大个数 private final long maxCount; private final ReducingStateDescriptor stateDesc; public CountAndProcessingTimeTrigger(long maxCount) { this.stateDesc = new ReducingStateDescriptor<>("count_time", new CountAndProcessingTimeTrigger.Sum(), LongSerializer.INSTANCE); this.maxCount = maxCount; } /** * 元素添加 * * @param o 元素 * @param timestamp timestamp * @param window window * @param triggerContext triggerContext * @return TriggerResult * CONTINUE:表示啥都不做。 * FIRE:表示触发计算,同时保留窗口中的数据 * PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息。 * FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。 * @throws Exception Exception */ @Override public TriggerResult onElement(Object o, long timestamp, TimeWindow window, TriggerContext triggerContext) throws Exception { triggerContext.registerProcessingTimeTimer(window.maxTimestamp()); ReducingState countState = triggerContext.getPartitionedState(stateDesc); countState.add(1L); if (countState.get() >= maxCount) { log.info("countTrigger: {}", countState.get()); countState.clear(); return TriggerResult.FIRE_AND_PURGE; } return TriggerResult.CONTINUE; } /** * 窗口关闭 * * @param timestamp timestamp * @param window window * @param triggerContext triggerContext * @return TriggerResult * @throws Exception Exception */ @Override public TriggerResult onProcessingTime(long timestamp, TimeWindow window, TriggerContext triggerContext) throws Exception { ReducingState countState = triggerContext.getPartitionedState(stateDesc); log.info("timeTrigger: {}, currentProcessingTime:{}", countState.get(), window.maxTimestamp()); countState.clear(); return TriggerResult.FIRE_AND_PURGE; } @Override public TriggerResult onEventTime(long timestamp, TimeWindow window, TriggerContext triggerContext) throws Exception { return TriggerResult.CONTINUE; } @Override public boolean canMerge() { return false; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { ctx.mergePartitionedState(stateDesc); long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } /** * 窗口删除 * * @param window window * @param triggerContext triggerContext * @throws Exception Exception */ @Override public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception { triggerContext.deleteProcessingTimeTimer(window.maxTimestamp()); triggerContext.getPartitionedState(stateDesc).clear(); } /** * 计数方法 */ private static class Sum implements ReduceFunction { private static final long serialVersionUID = 1L; private Sum() { } public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } } | | 吴先生 | | 15951914...@163.com |
回复: Flink-Sql Watermarkers问题
好的感谢,我关注下 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年3月13日 18:49 | | 收件人 | | | 主题 | Re: Flink-Sql Watermarkers问题 | Hi 目前sql只能在create table时指定,不过有新的扩展功能,相关FLIP正在讨论中,你可以关注一下 https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL Best, Shammon.FY On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote: hi, 我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线 | | 吴先生 | | 15951914...@163.com |
Flink-Sql Watermarkers问题
hi, 我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线 | | 吴先生 | | 15951914...@163.com |
回复: Flink内存问题
感谢,我看下 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年3月3日 10:37 | | 收件人 | | | 主题 | Re: Flink内存问题 | Hi, 针对问题 2, 可以增加下列环境变量来排除 Glibc 的问题,详情可以参考[1] containerized.master.env.MALLOC_ARENA_MAX: 1 containerized.taskmanager.env.MALLOC_ARENA_MAX: 1 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_trouble/ Best, Weihua On Thu, Mar 2, 2023 at 8:10 PM 吴先生 <15951914...@163.com> wrote: Hi, 目前分析问题应该在堆外,大概率是managed和overhead这两部分,这两部分的内存分配比例都是默认配置,通过网上的相关资料来看有两种解决方案: 1、调大managed和overhead这两块的内存比例, 问题:调整多大合适?是否调整之后还会持续增长 2、还有另一种说法是glibc内存分配器有个64M的问题引起(这里可有深入研究),替换为jemalloc可避免 问题:有具体的知道方案吗 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年3月2日 19:24 | | 收件人 | | | 主题 | Re: Flink内存问题 | Hi 如果有搜集metrics,可以根据metrics查看一下是哪部分内存上涨导致container被kill掉;然后将上涨比较快的container内存dump一下,查看具体是哪些对象占用内存比较多 Best, Shammon On Thu, Mar 2, 2023 at 7:14 PM 吴先生 <15951914...@163.com> wrote: Hi, Flink版本:1.12 部署模式:on yarn per-job 开发方式:DataStream Api 状态后端:RocksDB Job逻辑为一个15分钟的窗口计算,任务在运行一段时间后会出现内存使用超限,container被yarn kill的现象,目前有不少任务都会存在类似问题: Closing TaskExecutor connection container_e02_1654567136606_1034_01_12 because: [2023-03-02 08:12:44.794]Container [pid=11455,containerID=container_e02_1654567136606_1034_01_12] is running 745472B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB of 8 GB physical memory used; 10.0 GB of 40 GB virtual memory used. Killing container. 请问: 该如何排查及优化 | | 吴先生 | | 15951914...@163.com |
回复: Flink内存问题
Hi, 目前分析问题应该在堆外,大概率是managed和overhead这两部分,这两部分的内存分配比例都是默认配置,通过网上的相关资料来看有两种解决方案: 1、调大managed和overhead这两块的内存比例, 问题:调整多大合适?是否调整之后还会持续增长 2、还有另一种说法是glibc内存分配器有个64M的问题引起(这里可有深入研究),替换为jemalloc可避免 问题:有具体的知道方案吗 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年3月2日 19:24 | | 收件人 | | | 主题 | Re: Flink内存问题 | Hi 如果有搜集metrics,可以根据metrics查看一下是哪部分内存上涨导致container被kill掉;然后将上涨比较快的container内存dump一下,查看具体是哪些对象占用内存比较多 Best, Shammon On Thu, Mar 2, 2023 at 7:14 PM 吴先生 <15951914...@163.com> wrote: Hi, Flink版本:1.12 部署模式:on yarn per-job 开发方式:DataStream Api 状态后端:RocksDB Job逻辑为一个15分钟的窗口计算,任务在运行一段时间后会出现内存使用超限,container被yarn kill的现象,目前有不少任务都会存在类似问题: Closing TaskExecutor connection container_e02_1654567136606_1034_01_12 because: [2023-03-02 08:12:44.794]Container [pid=11455,containerID=container_e02_1654567136606_1034_01_12] is running 745472B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB of 8 GB physical memory used; 10.0 GB of 40 GB virtual memory used. Killing container. 请问: 该如何排查及优化 | | 吴先生 | | 15951914...@163.com |
Flink内存问题
Hi, Flink版本:1.12 部署模式:on yarn per-job 开发方式:DataStream Api 状态后端:RocksDB Job逻辑为一个15分钟的窗口计算,任务在运行一段时间后会出现内存使用超限,container被yarn kill的现象,目前有不少任务都会存在类似问题: Closing TaskExecutor connection container_e02_1654567136606_1034_01_12 because: [2023-03-02 08:12:44.794]Container [pid=11455,containerID=container_e02_1654567136606_1034_01_12] is running 745472B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB of 8 GB physical memory used; 10.0 GB of 40 GB virtual memory used. Killing container. 请问: 该如何排查及优化 | | 吴先生 | | 15951914...@163.com |