Re: Can we choose whether to take a savepoint when cancelling a job from the Flink web UI?
hi casel, the web UI currently does not support triggering a savepoint. If you want stop-with-savepoint, you can use bin/flink [1] or the REST API [2]. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint [2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-stop 李晋忠 casel.chen wrote on Fri, Oct 28, 2022, 09:41: > When cancelling a job from the Flink web UI, can we choose whether to take a savepoint? Currently it simply cancels the job without > a savepoint.
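To make [1] and [2] above concrete, here is a minimal sketch of both stop-with-savepoint paths. The job ID, savepoint directory, and REST address are placeholders, not values from the thread:

```shell
# Sketch only: JOB_ID, SAVEPOINT_DIR and REST_BASE are placeholders.
JOB_ID="00000000000000000000000000000000"
SAVEPOINT_DIR="hdfs:///flink/savepoints"
REST_BASE="http://jobmanager:8081"   # assumption: default REST port

# (1) CLI: stop the job, writing a savepoint first
#     bin/flink stop --savepointPath "$SAVEPOINT_DIR" "$JOB_ID"

# (2) REST: POST /jobs/:jobid/stop with a target directory
stop_endpoint() { echo "$1/jobs/$2/stop"; }
#     curl -X POST "$(stop_endpoint "$REST_BASE" "$JOB_ID")" \
#          -H 'Content-Type: application/json' \
#          -d "{\"targetDirectory\": \"$SAVEPOINT_DIR\", \"drain\": false}"
stop_endpoint "$REST_BASE" "$JOB_ID"
```

Both routes take the savepoint before the job transitions to FINISHED, unlike a plain cancel.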
Can we choose whether to take a savepoint when cancelling a job from the Flink web UI?
When cancelling a job from the Flink web UI, can we choose whether to take a savepoint? Currently it simply cancels the job without a savepoint.
Flink 1.10.0 RestClusterClient cancel job error
dear all: As the subject says, when I call RestClusterClient#cancel(JobID jobId) to cancel a job, I cannot get a result from the returned future, although the job does stop normally. future.get() fails with: Number of retries has been exhausted. future.get(10, TimeUnit.SECONDS) fails with a timeout. Calling #cancelWithSavepoint(...) and #stopWithSavepoint(...) works fine; the result comes back without errors. So far I have found that 1.10.0 has this problem and 1.14.0 does not. The job runs on a CDH YARN cluster (Hadoop 2.6.0) and is deployed per-job. The code is as follows:

try (ClusterClient<?> clusterClient = new RestClusterClient<>(configuration, clusterId)) {
    clusterClient
        .cancel(jobId)
        .get(20, TimeUnit.SECONDS);
} catch (Exception e) {
    // ignored
}

Does anyone know how to solve this? Many thanks!
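Since the job does stop even though the client future fails, one workaround (a sketch, not a fix for the 1.10.0 client itself; REST_BASE and JOB_ID are placeholders) is to fire the cancel and then poll the job's state over REST instead of blocking on the future:

```shell
# Sketch: cancel, then poll job state over REST. Placeholders throughout.
REST_BASE="http://jobmanager:8081"   # assumption: default REST port
JOB_ID="00000000000000000000000000000000"

job_url() { echo "$1/jobs/$2"; }

# Cancel (PATCH /jobs/:jobid is the REST cancel endpoint):
#   curl -X PATCH "$(job_url "$REST_BASE" "$JOB_ID")?mode=cancel"
# Then poll until the reported state is CANCELED:
#   curl -s "$(job_url "$REST_BASE" "$JOB_ID")" | grep -o '"state":"[A-Z]*"'
job_url "$REST_BASE" "$JOB_ID"
```

This decouples "request the cancel" from "observe the terminal state", which is what the failing future was trying to combine.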
Re: Flink 1.10.0 RestClusterClient cancel job error
Setting the timeout longer does not help; it fails even without a time limit. ---- Original message ---- | From | 何凯飞<18703416...@163.com> | | Date | 2021-12-09 15:07 | | To | user-zh@flink.apache.org | | Cc | | | Subject | Re: Flink 1.10.0 RestClusterClient cancel job error | Have you tried setting the timeout longer, e.g. 3 min?
Re: Flink 1.10.0 RestClusterClient cancel job error
Have you tried setting the timeout longer, e.g. 3 min?
Flink 1.10.0 RestClusterClient cancel job error
dear all: As the subject says, when I call RestClusterClient#cancel(JobID jobId) to cancel a job, I cannot get a result from the returned future, although the job does stop normally. future.get() fails with: Number of retries has been exhausted. future.get(10, TimeUnit.SECONDS) fails with a timeout. Calling #cancelWithSavepoint(...) and #stopWithSavepoint(...) works fine; the result comes back without errors. The job runs on a CDH YARN cluster (Hadoop 2.6.0) and is deployed per-job. The code is as follows:

try (ClusterClient<?> clusterClient = new RestClusterClient<>(configuration, clusterId)) {
    clusterClient
        .cancel(jobId)
        .get(20, TimeUnit.SECONDS);
} catch (Exception e) {
    // ignored
}

Does anyone know how to solve this? Many thanks!

---- Original message ---- | From | Yun Tang | | Date | 2021-12-09 10:57 | | To | user-zh | | Cc | | | Subject | Re: fine-grained state configuration for Flink SQL |

Hi, I think this is a good feature request. For the DataStream and Python APIs, state TTL is configured through the API per operator, so your need is already met there. For SQL, however, the same statement can yield very different execution plans under different optimizers, which makes it hard to configure a TTL for the state inside a particular operator. One option might be to add SQL optimizer hints, so that the join and the group-by count in your example get different TTLs, but Flink SQL does not support that yet.

Best
Yun Tang

From: gygz...@163.com Sent: Tuesday, December 7, 2021 18:38 To: user-zh Subject: fine-grained state configuration for Flink SQL

Hi all, in production we found that configuring a state TTL in SQL makes the TTL take effect globally. Suppose I have the following SQL: select count(1), region from (select * from A join B on a.uid = b.uid) group by region. With a global TTL, the state of the count's GroupAggFunction is evicted, e.g. the accumulated count is cleared after a day; without a TTL, the state of the regular join keeps growing. This is just one example scenario. My question is: how should we configure a TTL for part of the state within one SQL statement, or different TTLs for different SQL statements in the same job?

gygz...@163.com
Re: Cancel job error ! Interrupted while waiting for buffer
.11-1.12.4.jar:1.12.4] > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) > ~[flink-dist_2.11-1.12.4.jar:1.12.4] > at > com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72) > ~[testFlink-1.0.jar:?] > at > com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:28) > ~[testFlink-1.0.jar:?] > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) > ~[flink-dist_2.11-1.12.4.jar:1.12.4] > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) > ~[flink-dist_2.11-1.12.4.jar:1.12.4] > ... 32 more > > > My question : > > 1. what can I do to deal with this error ? > 2. if I cancel job with savepoint , will this error affect savepoint ? > > > Best ! > > > > >
Cancel job error ! Interrupted while waiting for buffer
Hi, I use Flink 1.12.4 on YARN. The job topology is: kafka -> source -> flatmap -> window 1 min agg -> sink -> kafka. Checkpointing is enabled with an interval of 20 s. When I cancel my job, some TMs cancel successfully, but others stay in cancelling status until each such TM kills itself after task.cancellation.timeout = 18. The TM log shows:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:114) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:74) [testFlink-1.0.jar:?]
    at com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:29) [testFlink-1.0.jar:?]
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) [flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) [flink-dist_2.11-1.12.4.jar:1.12.4]
Caused by: java.io.IOException: Interrupted while waiting for buffer
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[testFlink-1.0.jar:?]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[testFlink-1.0.jar:?]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72) ~[testFlink-1.0.jar:?]
    at com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:28) ~[testFlink-1.0.jar:?]
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
    ... 32 more

My questions:
1. What can I do to deal with this error?
2. If I cancel the job with a savepoint, will this error affect the savepoint?

Best!
Re: flink 1.12 Cancel Job memory not released (question)
My job is flink-sql with an upsert sink; the state backend is MemoryStateBackend, and I set env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION). ---- Original message ---- From: ""
Re: flink 1.12 Cancel Job memory not released (question)
Hi 徐州州, please check the overview in the checkpoint UI and see whether the "restored" part is empty, i.e. whether the job was restored from a checkpoint at all; you can also check the JobManager log for a checkpoint resume. If the job was not restored from a checkpoint/savepoint, it effectively starts from scratch, and unless it reads from some other external source it should not see any historical data.

Best
Yun Tang

From: 徐州州 <25977...@qq.com> Sent: Tuesday, January 5, 2021 10:34 To: user-zh@flink.apache.org Subject: Re: flink 1.12 Cancel Job memory not released (question)

This is my complete configuration; I did not set any state backend or savepoint. The job is killed with /opt/module/hadoop3.2.1/bin/yarn application -kill jobid and started with /opt/module/flink1.12/bin/flink run -d -m yarn-cluster -yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm App_Bs_Drainage_Launch_200105. I wonder whether it could be a queue problem; my cluster has only one queue.

-- Original message -- From: "user-zh"; Sent: Tuesday, January 5, 2021, 10:03 To: "user-zh@flink.apache.org"; Subject: Re: flink 1.12 Cancel Job memory not released (question)

This seems related to checkpoints, savepoints and the state backend; worth investigating. If those are configured, it is correct for the restarted job to accumulate on top of yesterday's result; it is just that the state saved when you killed yesterday's job may not match the actual result. | 刘海 | liuha...@163.com | signature customized by NetEase Mail Master

On Jan 5, 2021, 09:04, 徐州州 <25977...@qq.com> wrote:
I have a flink-sql job whose daily results accumulate on top of the previous day's. I submit it as a code jar and set MemoryStateBackend in the code. A scheduled task kills the running job at 23:57, and at 00:30 the next day an azkaban script resubmits it. Yet the 00:30 run still accumulates on top of yesterday, even though I checked that the NodeManager memory was released during the kill window. Moreover the accumulation is not even exact: e.g. yesterday's result was 1000, but today it may accumulate on top of 900. Why does this happen? I have tried many things without success.
|insert into app_bs_drainage_place
|SELECT
| do.GrouporgName,
| du.Name,
| COUNT(DISTINCT dooi.Code) AS TotalSingular,
|md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|current_date as As_Of_Date
|FROM dw_od_order_info dooi
| INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
| INNER JOIN dim_cc_media_placement_label_relation dmplr ON dmplr.MediaPlacementId = dwi.PlacementId
| INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name IN ('金装驼奶', '血糖仪')
| INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status < 60 AND dooi.Status < 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name

-- Original message -- From: "赵一旦"
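Yun Tang's check ("was the job restored from a checkpoint?") can also be done from the shell against the REST API. A sketch only; REST_BASE and JOB_ID are placeholders:

```shell
# Check whether a job restored from a checkpoint. Placeholders throughout.
REST_BASE="http://jobmanager:8081"   # assumption: default REST port
JOB_ID="00000000000000000000000000000000"

checkpoints_url() { echo "$1/jobs/$2/checkpoints"; }

# "latest.restored" in the response is null when the job started from scratch:
#   curl -s "$(checkpoints_url "$REST_BASE" "$JOB_ID")" | grep -o '"restored":[^,}]*'
# Alternatively, grep the JobManager log for restore messages:
#   grep -i "restoring" jobmanager.log
checkpoints_url "$REST_BASE" "$JOB_ID"
```

If "restored" is empty/null and the JobManager log shows no restore, the accumulation must come from somewhere other than Flink state.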
Re: flink 1.12 Cancel Job memory not released (question)
Please share the exact SQL; I don't quite understand the problem you are describing. "Releasing memory" and "accumulating on top of the previous result" are two unrelated things: the former is about memory, the latter about state. If the job is restarted from a checkpoint/savepoint, the state is of course kept and accumulation continues.

徐州州 <25977...@qq.com> wrote on Mon, Jan 4, 2021, 08:45:
> Even after switching to yarn-cluster mode, killing the job at 23:50 with /opt/module/hadoop3.2.1/bin/yarn
> application -kill
> application_1609656886263_0043 and restarting at 01:30 the next day, the result still accumulates on top of yesterday's. Killing the job does not seem to release the state. Is there really nothing I can do?
>
>
> --Original message--
> From: "赵一旦" Sent: Tuesday, December 29, 2020, 21:35
> To: "user-zh" Subject: Re: flink 1.12 Cancel Job memory not released (question)
>
>
> No, you can't. The job is one thing and the TaskManager is another. The TaskManager is a process started in advance;
> when a job is submitted, the TaskManager executes it for you. After a cancel, the TaskManager goes back to its own business (e.g. waiting for new jobs).
> Alternatively, consider the YARN deployment, per-job mode, and so on.
>
> 徐州州 <25977...@qq.com wrote on Tue, Dec 29, 2020, 09:00:
> > A question: after I cancel my flink
> > sql job and restart it an hour later, it continues accumulating from the point where it was cancelled. I develop in IDEA and set no checkpoints in the code. How can I release the TaskManager memory used by the job at the moment it is cancelled?
Re: flink 1.12 Cancel Job memory not released (question)
I am using a StandaloneSessionCluster. ---- Original message ---- From: "user-zh"
Re: flink 1.12 Cancel Job memory not released (question)
I develop the flink sql job in IDEA and configured no checkpoint; after the job is cancelled, how can the slots of the TaskManager be released? ---- Original message ---- From: "赵一旦"
Re: flink 1.12 Cancel Job memory not released (question)
No, you can't. The job is one thing and the TaskManager is another. The TaskManager is a process started in advance; when a job is submitted, the TaskManager executes it for you. After a cancel, the TaskManager goes back to its own business (e.g. waiting for new jobs). Alternatively, consider the YARN deployment, per-job mode, and so on.

徐州州 <25977...@qq.com> wrote on Tue, Dec 29, 2020, 09:00:
> A question: after I cancel my flink
> sql job and restart it an hour later, it continues accumulating from the point where it was cancelled. I develop in IDEA and set no checkpoints in the code. How can I release the TaskManager memory used by the job at the moment it is cancelled?
flink 1.12 Cancel Job memory not released (question)
A question: after I cancel my flink sql job and restart it an hour later, it continues accumulating from the point where it was cancelled. I develop in IDEA and set no checkpoints in the code. How can I release the TaskManager memory used by the job at the moment it is cancelled?
Re: Flink rest api cancel job
Hi White, Can you describe your problem in more detail? * What is your Flink version? * How do you deploy the job (application / session cluster), (Kubernetes, Docker, YARN, ...) * What kind of job are you running (DataStream, Table/SQL, DataSet)? Best, Fabian Am Mo., 20. Juli 2020 um 08:42 Uhr schrieb snack white < amazingu...@gmail.com>: > Hi, > When I using rest api to cancel my job , the rest 9 TM has been > canceled quickly , but the other one TM is always cancelling status , > someone can show me how can I solve the question . > Thanks, > White
Flink rest api cancel job
Hi, when I use the REST API to cancel my job, nine of the TMs are cancelled quickly, but the remaining TM stays in cancelling status. Can someone show me how to solve this? Thanks, White
Re: org.apache.flink.util.FlinkException: Could not cancel job
You are correct. Thanks! I misused the job ID. Sorry for bothering you guys. Best regards, Chang from iPhone > On 4 Sep 2018, at 18:06, Chesnay Schepler wrote: > > Please check that the job ID is correct. > >> On 04.09.2018 15:48, Chang Liu wrote: >> Dear All, >> >> I had the following issue when trying to cancel a job from CLI. I am >> wondering am I in the proper way of canceling a job? Or, there is more >> elegant way to do this, both in code or in CLI? Many Thanks! >> >> BTW, I am have streaming coming from Kafka and producing to another Kafka >> topic. >> >> ./bin/flink cancel b89f45024cf2e45914eaa920df95907f >> Cancelling job b89f45024cf2e45914eaa920df95907f. >> >> >> The program finished with the following exception: >> >> org.apache.flink.util.FlinkException: Could not cancel job >> b89f45024cf2e45914eaa920df95907f. >> at >> org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:603) >> at >> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:955) >> at >> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:596) >> at >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029) >> at >> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) >> at >> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) >> at >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) >> Caused by: java.util.concurrent.ExecutionException: >> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not >> complete the operation. Exception is not retryable. >> at >> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) >> at >> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) >> at >> org.apache.flink.client.program.rest.RestClusterClient.cancel(RestClusterClient.java:380) >> at >> org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:601) >> ... 
6 more >> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: >> Could not complete the operation. Exception is not retryable. >> at >> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214) >> at >> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >> at >> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) >> at >> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >> at >> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) >> at >> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) >> at >> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: java.util.concurrent.CompletionException: >> org.apache.flink.runtime.rest.util.RestClientException: [Job could not be >> found.] >> at >> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) >> at >> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) >> at >> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) >> at >> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) >> at >> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) >> ... 4 more >> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job >> could not be found.] 
>> at >> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225) >> at >> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:209) >> at >> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) >> ... 5 more >> >> >> Best regards/祝好, >> >> Chang Liu 刘畅 >> >
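Since the root cause here turned out to be a wrong job ID ("Job could not be found."), it helps to list the known jobs before cancelling. A minimal sketch; REST_BASE is a placeholder for your JobManager address:

```shell
# List jobs first, so the ID passed to `flink cancel` is real.
REST_BASE="http://jobmanager:8081"   # assumption: default REST port

jobs_url() { echo "$1/jobs"; }

# CLI: `bin/flink list` prints every running/scheduled job with its ID.
# REST: GET /jobs returns the IDs and their current status:
#   curl -s "$(jobs_url "$REST_BASE")"
jobs_url "$REST_BASE"
```

Copy-pasting an ID from that output avoids the retry loop that the CLI runs into when the ID does not exist.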
Re: org.apache.flink.util.FlinkException: Could not cancel job
Please check that the job ID is correct. On 04.09.2018 15:48, Chang Liu wrote: Dear All, I had the following issue when trying to cancel a job from CLI. I am wondering am I in the proper way of canceling a job? Or, there is more elegant way to do this, both in code or in CLI? Many Thanks! BTW, I am have streaming coming from Kafka and producing to another Kafka topic. ./bin/flink cancel b89f45024cf2e45914eaa920df95907f Cancelling job b89f45024cf2e45914eaa920df95907f. The program finished with the following exception: org.apache.flink.util.FlinkException: Could not cancel job b89f45024cf2e45914eaa920df95907f. at org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:603) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:955) at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:596) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029) at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.client.program.rest.RestClusterClient.cancel(RestClusterClient.java:380) at org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:601) ... 6 more Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable. 
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: [Job could not be found.] at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ... 4 more Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job could not be found.] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:209) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ... 5 more Best regards/祝好, Chang Liu 刘畅
org.apache.flink.util.FlinkException: Could not cancel job
Dear All,

I ran into the following issue when trying to cancel a job from the CLI. Am I cancelling the job the proper way, or is there a more elegant way to do this, either in code or in the CLI? Many thanks!

BTW, I have a streaming job consuming from Kafka and producing to another Kafka topic.

./bin/flink cancel b89f45024cf2e45914eaa920df95907f
Cancelling job b89f45024cf2e45914eaa920df95907f.

The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not cancel job b89f45024cf2e45914eaa920df95907f.
    at org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:603)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:955)
    at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:596)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.client.program.rest.RestClusterClient.cancel(RestClusterClient.java:380)
    at org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:601)
    ... 6 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: [Job could not be found.]
    at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
    at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
    at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job could not be found.]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:209)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    ... 5 more

Best regards/祝好,

Chang Liu 刘畅
Re: Could not cancel job (with savepoint) "Ask timed out"
I see, thanks. Looks like it's better for us to switch to triggering savepoint & cancel separately. On Wed, Aug 22, 2018 at 1:26 PM Till Rohrmann wrote: > Calling cancel-with-savepoint multiple times will trigger multiple > savepoints. The first issued savepoint will complete first and then cancel > the job. Thus, the later savepoints might complete or not depending on the > correct timing. Since savepoint can flush results to external systems, I > would recommend not calling the API multiple times. > > Cheers, > Till > > On Wed, Aug 22, 2018 at 10:40 AM Juho Autio wrote: > >> What I meant to ask was, does it do any harm to keep calling >> cancel-with-savepoint until the job exits? If the job is already cancelling >> with savepoint, I would assume that another cancel-with-savepoint call is >> just ignored. >> >> On Tue, Aug 21, 2018 at 1:18 PM Till Rohrmann >> wrote: >> >>> Just a small addition. Concurrent cancel call will interfere with the >>> cancel-with-savepoint command and directly cancel the job. So it is better >>> to use the cancel-with-savepoint call in order to take savepoint and then >>> cancel the job automatically. >>> >>> Cheers, >>> Till >>> >>> On Thu, Aug 9, 2018 at 9:53 AM vino yang wrote: >>> Hi Juho, We use REST client API : triggerSavepoint(), this API returns a CompletableFuture, then we call it's get() API. You can understand that I am waiting for it to complete in sync. Because cancelWithSavepoint is actually waiting for savepoint to complete synchronization, and then execute the cancel command. We do not use CLI. I think since you are through the CLI, you can observe whether the savepoint is complete by combining the log or the web UI. Thanks, vino. Juho Autio 于2018年8月9日周四 下午3:07写道: > Thanks for the suggestion. Is the separate savepoint triggering async? > Would you then separately poll for the savepoint's completion before > executing cancel? 
If additional polling is needed, then I would say that > for my purpose it's still easier to call cancel with savepoint and simply > ignore the result of the call. I would assume that it won't do any harm if > I keep retrying cancel with savepoint until the job stops – I expect that > an overlapping cancel request is ignored if the job is already creating a > savepoint. Please correct if my assumption is wrong. > > On Thu, Aug 9, 2018 at 5:04 AM vino yang > wrote: > >> Hi Juho, >> >> This problem does exist, I suggest you separate these two steps to >> temporarily deal with this problem: >> 1) Trigger Savepoint separately; >> 2) execute the cancel command; >> >> Hi Till, Chesnay: >> >> Our internal environment and multiple users on the mailing list have >> encountered similar problems. >> >> In our environment, it seems that JM shows that the save point is >> complete and JM has stopped itself, but the client will still connect to >> the old JM and report a timeout exception. >> >> Thanks, vino. >> >> >> Juho Autio 于2018年8月8日周三 下午9:18写道: >> >>> I was trying to cancel a job with savepoint, but the CLI command >>> failed with "akka.pattern.AskTimeoutException: Ask timed out". >>> >>> The stack trace reveals that ask timeout is 10 seconds: >>> >>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on >>> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms]. >>> Sender[null] sent message of type >>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". >>> >>> Indeed it's documented that the default value >>> for akka.ask.timeout="10 s" in >>> >>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka >>> >>> Behind the scenes the savepoint creation & job cancellation >>> succeeded, that was to be expected, kind of. So my problem is just >>> getting >>> a proper response back from the CLI call instead of timing out so >>> eagerly. 
>>> >>> To be exact, what I ran was: >>> >>> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m >>> yarn-cluster -yid application_1533676784032_0001 --withSavepoint >>> >>> Should I change the akka.ask.timeout to have a longer timeout? If >>> yes, can I override it just for the CLI call somehow? Maybe it might >>> have >>> undesired side-effects if set globally for the actual flink jobs to use? >>> >>> What about akka.client.timeout? The default for it is also rather >>> low: "60 s". Should it also be increased accordingly if I want to accept >>> longer than 60 s for savepoint creation? >>> >>> Finally, that default timeout is so low that I would expect this to >>> be a common problem. I would say that Flink CLI should have higher >>> default
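The "trigger the savepoint separately, then cancel" approach discussed above can be done entirely over REST on reasonably recent Flink versions. A sketch under assumed defaults; the host, port, target directory, and job ID (reused from the thread) are placeholders:

```shell
# Sketch: savepoint first, cancel afterwards, instead of cancel-with-savepoint.
REST_BASE="http://jobmanager:8081"   # assumption: default REST port
JOB_ID="b7c7d19d25e16a952d3afa32841024e5"

savepoints_url() { echo "$1/jobs/$2/savepoints"; }

# 1) Trigger: POST /jobs/:jobid/savepoints returns a "request-id".
#   curl -X POST "$(savepoints_url "$REST_BASE" "$JOB_ID")" \
#        -H 'Content-Type: application/json' \
#        -d '{"target-directory": "hdfs:///flink/savepoints", "cancel-job": false}'
# 2) Poll: GET /jobs/:jobid/savepoints/:triggerid until the status is COMPLETED.
# 3) Cancel: PATCH /jobs/:jobid (mode defaults to cancel) once the savepoint is done.
#   curl -X PATCH "$REST_BASE/jobs/$JOB_ID"
savepoints_url "$REST_BASE" "$JOB_ID"
```

Because each step returns its own response, the client is never left blocking on one long-running ask, which is what triggers the CLI timeout described in this thread.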
Re: Could not cancel job (with savepoint) "Ask timed out"
Calling cancel-with-savepoint multiple times will trigger multiple savepoints. The first issued savepoint will complete first and then cancel the job. Thus, the later savepoints might complete or not depending on the correct timing. Since savepoint can flush results to external systems, I would recommend not calling the API multiple times. Cheers, Till On Wed, Aug 22, 2018 at 10:40 AM Juho Autio wrote: > What I meant to ask was, does it do any harm to keep calling > cancel-with-savepoint until the job exits? If the job is already cancelling > with savepoint, I would assume that another cancel-with-savepoint call is > just ignored. > > On Tue, Aug 21, 2018 at 1:18 PM Till Rohrmann > wrote: > >> Just a small addition. Concurrent cancel call will interfere with the >> cancel-with-savepoint command and directly cancel the job. So it is better >> to use the cancel-with-savepoint call in order to take savepoint and then >> cancel the job automatically. >> >> Cheers, >> Till >> >> On Thu, Aug 9, 2018 at 9:53 AM vino yang wrote: >> >>> Hi Juho, >>> >>> We use REST client API : triggerSavepoint(), this API returns a >>> CompletableFuture, then we call it's get() API. >>> >>> You can understand that I am waiting for it to complete in sync. >>> Because cancelWithSavepoint is actually waiting for savepoint to >>> complete synchronization, and then execute the cancel command. >>> >>> We do not use CLI. I think since you are through the CLI, you can >>> observe whether the savepoint is complete by combining the log or the web >>> UI. >>> >>> Thanks, vino. >>> >>> >>> Juho Autio 于2018年8月9日周四 下午3:07写道: >>> Thanks for the suggestion. Is the separate savepoint triggering async? Would you then separately poll for the savepoint's completion before executing cancel? If additional polling is needed, then I would say that for my purpose it's still easier to call cancel with savepoint and simply ignore the result of the call. 
I would assume that it won't do any harm if I keep retrying cancel with savepoint until the job stops – I expect that an overlapping cancel request is ignored if the job is already creating a savepoint. Please correct if my assumption is wrong. On Thu, Aug 9, 2018 at 5:04 AM vino yang wrote: > Hi Juho, > > This problem does exist, I suggest you separate these two steps to > temporarily deal with this problem: > 1) Trigger Savepoint separately; > 2) execute the cancel command; > > Hi Till, Chesnay: > > Our internal environment and multiple users on the mailing list have > encountered similar problems. > > In our environment, it seems that JM shows that the save point is > complete and JM has stopped itself, but the client will still connect to > the old JM and report a timeout exception. > > Thanks, vino. > > > Juho Autio 于2018年8月8日周三 下午9:18写道: > >> I was trying to cancel a job with savepoint, but the CLI command >> failed with "akka.pattern.AskTimeoutException: Ask timed out". >> >> The stack trace reveals that ask timeout is 10 seconds: >> >> Caused by: akka.pattern.AskTimeoutException: Ask timed out on >> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms]. >> Sender[null] sent message of type >> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". >> >> Indeed it's documented that the default value >> for akka.ask.timeout="10 s" in >> >> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka >> >> Behind the scenes the savepoint creation & job cancellation >> succeeded, that was to be expected, kind of. So my problem is just >> getting >> a proper response back from the CLI call instead of timing out so >> eagerly. >> >> To be exact, what I ran was: >> >> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m >> yarn-cluster -yid application_1533676784032_0001 --withSavepoint >> >> Should I change the akka.ask.timeout to have a longer timeout? 
If >> yes, can I override it just for the CLI call somehow? Maybe it might have >> undesired side-effects if set globally for the actual flink jobs to use? >> >> What about akka.client.timeout? The default for it is also rather >> low: "60 s". Should it also be increased accordingly if I want to accept >> longer than 60 s for savepoint creation? >> >> Finally, that default timeout is so low that I would expect this to >> be a common problem. I would say that Flink CLI should have higher >> default >> timeout for cancel and savepoint creation ops. >> >> Thanks! >> > >
Re: Could not cancel job (with savepoint) "Ask timed out"
What I meant to ask was, does it do any harm to keep calling cancel-with-savepoint until the job exits? If the job is already cancelling with savepoint, I would assume that another cancel-with-savepoint call is just ignored. On Tue, Aug 21, 2018 at 1:18 PM Till Rohrmann wrote: > Just a small addition. Concurrent cancel call will interfere with the > cancel-with-savepoint command and directly cancel the job. So it is better > to use the cancel-with-savepoint call in order to take savepoint and then > cancel the job automatically. > > Cheers, > Till > > On Thu, Aug 9, 2018 at 9:53 AM vino yang wrote: > >> Hi Juho, >> >> We use REST client API : triggerSavepoint(), this API returns a >> CompletableFuture, then we call it's get() API. >> >> You can understand that I am waiting for it to complete in sync. >> Because cancelWithSavepoint is actually waiting for savepoint to complete >> synchronization, and then execute the cancel command. >> >> We do not use CLI. I think since you are through the CLI, you can observe >> whether the savepoint is complete by combining the log or the web UI. >> >> Thanks, vino. >> >> >> Juho Autio 于2018年8月9日周四 下午3:07写道: >> >>> Thanks for the suggestion. Is the separate savepoint triggering async? >>> Would you then separately poll for the savepoint's completion before >>> executing cancel? If additional polling is needed, then I would say that >>> for my purpose it's still easier to call cancel with savepoint and simply >>> ignore the result of the call. I would assume that it won't do any harm if >>> I keep retrying cancel with savepoint until the job stops – I expect that >>> an overlapping cancel request is ignored if the job is already creating a >>> savepoint. Please correct if my assumption is wrong. 
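Till's point in the quoted reply is that a second, overlapping cancel call does interfere, so the safer client-side pattern is to issue cancel-with-savepoint once and then watch the job state until it terminates, rather than re-issuing the call. A minimal sketch of that polling loop; `get_job_state` is a hypothetical callable wrapping whatever status query you use (e.g. the REST endpoint GET /jobs/:jobid), not part of any Flink API:

```python
import time

# Job states we treat as "the job has stopped"; names follow Flink's JobStatus.
TERMINAL_STATES = {"CANCELED", "FINISHED", "FAILED"}

def await_termination(get_job_state, timeout_s=300.0, poll_interval_s=2.0,
                      sleep=time.sleep, clock=time.monotonic):
    """Poll get_job_state() until the job reaches a terminal state.

    get_job_state is any callable returning the job's state string.
    Returns the terminal state, or raises TimeoutError if the deadline
    passes first. sleep/clock are injectable for testing.
    """
    deadline = clock() + timeout_s
    while clock() < deadline:
        state = get_job_state()
        if state in TERMINAL_STATES:
            return state
        sleep(poll_interval_s)
    raise TimeoutError("job did not reach a terminal state in time")
```

The trigger call happens exactly once before this loop; the loop itself is read-only, so it cannot interfere with the in-flight savepoint the way a repeated cancel might.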
>>> >>> On Thu, Aug 9, 2018 at 5:04 AM vino yang wrote: >>> Hi Juho, This problem does exist, I suggest you separate these two steps to temporarily deal with this problem: 1) Trigger Savepoint separately; 2) execute the cancel command; Hi Till, Chesnay: Our internal environment and multiple users on the mailing list have encountered similar problems. In our environment, it seems that JM shows that the save point is complete and JM has stopped itself, but the client will still connect to the old JM and report a timeout exception. Thanks, vino. Juho Autio 于2018年8月8日周三 下午9:18写道: > I was trying to cancel a job with savepoint, but the CLI command > failed with "akka.pattern.AskTimeoutException: Ask timed out". > > The stack trace reveals that ask timeout is 10 seconds: > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". > > Indeed it's documented that the default value for akka.ask.timeout="10 > s" in > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka > > Behind the scenes the savepoint creation & job cancellation succeeded, > that was to be expected, kind of. So my problem is just getting a proper > response back from the CLI call instead of timing out so eagerly. > > To be exact, what I ran was: > > flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m > yarn-cluster -yid application_1533676784032_0001 --withSavepoint > > Should I change the akka.ask.timeout to have a longer timeout? If yes, > can I override it just for the CLI call somehow? Maybe it might have > undesired side-effects if set globally for the actual flink jobs to use? > > What about akka.client.timeout? The default for it is also rather > low: "60 s". Should it also be increased accordingly if I want to accept > longer than 60 s for savepoint creation? 
> > Finally, that default timeout is so low that I would expect this to be > a common problem. I would say that Flink CLI should have higher default > timeout for cancel and savepoint creation ops. > > Thanks! > >>>
Re: Could not cancel job (with savepoint) "Ask timed out"
Just a small addition. Concurrent cancel call will interfere with the cancel-with-savepoint command and directly cancel the job. So it is better to use the cancel-with-savepoint call in order to take savepoint and then cancel the job automatically. Cheers, Till On Thu, Aug 9, 2018 at 9:53 AM vino yang wrote: > Hi Juho, > > We use REST client API : triggerSavepoint(), this API returns a > CompletableFuture, then we call it's get() API. > > You can understand that I am waiting for it to complete in sync. > Because cancelWithSavepoint is actually waiting for savepoint to complete > synchronization, and then execute the cancel command. > > We do not use CLI. I think since you are through the CLI, you can observe > whether the savepoint is complete by combining the log or the web UI. > > Thanks, vino. > > > Juho Autio 于2018年8月9日周四 下午3:07写道: > >> Thanks for the suggestion. Is the separate savepoint triggering async? >> Would you then separately poll for the savepoint's completion before >> executing cancel? If additional polling is needed, then I would say that >> for my purpose it's still easier to call cancel with savepoint and simply >> ignore the result of the call. I would assume that it won't do any harm if >> I keep retrying cancel with savepoint until the job stops – I expect that >> an overlapping cancel request is ignored if the job is already creating a >> savepoint. Please correct if my assumption is wrong. >> >> On Thu, Aug 9, 2018 at 5:04 AM vino yang wrote: >> >>> Hi Juho, >>> >>> This problem does exist, I suggest you separate these two steps to >>> temporarily deal with this problem: >>> 1) Trigger Savepoint separately; >>> 2) execute the cancel command; >>> >>> Hi Till, Chesnay: >>> >>> Our internal environment and multiple users on the mailing list have >>> encountered similar problems. 
>>> >>> In our environment, it seems that JM shows that the save point is >>> complete and JM has stopped itself, but the client will still connect to >>> the old JM and report a timeout exception. >>> >>> Thanks, vino. >>> >>> >>> Juho Autio 于2018年8月8日周三 下午9:18写道: >>> I was trying to cancel a job with savepoint, but the CLI command failed with "akka.pattern.AskTimeoutException: Ask timed out". The stack trace reveals that ask timeout is 10 seconds: Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". Indeed it's documented that the default value for akka.ask.timeout="10 s" in https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka Behind the scenes the savepoint creation & job cancellation succeeded, that was to be expected, kind of. So my problem is just getting a proper response back from the CLI call instead of timing out so eagerly. To be exact, what I ran was: flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m yarn-cluster -yid application_1533676784032_0001 --withSavepoint Should I change the akka.ask.timeout to have a longer timeout? If yes, can I override it just for the CLI call somehow? Maybe it might have undesired side-effects if set globally for the actual flink jobs to use? What about akka.client.timeout? The default for it is also rather low: "60 s". Should it also be increased accordingly if I want to accept longer than 60 s for savepoint creation? Finally, that default timeout is so low that I would expect this to be a common problem. I would say that Flink CLI should have higher default timeout for cancel and savepoint creation ops. Thanks! >>> >>
Re: Could not cancel job (with savepoint) "Ask timed out"
Hi Juho, We use REST client API : triggerSavepoint(), this API returns a CompletableFuture, then we call it's get() API. You can understand that I am waiting for it to complete in sync. Because cancelWithSavepoint is actually waiting for savepoint to complete synchronization, and then execute the cancel command. We do not use CLI. I think since you are through the CLI, you can observe whether the savepoint is complete by combining the log or the web UI. Thanks, vino. Juho Autio 于2018年8月9日周四 下午3:07写道: > Thanks for the suggestion. Is the separate savepoint triggering async? > Would you then separately poll for the savepoint's completion before > executing cancel? If additional polling is needed, then I would say that > for my purpose it's still easier to call cancel with savepoint and simply > ignore the result of the call. I would assume that it won't do any harm if > I keep retrying cancel with savepoint until the job stops – I expect that > an overlapping cancel request is ignored if the job is already creating a > savepoint. Please correct if my assumption is wrong. > > On Thu, Aug 9, 2018 at 5:04 AM vino yang wrote: > >> Hi Juho, >> >> This problem does exist, I suggest you separate these two steps to >> temporarily deal with this problem: >> 1) Trigger Savepoint separately; >> 2) execute the cancel command; >> >> Hi Till, Chesnay: >> >> Our internal environment and multiple users on the mailing list have >> encountered similar problems. >> >> In our environment, it seems that JM shows that the save point is >> complete and JM has stopped itself, but the client will still connect to >> the old JM and report a timeout exception. >> >> Thanks, vino. >> >> >> Juho Autio 于2018年8月8日周三 下午9:18写道: >> >>> I was trying to cancel a job with savepoint, but the CLI command failed >>> with "akka.pattern.AskTimeoutException: Ask timed out". 
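vino's pattern (trigger the savepoint, then block on the returned future) maps directly onto the asynchronous REST endpoints: POST /jobs/:jobid/savepoints returns a trigger id, and GET /jobs/:jobid/savepoints/:triggerid is polled until it reports COMPLETED. A sketch of the response handling only, assuming the JSON shapes of the Flink 1.5+ REST API; the HTTP calls themselves are left to whichever client you use:

```python
def savepoint_trigger_id(trigger_response: dict) -> str:
    """Extract the trigger id from the body returned by
    POST /jobs/:jobid/savepoints."""
    return trigger_response["request-id"]

def savepoint_status(status_response: dict):
    """Interpret the body of GET /jobs/:jobid/savepoints/:triggerid.

    Returns (done, savepoint_path); the path is None until the operation
    completes (on failure, the body carries a failure cause instead of a
    location).
    """
    done = status_response["status"]["id"] == "COMPLETED"
    path = None
    if done:
        path = status_response.get("operation", {}).get("location")
    return done, path
```

This is the REST-level equivalent of calling `triggerSavepoint(...).get()` on the Java client: trigger once, then poll `savepoint_status` until `done` is true and act on the returned path.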
>>> >>> The stack trace reveals that ask timeout is 10 seconds: >>> >>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on >>> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms]. >>> Sender[null] sent message of type >>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". >>> >>> Indeed it's documented that the default value for akka.ask.timeout="10 >>> s" in >>> >>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka >>> >>> Behind the scenes the savepoint creation & job cancellation succeeded, >>> that was to be expected, kind of. So my problem is just getting a proper >>> response back from the CLI call instead of timing out so eagerly. >>> >>> To be exact, what I ran was: >>> >>> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m >>> yarn-cluster -yid application_1533676784032_0001 --withSavepoint >>> >>> Should I change the akka.ask.timeout to have a longer timeout? If yes, >>> can I override it just for the CLI call somehow? Maybe it might have >>> undesired side-effects if set globally for the actual flink jobs to use? >>> >>> What about akka.client.timeout? The default for it is also rather >>> low: "60 s". Should it also be increased accordingly if I want to accept >>> longer than 60 s for savepoint creation? >>> >>> Finally, that default timeout is so low that I would expect this to be a >>> common problem. I would say that Flink CLI should have higher default >>> timeout for cancel and savepoint creation ops. >>> >>> Thanks! >>> >> >
Re: Could not cancel job (with savepoint) "Ask timed out"
Thanks for the suggestion. Is the separate savepoint triggering async? Would you then separately poll for the savepoint's completion before executing cancel? If additional polling is needed, then I would say that for my purpose it's still easier to call cancel with savepoint and simply ignore the result of the call. I would assume that it won't do any harm if I keep retrying cancel with savepoint until the job stops – I expect that an overlapping cancel request is ignored if the job is already creating a savepoint. Please correct if my assumption is wrong. On Thu, Aug 9, 2018 at 5:04 AM vino yang wrote: > Hi Juho, > > This problem does exist, I suggest you separate these two steps to > temporarily deal with this problem: > 1) Trigger Savepoint separately; > 2) execute the cancel command; > > Hi Till, Chesnay: > > Our internal environment and multiple users on the mailing list have > encountered similar problems. > > In our environment, it seems that JM shows that the save point is complete > and JM has stopped itself, but the client will still connect to the old JM > and report a timeout exception. > > Thanks, vino. > > > Juho Autio 于2018年8月8日周三 下午9:18写道: > >> I was trying to cancel a job with savepoint, but the CLI command failed >> with "akka.pattern.AskTimeoutException: Ask timed out". >> >> The stack trace reveals that ask timeout is 10 seconds: >> >> Caused by: akka.pattern.AskTimeoutException: Ask timed out on >> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms]. >> Sender[null] sent message of type >> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". >> >> Indeed it's documented that the default value for akka.ask.timeout="10 >> s" in >> >> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka >> >> Behind the scenes the savepoint creation & job cancellation succeeded, >> that was to be expected, kind of. 
So my problem is just getting a proper >> response back from the CLI call instead of timing out so eagerly. >> >> To be exact, what I ran was: >> >> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m >> yarn-cluster -yid application_1533676784032_0001 --withSavepoint >> >> Should I change the akka.ask.timeout to have a longer timeout? If yes, >> can I override it just for the CLI call somehow? Maybe it might have >> undesired side-effects if set globally for the actual flink jobs to use? >> >> What about akka.client.timeout? The default for it is also rather >> low: "60 s". Should it also be increased accordingly if I want to accept >> longer than 60 s for savepoint creation? >> >> Finally, that default timeout is so low that I would expect this to be a >> common problem. I would say that Flink CLI should have higher default >> timeout for cancel and savepoint creation ops. >> >> Thanks! >> >
Re: Could not cancel job (with savepoint) "Ask timed out"
Hi Juho, This problem does exist, I suggest you separate these two steps to temporarily deal with this problem: 1) Trigger Savepoint separately; 2) execute the cancel command; Hi Till, Chesnay: Our internal environment and multiple users on the mailing list have encountered similar problems. In our environment, it seems that JM shows that the savepoint is complete and JM has stopped itself, but the client will still connect to the old JM and report a timeout exception. Thanks, vino. Juho Autio wrote on Wed, Aug 8, 2018 at 9:18 PM: > I was trying to cancel a job with savepoint, but the CLI command failed > with "akka.pattern.AskTimeoutException: Ask timed out". > > The stack trace reveals that ask timeout is 10 seconds: > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/jobmanager_0#106635280]] after [10000 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". > > Indeed it's documented that the default value for akka.ask.timeout="10 > s" in > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka > > Behind the scenes the savepoint creation & job cancellation succeeded, > that was to be expected, kind of. So my problem is just getting a proper > response back from the CLI call instead of timing out so eagerly. > > To be exact, what I ran was: > > flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m > yarn-cluster -yid application_1533676784032_0001 --withSavepoint > > Should I change the akka.ask.timeout to have a longer timeout? If yes, can > I override it just for the CLI call somehow? Maybe it might have undesired > side-effects if set globally for the actual flink jobs to use? > > What about akka.client.timeout? The default for it is also rather low: "60 > s". Should it also be increased accordingly if I want to accept longer than > 60 s for savepoint creation?
> > Finally, that default timeout is so low that I would expect this to be a > common problem. I would say that Flink CLI should have higher default > timeout for cancel and savepoint creation ops. > > Thanks! >
Could not cancel job (with savepoint) "Ask timed out"
I was trying to cancel a job with savepoint, but the CLI command failed with "akka.pattern.AskTimeoutException: Ask timed out". The stack trace reveals that ask timeout is 10 seconds: Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#106635280]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". Indeed it's documented that the default value for akka.ask.timeout="10 s" in https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka Behind the scenes the savepoint creation & job cancellation succeeded, that was to be expected, kind of. So my problem is just getting a proper response back from the CLI call instead of timing out so eagerly. To be exact, what I ran was: flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m yarn-cluster -yid application_1533676784032_0001 --withSavepoint Should I change the akka.ask.timeout to have a longer timeout? If yes, can I override it just for the CLI call somehow? Maybe it might have undesired side-effects if set globally for the actual flink jobs to use? What about akka.client.timeout? The default for it is also rather low: "60 s". Should it also be increased accordingly if I want to accept longer than 60 s for savepoint creation? Finally, that default timeout is so low that I would expect this to be a common problem. I would say that Flink CLI should have higher default timeout for cancel and savepoint creation ops. Thanks!
Re: Cannot cancel job with savepoint due to timeout
Hi Bruno, the lacking documentation for akka.client.timeout is an oversight on our part [1]. I'll update it asap. Unfortunately, at the moment there is no other way than to specify the akka.client.timeout in the flink-conf.yaml file. [1] https://issues.apache.org/jira/browse/FLINK-5700 Cheers, Till On Wed, Feb 1, 2017 at 9:47 AM, Bruno Aranda wrote: > Maybe, though it could be good to be able to override in the command line > somehow, though I guess I could just change the flink config. > > Many thanks Yuri, > > Bruno > > On Wed, 1 Feb 2017 at 07:40 Yury Ruchin wrote: > >> Hi Bruno, >> >> From the code I conclude that "akka.client.timeout" setting is what >> affects this. It defaults to 60 seconds. >> >> I'm not sure why this setting is not documented though as well as many >> other "akka.*" settings - maybe there are some good reasons behind. >> >> Regards, >> Yury >> >> 2017-01-31 17:47 GMT+03:00 Bruno Aranda : >> >> Hi there, >> >> I am trying to cancel a job and create a savepoint (i.e. flink cancel -s) >> but it takes more than a minute to do that and then it fails due to the >> timeout. However, it seems that the job will be cancelled successfully and >> the savepoint made, but I can only see that through the dashboard. >> >> Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to >> default savepoint directory.
>> >> >> The program finished with the following exception: >> >> java.util.concurrent.TimeoutException: Futures timed out after [60000 >> milliseconds] >> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) >> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) >> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) >> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn( >> BlockContext.scala:53) >> at scala.concurrent.Await$.result(package.scala:190) >> at scala.concurrent.Await.result(package.scala) >> at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618) >> at org.apache.flink.client.CliFrontend.parseParameters( >> CliFrontend.java:1079) >> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) >> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) >> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run( >> HadoopSecurityContext.java:43) >> at java.security.AccessController.doPrivileged(Native Method) >> at javax.security.auth.Subject.doAs(Subject.java:422) >> at org.apache.hadoop.security.UserGroupInformation.doAs( >> UserGroupInformation.java:1698) >> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured( >> HadoopSecurityContext.java:40) >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117) >> >> Is there any way to configure this timeout? So we can depend on the >> outcome of this execution for scripts, etc. >> >> Thanks! >> >> Bruno >> >> >>
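Since Till notes the client-side timeout can only be raised via flink-conf.yaml, scripts that drive `flink cancel -s` may instead prefer to own the timeout at the process level, so a slow savepoint fails loudly with a clear exception rather than an opaque Akka timeout. A hedged sketch; the `-s [targetDirectory]` form matches the pre-1.9 CLI discussed in this thread (later versions replace `cancel -s` with `stop`), and the path to the flink binary is up to your environment:

```python
import subprocess

def build_cancel_cmd(flink_bin, job_id, savepoint_dir=None):
    """argv for 'flink cancel -s [targetDirectory] <jobID>' (pre-1.9 syntax)."""
    cmd = [flink_bin, "cancel", "-s"]
    if savepoint_dir is not None:
        cmd.append(savepoint_dir)
    cmd.append(job_id)
    return cmd

def cancel_with_savepoint(flink_bin, job_id, savepoint_dir=None, timeout_s=600):
    """Run the CLI with an explicit process-level timeout.

    Raises CalledProcessError on non-zero exit and TimeoutExpired if the
    savepoint takes longer than timeout_s, so callers can react in scripts.
    """
    return subprocess.run(
        build_cancel_cmd(flink_bin, job_id, savepoint_dir),
        check=True, capture_output=True, text=True, timeout=timeout_s,
    )
```

Even with a generous process timeout, the CLI itself may still give up after akka.client.timeout; checking the job and savepoint state afterwards (e.g. via the REST API) remains the reliable way to confirm the outcome.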
Re: Cannot cancel job with savepoint due to timeout
Maybe, though it could be good to be able to override in the command line somehow, though I guess I could just change the flink config. Many thanks Yuri, Bruno On Wed, 1 Feb 2017 at 07:40 Yury Ruchin wrote: > Hi Bruno, > > From the code I conclude that "akka.client.timeout" setting is what > affects this. It defaults to 60 seconds. > > I'm not sure why this setting is not documented though as well as many > other "akka.*" settings - maybe there are some good reasons behind. > > Regards, > Yury > > 2017-01-31 17:47 GMT+03:00 Bruno Aranda : > > Hi there, > > I am trying to cancel a job and create a savepoint (i.e. flink cancel -s) > but it takes more than a minute to do that and then it fails due to the > timeout. However, it seems that the job will be cancelled successfully and > the savepoint made, but I can only see that through the dashboard. > > Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to default > savepoint directory. > > > The program finished with the following exception: > > java.util.concurrent.TimeoutException: Futures timed out after [60000 > milliseconds] > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) > at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:190) > at scala.concurrent.Await.result(package.scala) > at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) > at > org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43) > at java.security.AccessController.doPrivileged(Native Method) > at 
javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117) > > Is there any way to configure this timeout? So we can depend on the > outcome of this execution for scripts, etc. > > Thanks! > > Bruno > > >
Re: Cannot cancel job with savepoint due to timeout
Hi Bruno, From the code I conclude that "akka.client.timeout" setting is what affects this. It defaults to 60 seconds. I'm not sure why this setting is not documented though as well as many other "akka.*" settings - maybe there are some good reasons behind. Regards, Yury 2017-01-31 17:47 GMT+03:00 Bruno Aranda: > Hi there, > > I am trying to cancel a job and create a savepoint (i.e. flink cancel -s) > but it takes more than a minute to do that and then it fails due to the > timeout. However, it seems that the job will be cancelled successfully and > the savepoint made, but I can only see that through the dashboard. > > Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to default > savepoint directory. > > > The program finished with the following exception: > > java.util.concurrent.TimeoutException: Futures timed out after [60000 > milliseconds] > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) > at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn( > BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:190) > at scala.concurrent.Await.result(package.scala) > at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618) > at org.apache.flink.client.CliFrontend.parseParameters( > CliFrontend.java:1079) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) > at org.apache.flink.runtime.security.HadoopSecurityContext$1.run( > HadoopSecurityContext.java:43) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at org.apache.hadoop.security.UserGroupInformation.doAs( > UserGroupInformation.java:1698) > at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured( > HadoopSecurityContext.java:40) > 
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117) > > Is there any way to configure this timeout? So we can depend on the > outcome of this execution for scripts, etc. > > Thanks! > > Bruno >
Re: Diff between stop and cancel job
On Thu, May 5, 2016 at 1:59 AM, Bajaj, Abhinav wrote: > Or can we resume a stopped streaming job? You can use savepoints [1] to take a snapshot of a streaming program from which you can restart the job at a later point in time. This is independent of whether you cancel or stop the program after taking the savepoint. Currently, you can effectively only use savepoint + cancel. Stopping is currently only implemented for the TwitterSource. I expect this to be implemented for other more common sources like Kafka as well in future versions. Furthermore, I expect a convenience feature like stop-and-savepoint, which gracefully stops your streaming program and automatically takes a savepoint. But as of now, you probably need to go with savepoint + cancel. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
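The savepoint + cancel workaround described above, plus the later resume, is a three-command workflow on the CLI: `flink savepoint <jobId> [targetDirectory]`, `flink cancel <jobId>`, then `flink run -s <savepointPath> <jar>`. A sketch of the invocations as argv builders, for scripting the sequence; the flink binary path and jar name are placeholders:

```python
def savepoint_cmd(flink_bin, job_id, target_dir=None):
    """'flink savepoint <jobId> [targetDirectory]' - snapshot a running job."""
    cmd = [flink_bin, "savepoint", job_id]
    if target_dir is not None:
        cmd.append(target_dir)
    return cmd

def cancel_cmd(flink_bin, job_id):
    """'flink cancel <jobId>' - stop the job once the savepoint is safely taken."""
    return [flink_bin, "cancel", job_id]

def resume_cmd(flink_bin, savepoint_path, jar, *program_args):
    """'flink run -s <savepointPath> <jar> [args]' - restart from the savepoint."""
    return [flink_bin, "run", "-s", savepoint_path, jar, *program_args]
```

Running the savepoint step first and only cancelling after it reports a savepoint path is what makes this sequence a safe substitute for a true graceful stop.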
Re: Cancel Job
Hi, currently, messages in flight will be dropped if a streaming job gets canceled. There is already WIP to add a STOP signal which allows for a clean shutdown of a streaming job. This should get merged soon and will be available in Flink 1.0. You can follow the JIRA and PR here: https://issues.apache.org/jira/browse/FLINK-2111 https://github.com/apache/flink/pull/750 -Matthias On 01/18/2016 08:26 PM, Don Frascuchon wrote: > Hi, > > When some streaming job is manually canceled, what's about the messages > in process ? Flink's engine wait to task finish process messages inside > (some like apache-storm) ? If not, there is a safe way for stop > streaming jobs ? > > Thanks in advance! > Best regards
Re: Cancel Job
Thanks Matthias! On Mon, 18 Jan 2016 at 20:51, Matthias J. Sax () wrote: > Hi, > > currently, messages in flight will be dropped if a streaming job gets > canceled. > > There is already WIP to add a STOP signal which allows for a clean > shutdown of a streaming job. This should get merged soon and will be > available in Flink 1.0. > > You can follow the JIRA and PR here: > https://issues.apache.org/jira/browse/FLINK-2111 > https://github.com/apache/flink/pull/750 > > -Matthias > > > On 01/18/2016 08:26 PM, Don Frascuchon wrote: > > Hi, > > > > When some streaming job is manually canceled, what's about the messages > > in process ? Flink's engine wait to task finish process messages inside > > (some like apache-storm) ? If not, there is a safe way for stop > > streaming jobs ? > > > > Thanks in advance! > > Best regards > >