回复: flink-checkpoint 问题
看现象是这样,谢了,我抽空看下这块源码 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Zakelly Lan | | 发送日期 | 2024年1月11日 16:33 | | 收件人 | | | 主题 | Re: flink-checkpoint 问题 | 看了下代码,这个问题有可能的原因是: 1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log 的,所以有概率是目录创建了,但是log没输出trigger 2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger 25548还没输出就退了。 版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。 On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote: TM日志: 2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0 e960208bbd95b1b219bafe4887b48392. 2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions java.nio.channels.ClosedChannelException: null at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232) at org.apache.flink.runtime.io .network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134) at org.apache.flink.runtime.io .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160) at org.apache.flink.runtime.io .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) JM日志,没有25548的触发记录: 2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347
Re: flink-checkpoint 问题
748) > > > JM日志,没有25548的触发记录: > 2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347921 bytes > in 50128 ms). > 2023-12-31 18:40:10.681 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 25547 (type=CHECKPOINT) @ 1704019210665 for job > d12f3c6e836f56fb23d96e31737ff0b3. > 2023-12-31 18:50:10.681 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > 25547 of job d12f3c6e836f56fb23d96e31737ff0b3 expired before completing. > 2023-12-31 18:50:10.698 [flink-akka.actor.default-dispatcher-3] INFO > org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a > global failure. > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable > failure threshold. > at > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90) > at > org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > > > > checkpoing路径下有: > 25546:正常 > 25547:无 > 25548:有,路径下为空 > > > > > 任务人为从25548恢复时失败,抛出异常找不到_metadate文件 > > > | | > 吴先生 > | > | > 15951914...@163.com > | > 回复的原邮件 > | 发件人 | Xuyang | > | 发送日期 | 2024年1月11日 14:55 | > | 收件人 | | > | 主题 | Re:回复: flink-checkpoint 问题 | > Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。 > > > > > -- > > Best! > Xuyang > > > > > 在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道: > > JM中chk失败时间点日志,没有25548的触发记录: > > > 自动recovery失败: > > > TM日志: > > > checkpoint文件路径,25548里面空的: > > > | | > 吴先生 > | > | > 15951914...@163.com > | > 回复的原邮件 > | 发件人 | Zakelly Lan | > | 发送日期 | 2024年1月10日 18:20 | > | 收件人 | | > | 主题 | Re: flink-checkpoint 问题 | > 你好, > 方便的话贴一下jobmanager的log吧,应该有一些线索 > > > On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote: > > Flink版本: 1.12 > checkpoint配置:hdfs > > > 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的 > > >
回复: flink-checkpoint 问题
0b3 expired before completing. 2023-12-31 18:50:10.698 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure. org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90) at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) checkpoing路径下有: 25546:正常 25547:无 25548:有,路径下为空 任务人为从25548恢复时失败,抛出异常找不到_metadate文件 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Xuyang | | 发送日期 | 2024年1月11日 14:55 | | 收件人 | | | 主题 | Re:回复: flink-checkpoint 问题 | Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。 -- Best! Xuyang 在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道: JM中chk失败时间点日志,没有25548的触发记录: 自动recovery失败: TM日志: checkpoint文件路径,25548里面空的: | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Zakelly Lan | | 发送日期 | 2024年1月10日 18:20 | | 收件人 | | | 主题 | Re: flink-checkpoint 问题 | 你好, 方便的话贴一下jobmanager的log吧,应该有一些线索 On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote: Flink版本: 1.12 checkpoint配置:hdfs 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
Re:回复: flink-checkpoint 问题
Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。 -- Best! Xuyang 在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道: JM中chk失败时间点日志,没有25548的触发记录: 自动recovery失败: TM日志: checkpoint文件路径,25548里面空的: | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Zakelly Lan | | 发送日期 | 2024年1月10日 18:20 | | 收件人 | | | 主题 | Re: flink-checkpoint 问题 | 你好, 方便的话贴一下jobmanager的log吧,应该有一些线索 On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote: Flink版本: 1.12 checkpoint配置:hdfs 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
回复: flink-checkpoint 问题
JM中chk失败时间点日志,没有25548的触发记录: 自动recovery失败: TM日志: checkpoint文件路径,25548里面空的: | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Zakelly Lan | | 发送日期 | 2024年1月10日 18:20 | | 收件人 | | | 主题 | Re: flink-checkpoint 问题 | 你好, 方便的话贴一下jobmanager的log吧,应该有一些线索 On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote: Flink版本: 1.12 checkpoint配置:hdfs 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
Re:flink-checkpoint 问题
我记得flink低版本有这个bug,会错误的删除某一个checkpoint的,你这个版本太老了,可以升级到新版本。 The following is the content of the forwarded email From:"吴先生" <15951914...@163.com> To:user-zh Date:2024-01-10 17:54:42 Subject:flink-checkpoint 问题 Flink版本: 1.12 checkpoint配置:hdfs 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
Re: flink-checkpoint 问题
你好, 方便的话贴一下jobmanager的log吧,应该有一些线索 On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote: > Flink版本: 1.12 > checkpoint配置:hdfs > > 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的 > >
flink-checkpoint 问题
Flink版本: 1.12 checkpoint配置:hdfs 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
Re: flink 无法checkpoint问题
Hi! 图片无法显示,建议使用外部图床上传。 checkpoint 慢的原因可能有很多,最可能的原因是由于算子处理数据太慢导致反压(可以通过 Flink web UI 每个节点的 busy 百分比大致看出来)。建议检查资源是否充足,数据是否倾斜,gc 是否过于频繁等。 紫月幽魔灵 于2021年12月28日周二 10:38写道: > 版本:flink版本1.14.0 > 问题: 使用flink 1.14.0版本提交到jdk1.7版本的yarn集群上checkpoint无法生成,一直处于IN_PROGRESS状态 > 提交命令如下: > ./bin/flinksql-submit.sh \ > --sql sqlserver-cdc-to-kafka.sql \ > -m yarn-cluster \ > -ynm sqlserverTOkafka \ > -ys 2 \ > -yjm 1024 \ > -ytm 1024 \ > -yid application_1640657115196_0001 \ > -yD yarn.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \ > -yD containerized.master.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \ > -yD containerized.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \ > -yD log4j2.formatMsgNoLookups=true > 这是什么原因造成的呢? >
回复: checkpoint问题
感谢解答 smq 发件人: Yun Tang 发送时间: 2020年9月17日 10:30 收件人: user-zh 主题: Re: checkpoint问题 Hi checkpoint使用operator id进行一一映射进行恢复,请参照 设置id[1],以及如果checkpoint中存在某个operator但是修改后的作业并不存在该operator时的处理逻辑[2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state 祝好 唐云 From: smq <374060...@qq.com> Sent: Thursday, September 17, 2020 7:02 To: user-zh Subject: checkpoint问题 如果我的程序逻辑修改,还能用之前的checkpoint吗
Re: checkpoint问题
Hi checkpoint使用operator id进行一一映射进行恢复,请参照 设置id[1],以及如果checkpoint中存在某个operator但是修改后的作业并不存在该operator时的处理逻辑[2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state 祝好 唐云 From: smq <374060...@qq.com> Sent: Thursday, September 17, 2020 7:02 To: user-zh Subject: checkpoint问题 如果我的程序逻辑修改,还能用之前的checkpoint吗
checkpoint问题
如果我的程序逻辑修改,还能用之前的checkpoint吗
转发:Sql-client的checkpoint问题
Checkpoint只生成了shared和taskowned目录,没有chk,望解答,谢谢 | | king | | 邮箱:kingjinhe2...@163.com | Signature is customized by Netease Mail Master - 转发的邮件 - 发件人: king 发送日期: 2020年08月07日 09:05 收件人: user-zh 主题: 转发:Sql-client的checkpoint问题 抱歉,不是flink-site.yaml是flink-conf.yaml | | king | | 邮箱:kingjinhe2...@163.com | Signature is customized by Netease Mail Master - 转发的邮件 - 发件人: king 发送日期: 2020年08月07日 08:23 收件人: user-zh 主题: Sql-client的checkpoint问题 您好,flink1.11.0,请问, 1.sql-client 如何设置checkpoint时间(生成周期),在做file streaming时候hdfs文件一直In-progress处状态,不能Finalized 2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢 以上问题在编程方式下无问题。 | | king | | 邮箱:kingjinhe2...@163.com | Signature is customized by Netease Mail Master
转发:Sql-client的checkpoint问题
抱歉,不是flink-site.yaml是flink-conf.yaml | | king | | 邮箱:kingjinhe2...@163.com | Signature is customized by Netease Mail Master - 转发的邮件 - 发件人: king 发送日期: 2020年08月07日 08:23 收件人: user-zh 主题: Sql-client的checkpoint问题 您好,flink1.11.0,请问, 1.sql-client 如何设置checkpoint时间(生成周期),在做file streaming时候hdfs文件一直In-progress处状态,不能Finalized 2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢 以上问题在编程方式下无问题。 | | king | | 邮箱:kingjinhe2...@163.com | Signature is customized by Netease Mail Master
Sql-client的checkpoint问题
您好,flink1.11.0,请问, 1.sql-client 如何设置checkpoint时间(生成周期),在做file streaming时候hdfs文件一直In-progress处状态,不能Finalized 2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢 以上问题在编程方式下无问题。 | | king | | 邮箱:kingjinhe2...@163.com | Signature is customized by Netease Mail Master
Re:Re:FLINK Checkpoint 问题咨询
1.是采用增量方式 RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true) 2.时间间隔 20 分钟 谢谢! ID Status Acknowledged Trigger Time Latest Acknowledgement End to End Duration State Size Buffered During Alignment 50459/59 16:51:30 16:51:34 4s 80.1 MB 0 B 50359/59 16:31:30 16:31:33 3s 80.1 MB 0 B 50259/59 16:11:30 16:11:34 4s 80.4 MB 0 B 50159/59 15:51:30 15:52:23 53s 3.56 GB 0 B 50059/59 15:31:51 15:31:56 5s 76.8 MB 0 B 49959/59 15:14:16 15:14:19 3s 93.8 MB 0 B 49859/59 14:54:16 14:54:20 3s 93.9 MB 0 B 49759/59 14:34:16 14:35:04 47s 3.54 GB 0 B 49659/59 14:14:16 14:14:20 3s 92.9 MB 0 B 49559/59 13:54:16 13:54:19 3s 92.8 MB 0 B 在 2019-07-16 20:46:02,"唐军亮" 写道: >确定两个问题: >1、使用的是rocksdb 增量state? >2、checkpoint的时间间隔设置的多少? > > >-- Original -- >From: "USERNAME"; >Date: Tue, Jul 16, 2019 05:36 PM >To: "user-zh"; > >Subject: FLINK Checkpoint 问题咨询 > > > >先谢谢各位大佬! > > >1.环境 >FLINK 版本 :1.7.2 >运行模式:flink on yarn (yarn single job) > > >2.配置 >状态保存方式:RocksDBStateBackend backend = new >RocksDBStateBackend("hdfs:/user/flink", true) >窗口方式:EventTimeSessionWindows.withGap(Time.hours(1)) >计算方式:.aggregate(new MyAggregate(), new MyProcess()) > > >3.数据 >数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右 > > >4.需求 >监测设备超过1小时没有数据,离线报警 >设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间) > > >5.现象 >该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据) > > >6.咨询问题 >a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB >b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样? >c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算? >问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢! > > >祝大家 头发越来越多,代码BUG越来越少! > > > > >--样例数据 >IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState >SizeBuffered During Alignment >50459/5916:51:3016:51:344s80.1 MB0 B >50359/5916:31:3016:31:333s80.1 MB0 B >50259/5916:11:3016:11:344s80.4 MB0 B >50159/5915:51:3015:52:2353s3.56 GB0 B >50059/5915:31:5115:31:565s76.8 MB0 B >49959/5915:14:1615:14:193s93.8 MB0 B >49859/5914:54:1614:54:203s93.9 MB0 B >49759/5914:34:1614:35:0447s3.54 GB0 B >49659/5914:14:1614:14:203s92.9 MB0 B >49559/5913:54:1613:54:193s92.8 MB0 B
Re:FLINK Checkpoint 问题咨询
确定两个问题: 1、使用的是rocksdb 增量state? 2、checkpoint的时间间隔设置的多少? -- Original -- From: "USERNAME"; Date: Tue, Jul 16, 2019 05:36 PM To: "user-zh"; Subject: FLINK Checkpoint 问题咨询 先谢谢各位大佬! 1.环境 FLINK 版本 :1.7.2 运行模式:flink on yarn (yarn single job) 2.配置 状态保存方式:RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true) 窗口方式:EventTimeSessionWindows.withGap(Time.hours(1)) 计算方式:.aggregate(new MyAggregate(), new MyProcess()) 3.数据 数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右 4.需求 监测设备超过1小时没有数据,离线报警 设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间) 5.现象 该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据) 6.咨询问题 a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样? c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算? 问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢! 祝大家 头发越来越多,代码BUG越来越少! --样例数据 IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState SizeBuffered During Alignment 50459/5916:51:3016:51:344s80.1 MB0 B 50359/5916:31:3016:31:333s80.1 MB0 B 50259/5916:11:3016:11:344s80.4 MB0 B 50159/5915:51:3015:52:2353s3.56 GB0 B 50059/5915:31:5115:31:565s76.8 MB0 B 49959/5915:14:1615:14:193s93.8 MB0 B 49859/5914:54:1614:54:203s93.9 MB0 B 49759/5914:34:1614:35:0447s3.54 GB0 B 49659/5914:14:1614:14:203s92.9 MB0 B 49559/5913:54:1613:54:193s92.8 MB0 B
Re: FLINK Checkpoint 问题咨询
你好 1. 窗口数据都会保存的,保存在 State 中,在你的例子中,保存在 RocksDB 中 2. 从给的样例看,应该是增量数据变多了,猜测是往 RocksDB 写数据比较频繁,导致 compaction 之前的 sst 文件无用,这个尝试看 RocksDB 的 log 来验证 3. 窗口的状态暂时无法清楚,你可以用 ProcessFunction [1] 来模拟 window,在 ProcessFunction 中可以按照自己的逻辑清理状态数据 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html Best, Congxian USERNAME 于2019年7月16日周二 下午5:36写道: > 先谢谢各位大佬! > > > 1.环境 > FLINK 版本 :1.7.2 > 运行模式:flink on yarn (yarn single job) > > > 2.配置 > 状态保存方式:RocksDBStateBackend backend = new > RocksDBStateBackend("hdfs:/user/flink", true) > 窗口方式:EventTimeSessionWindows.withGap(Time.hours(1)) > 计算方式:.aggregate(new MyAggregate(), new MyProcess()) > > > 3.数据 > 数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右 > > > 4.需求 > 监测设备超过1小时没有数据,离线报警 > 设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间) > > > 5.现象 > 该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据) > > > 6.咨询问题 > a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB > b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样? > c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算? > 问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢! > > > 祝大家 头发越来越多,代码BUG越来越少! > > > > > --样例数据 > IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End > DurationState SizeBuffered During Alignment > 50459/5916:51:3016:51:344s80.1 MB0 B > 50359/5916:31:3016:31:333s80.1 MB0 B > 50259/5916:11:3016:11:344s80.4 MB0 B > 50159/5915:51:3015:52:2353s3.56 GB0 B > 50059/5915:31:5115:31:565s76.8 MB0 B > 49959/5915:14:1615:14:193s93.8 MB0 B > 49859/5914:54:1614:54:203s93.9 MB0 B > 49759/5914:34:1614:35:0447s3.54 GB0 B > 49659/5914:14:1614:14:203s92.9 MB0 B > 49559/5913:54:1613:54:193s92.8 MB0 B
FLINK Checkpoint 问题咨询
先谢谢各位大佬! 1.环境 FLINK 版本 :1.7.2 运行模式:flink on yarn (yarn single job) 2.配置 状态保存方式:RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:/user/flink", true) 窗口方式:EventTimeSessionWindows.withGap(Time.hours(1)) 计算方式:.aggregate(new MyAggregate(), new MyProcess()) 3.数据 数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右 4.需求 监测设备超过1小时没有数据,离线报警 设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间) 5.现象 该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据) 6.咨询问题 a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样? c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算? 问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢! 祝大家 头发越来越多,代码BUG越来越少! --样例数据 IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState SizeBuffered During Alignment 50459/5916:51:3016:51:344s80.1 MB0 B 50359/5916:31:3016:31:333s80.1 MB0 B 50259/5916:11:3016:11:344s80.4 MB0 B 50159/5915:51:3015:52:2353s3.56 GB0 B 50059/5915:31:5115:31:565s76.8 MB0 B 49959/5915:14:1615:14:193s93.8 MB0 B 49859/5914:54:1614:54:203s93.9 MB0 B 49759/5914:34:1614:35:0447s3.54 GB0 B 49659/5914:14:1614:14:203s92.9 MB0 B 49559/5913:54:1613:54:193s92.8 MB0 B