回复: flink-checkpoint 问题

2024-01-11 文章 吴先生
看现象是这样,谢了,我抽空看下这块源码


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月11日 16:33 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
看了下代码,这个问题有可能的原因是:
1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log
的,所以有概率是目录创建了,但是log没输出trigger
2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger
25548还没输出就退了。

版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。



On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote:

TM日志:
2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
and sending final execution state CANCELED to JobManager for task
ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0
e960208bbd95b1b219bafe4887b48392.
2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR
o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered
error while consuming partitions
java.nio.channels.ClosedChannelException: null
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)


JM日志,没有25548的触发记录:
2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347

Re: flink-checkpoint 问题

2024-01-11 文章 Zakelly Lan
748)
>
>
> JM日志,没有25548的触发记录:
> 2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347921 bytes
> in 50128 ms).
> 2023-12-31 18:40:10.681 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 25547 (type=CHECKPOINT) @ 1704019210665 for job
> d12f3c6e836f56fb23d96e31737ff0b3.
> 2023-12-31 18:50:10.681 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
> 25547 of job d12f3c6e836f56fb23d96e31737ff0b3 expired before completing.
> 2023-12-31 18:50:10.698 [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a
> global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
>  at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> checkpoing路径下有:
> 25546:正常
> 25547:无
> 25548:有,路径下为空
>
>
>
>
> 任务人为从25548恢复时失败,抛出异常找不到_metadate文件
>
>
> | |
> 吴先生
> |
> |
> 15951914...@163.com
> |
>  回复的原邮件 
> | 发件人 | Xuyang |
> | 发送日期 | 2024年1月11日 14:55 |
> | 收件人 |  |
> | 主题 | Re:回复: flink-checkpoint 问题 |
> Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
> 在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道:
>
> JM中chk失败时间点日志,没有25548的触发记录:
>
>
> 自动recovery失败:
>
>
> TM日志:
>
>
> checkpoint文件路径,25548里面空的:
>
>
> | |
> 吴先生
> |
> |
> 15951914...@163.com
> |
>  回复的原邮件 
> | 发件人 | Zakelly Lan |
> | 发送日期 | 2024年1月10日 18:20 |
> | 收件人 |  |
> | 主题 | Re: flink-checkpoint 问题 |
> 你好,
> 方便的话贴一下jobmanager的log吧,应该有一些线索
>
>
> On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:
>
> Flink版本: 1.12
> checkpoint配置:hdfs
>
>
> 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
>
>
>


回复: flink-checkpoint 问题

2024-01-10 文章 吴先生
0b3 expired before completing.
2023-12-31 18:50:10.698 [flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global 
failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
 at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
 at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)




checkpoing路径下有:
25546:正常
25547:无
25548:有,路径下为空




任务人为从25548恢复时失败,抛出异常找不到_metadate文件


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Xuyang |
| 发送日期 | 2024年1月11日 14:55 |
| 收件人 |  |
| 主题 | Re:回复: flink-checkpoint 问题 |
Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。




--

Best!
Xuyang




在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道:

JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




Re:回复: flink-checkpoint 问题

2024-01-10 文章 Xuyang
Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。




--

Best!
Xuyang




在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道:

JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




回复: flink-checkpoint 问题

2024-01-10 文章 吴先生
JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




Re:flink-checkpoint 问题

2024-01-10 文章 ouywl
我记得flink低版本有这个bug,会错误的删除某一个checkpoint的,你这个版本太老了,可以升级到新版本。


The following is the content of the forwarded email
From:"吴先生" <15951914...@163.com>
To:user-zh 
Date:2024-01-10 17:54:42
Subject:flink-checkpoint 问题

Flink版本: 1.12
checkpoint配置:hdfs
现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




Re: flink-checkpoint 问题

2024-01-10 文章 Zakelly Lan
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

> Flink版本: 1.12
> checkpoint配置:hdfs
>
> 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
>
>


flink-checkpoint 问题

2024-01-10 文章 吴先生
Flink版本: 1.12
checkpoint配置:hdfs
现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的



Re: flink 无法checkpoint问题

2021-12-29 文章 Caizhi Weng
Hi!

图片无法显示,建议使用外部图床上传。

checkpoint 慢的原因可能有很多,最可能的原因是由于算子处理数据太慢导致反压(可以通过 Flink web UI 每个节点的 busy
百分比大致看出来)。建议检查资源是否充足,数据是否倾斜,gc 是否过于频繁等。

紫月幽魔灵  于2021年12月28日周二 10:38写道:

> 版本:flink版本1.14.0
> 问题: 使用flink 1.14.0版本提交到jdk1.7版本的yarn集群上checkpoint无法生成,一直处于IN_PROGRESS状态
> 提交命令如下:
> ./bin/flinksql-submit.sh \
> --sql sqlserver-cdc-to-kafka.sql \
> -m yarn-cluster \
> -ynm sqlserverTOkafka \
> -ys 2 \
> -yjm 1024 \
> -ytm 1024 \
> -yid application_1640657115196_0001 \
> -yD yarn.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD containerized.master.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD containerized.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD log4j2.formatMsgNoLookups=true
> 这是什么原因造成的呢?
>


回复: checkpoint问题

2020-09-16 文章 明启 孙
感谢解答

smq

发件人: Yun Tang
发送时间: 2020年9月17日 10:30
收件人: user-zh
主题: Re: checkpoint问题

Hi

checkpoint使用operator id进行一一映射进行恢复,请参照 
设置id[1],以及如果checkpoint中存在某个operator但是修改后的作业并不存在该operator时的处理逻辑[2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

祝好
唐云

From: smq <374060...@qq.com>
Sent: Thursday, September 17, 2020 7:02
To: user-zh 
Subject: checkpoint问题

如果我的程序逻辑修改,还能用之前的checkpoint吗



Re: checkpoint问题

2020-09-16 文章 Yun Tang
Hi

checkpoint使用operator id进行一一映射进行恢复,请参照 
设置id[1],以及如果checkpoint中存在某个operator但是修改后的作业并不存在该operator时的处理逻辑[2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

祝好
唐云

From: smq <374060...@qq.com>
Sent: Thursday, September 17, 2020 7:02
To: user-zh 
Subject: checkpoint问题

如果我的程序逻辑修改,还能用之前的checkpoint吗


checkpoint问题

2020-09-16 文章 smq
如果我的程序逻辑修改,还能用之前的checkpoint吗

转发:Sql-client的checkpoint问题

2020-08-06 文章 king


Checkpoint只生成了shared和taskowned目录,没有chk,望解答,谢谢
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

- 转发的邮件 -

发件人: king
发送日期: 2020年08月07日 09:05
收件人: user-zh
主题: 转发:Sql-client的checkpoint问题


抱歉,不是flink-site.yaml是flink-conf.yaml
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

- 转发的邮件 -

发件人: king
发送日期: 2020年08月07日 08:23
收件人: user-zh
主题: Sql-client的checkpoint问题


您好,flink1.11.0,请问,
1.sql-client 如何设置checkpoint时间(生成周期),在做file 
streaming时候hdfs文件一直In-progress处状态,不能Finalized
2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢


以上问题在编程方式下无问题。
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

转发:Sql-client的checkpoint问题

2020-08-06 文章 king


抱歉,不是flink-site.yaml是flink-conf.yaml
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

- 转发的邮件 -

发件人: king
发送日期: 2020年08月07日 08:23
收件人: user-zh
主题: Sql-client的checkpoint问题


您好,flink1.11.0,请问,
1.sql-client 如何设置checkpoint时间(生成周期),在做file 
streaming时候hdfs文件一直In-progress处状态,不能Finalized
2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢


以上问题在编程方式下无问题。
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

Sql-client的checkpoint问题

2020-08-06 文章 king


您好,flink1.11.0,请问,
1.sql-client 如何设置checkpoint时间(生成周期),在做file 
streaming时候hdfs文件一直In-progress处状态,不能Finalized
2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢


以上问题在编程方式下无问题。
| |
king
|
|
邮箱:kingjinhe2...@163.com
|

Signature is customized by Netease Mail Master

Re:Re:FLINK Checkpoint 问题咨询

2019-07-16 文章 USERNAME
1.是采用增量方式 RocksDBStateBackend backend = new 
RocksDBStateBackend("hdfs:/user/flink", true)
2.时间间隔 20 分钟
谢谢!


ID  Status  Acknowledged  Trigger Time  Latest Acknowledgement  
End to End Duration  State Size  Buffered During Alignment  
50459/59  16:51:30  16:51:34  4s  80.1 MB  0 B  

50359/59  16:31:30  16:31:33  3s  80.1 MB  0 B  

50259/59  16:11:30  16:11:34  4s  80.4 MB  0 B  

50159/59  15:51:30  15:52:23  53s  3.56 GB  0 B 
 
50059/59  15:31:51  15:31:56  5s  76.8 MB  0 B  

49959/59  15:14:16  15:14:19  3s  93.8 MB  0 B  

49859/59  14:54:16  14:54:20  3s  93.9 MB  0 B  

49759/59  14:34:16  14:35:04  47s  3.54 GB  0 B 
 
49659/59  14:14:16  14:14:20  3s  92.9 MB  0 B  

49559/59  13:54:16  13:54:19  3s  92.8 MB  0 B  


在 2019-07-16 20:46:02,"唐军亮"  写道:
>确定两个问题:
>1、使用的是rocksdb 增量state?
>2、checkpoint的时间间隔设置的多少?
> 
> 
>-- Original --
>From:  "USERNAME";
>Date:  Tue, Jul 16, 2019 05:36 PM
>To:  "user-zh"; 
>
>Subject:  FLINK Checkpoint 问题咨询
>
> 
>
>先谢谢各位大佬!
>
>
>1.环境
>FLINK 版本 :1.7.2
>运行模式:flink on yarn (yarn single job)
>
>
>2.配置
>状态保存方式:RocksDBStateBackend backend = new 
>RocksDBStateBackend("hdfs:/user/flink", true)
>窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
>计算方式:.aggregate(new MyAggregate(), new MyProcess())
>
>
>3.数据
>数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右
>
>
>4.需求
>监测设备超过1小时没有数据,离线报警
>设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)
>
>
>5.现象
>该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)
>
>
>6.咨询问题
>a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
>b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
>c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
>问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!
>
>
>祝大家 头发越来越多,代码BUG越来越少!
>
>
>
>
>--样例数据
>IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState 
>SizeBuffered During Alignment
>50459/5916:51:3016:51:344s80.1 MB0 B
>50359/5916:31:3016:31:333s80.1 MB0 B
>50259/5916:11:3016:11:344s80.4 MB0 B
>50159/5915:51:3015:52:2353s3.56 GB0 B
>50059/5915:31:5115:31:565s76.8 MB0 B
>49959/5915:14:1615:14:193s93.8 MB0 B
>49859/5914:54:1614:54:203s93.9 MB0 B
>49759/5914:34:1614:35:0447s3.54 GB0 B
>49659/5914:14:1614:14:203s92.9 MB0 B
>49559/5913:54:1613:54:193s92.8 MB0 B


Re:FLINK Checkpoint 问题咨询

2019-07-16 文章 唐军亮
确定两个问题:
1、使用的是rocksdb 增量state?
2、checkpoint的时间间隔设置的多少?
 
 
-- Original --
From:  "USERNAME";
Date:  Tue, Jul 16, 2019 05:36 PM
To:  "user-zh"; 

Subject:  FLINK Checkpoint 问题咨询

 

先谢谢各位大佬!


1.环境
FLINK 版本 :1.7.2
运行模式:flink on yarn (yarn single job)


2.配置
状态保存方式:RocksDBStateBackend backend = new 
RocksDBStateBackend("hdfs:/user/flink", true)
窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
计算方式:.aggregate(new MyAggregate(), new MyProcess())


3.数据
数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右


4.需求
监测设备超过1小时没有数据,离线报警
设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)


5.现象
该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)


6.咨询问题
a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!


祝大家 头发越来越多,代码BUG越来越少!




--样例数据
IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState 
SizeBuffered During Alignment
50459/5916:51:3016:51:344s80.1 MB0 B
50359/5916:31:3016:31:333s80.1 MB0 B
50259/5916:11:3016:11:344s80.4 MB0 B
50159/5915:51:3015:52:2353s3.56 GB0 B
50059/5915:31:5115:31:565s76.8 MB0 B
49959/5915:14:1615:14:193s93.8 MB0 B
49859/5914:54:1614:54:203s93.9 MB0 B
49759/5914:34:1614:35:0447s3.54 GB0 B
49659/5914:14:1614:14:203s92.9 MB0 B
49559/5913:54:1613:54:193s92.8 MB0 B

Re: FLINK Checkpoint 问题咨询

2019-07-16 文章 Congxian Qiu
你好

1. 窗口数据都会保存的,保存在 State 中,在你的例子中,保存在 RocksDB 中
2. 从给的样例看,应该是增量数据变多了,猜测是往 RocksDB 写数据比较频繁,导致 compaction 之前的 sst 文件无用,这个尝试看
RocksDB 的 log 来验证
3. 窗口的状态暂时无法清楚,你可以用 ProcessFunction [1] 来模拟 window,在 ProcessFunction
中可以按照自己的逻辑清理状态数据

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html
Best,
Congxian


USERNAME  于2019年7月16日周二 下午5:36写道:

> 先谢谢各位大佬!
>
>
> 1.环境
> FLINK 版本 :1.7.2
> 运行模式:flink on yarn (yarn single job)
>
>
> 2.配置
> 状态保存方式:RocksDBStateBackend backend = new
> RocksDBStateBackend("hdfs:/user/flink", true)
> 窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
> 计算方式:.aggregate(new MyAggregate(), new MyProcess())
>
>
> 3.数据
> 数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右
>
>
> 4.需求
> 监测设备超过1小时没有数据,离线报警
> 设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)
>
>
> 5.现象
> 该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)
>
>
> 6.咨询问题
> a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
> b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
> c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
> 问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!
>
>
> 祝大家 头发越来越多,代码BUG越来越少!
>
>
>
>
> --样例数据
> IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End
> DurationState SizeBuffered During Alignment
> 50459/5916:51:3016:51:344s80.1 MB0 B
> 50359/5916:31:3016:31:333s80.1 MB0 B
> 50259/5916:11:3016:11:344s80.4 MB0 B
> 50159/5915:51:3015:52:2353s3.56 GB0 B
> 50059/5915:31:5115:31:565s76.8 MB0 B
> 49959/5915:14:1615:14:193s93.8 MB0 B
> 49859/5914:54:1614:54:203s93.9 MB0 B
> 49759/5914:34:1614:35:0447s3.54 GB0 B
> 49659/5914:14:1614:14:203s92.9 MB0 B
> 49559/5913:54:1613:54:193s92.8 MB0 B


FLINK Checkpoint 问题咨询

2019-07-16 文章 USERNAME
先谢谢各位大佬!


1.环境
FLINK 版本 :1.7.2
运行模式:flink on yarn (yarn single job)


2.配置
状态保存方式:RocksDBStateBackend backend = new 
RocksDBStateBackend("hdfs:/user/flink", true)
窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
计算方式:.aggregate(new MyAggregate(), new MyProcess())


3.数据
数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右


4.需求
监测设备超过1小时没有数据,离线报警
设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)


5.现象
该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)


6.咨询问题
a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!


祝大家 头发越来越多,代码BUG越来越少!




--样例数据
IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState 
SizeBuffered During Alignment
50459/5916:51:3016:51:344s80.1 MB0 B
50359/5916:31:3016:31:333s80.1 MB0 B
50259/5916:11:3016:11:344s80.4 MB0 B
50159/5915:51:3015:52:2353s3.56 GB0 B
50059/5915:31:5115:31:565s76.8 MB0 B
49959/5915:14:1615:14:193s93.8 MB0 B
49859/5914:54:1614:54:203s93.9 MB0 B
49759/5914:34:1614:35:0447s3.54 GB0 B
49659/5914:14:1614:14:203s92.9 MB0 B
49559/5913:54:1613:54:193s92.8 MB0 B