Re: When cancelling a job from the Flink web UI, can you choose whether to take a savepoint?

2022-10-27 Thread Jinzhong Li
hi casel,
As far as I know, the web UI does not currently support triggering a savepoint.
If you want the stop-with-savepoint behavior, you can use bin/flink [1] or the
REST API [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-stop
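
As a rough sketch of both routes (the job id, host, and savepoint directory below are placeholders, not from this thread):

bin/flink stop --savepointPath hdfs:///savepoints <jobId>

curl -X POST http://<jobmanager>:8081/jobs/<jobId>/stop \
  -H 'Content-Type: application/json' \
  -d '{"targetDirectory": "hdfs:///savepoints", "drain": false}'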


李晋忠


casel.chen wrote on Fri, Oct 28, 2022 at 09:41:

> When cancelling a job from the Flink web UI, can you choose whether to take a savepoint? Currently it simply cancels the job
> without a savepoint.


When cancelling a job from the Flink web UI, can you choose whether to take a savepoint?

2022-10-27 Thread casel.chen
When cancelling a job from the Flink web UI, can you choose whether to take a savepoint? Currently it simply cancels the job without a savepoint.

flink1.10.0-RestClusterClient-cancel job error

2021-12-09 Thread nicygan

dear all:
 As the subject says: when I call RestClusterClient#cancel(JobID jobId) to cancel a job, I can't get a result from the returned future, although the job stops normally.

future.get() fails with:
Number of retries has been exhausted.

future.get(10, TimeUnit.SECONDS) fails with a timeout.

Calling #cancelWithSavepoint(...) and #stopWithSavepoint(...) works fine: both return a result, with no errors.

So far I have found that
1.10.0 has the problem,
1.14.0 does not.


The job runs on a CDH YARN cluster, version 2.6.0.
Deployment mode: per-job.

The code looks like this:
try (ClusterClient clusterClient = new RestClusterClient<>(configuration, clusterId)) {
    clusterClient
        .cancel(jobId)
        .get(20, TimeUnit.SECONDS);
} catch (Exception e) {
    // ignored
}

Does anyone know how to solve this?
Thanks a lot!
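
A minimal sketch of the stop-with-savepoint workaround in code (the savepoint directory, the timeout, and the exact 1.10-era stopWithSavepoint signature are my assumptions, not from this thread):

import java.util.concurrent.TimeUnit;

// "clusterClient" and "jobId" are the same objects as in the snippet above.
// Signature assumed: stopWithSavepoint(jobId, advanceToEndOfEventTime, savepointDirectory)
String savepointPath = clusterClient
        .stopWithSavepoint(jobId, false, "hdfs:///savepoints")
        .get(60, TimeUnit.SECONDS);
System.out.println("Savepoint written to " + savepointPath);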


Re: flink1.10.0-RestClusterClient cancel job, error

2021-12-08 Thread nicygan
Setting a longer timeout doesn't help; it fails even with no time limit at all.



 Original message 
| From | 何凯飞<18703416...@163.com> |
| Date | 2021-12-09 15:07 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Re: flink1.10.0-RestClusterClient cancel job, error |
Have you tried setting the timeout to something longer, say 3 min?




Re: flink1.10.0-RestClusterClient cancel job, error

2021-12-08 Thread 何凯飞
Have you tried setting the timeout to something longer, say 3 min?





flink1.10.0-RestClusterClient cancel job, error

2021-12-08 Thread nicygan
dear all:
  As the subject says: when I call RestClusterClient#cancel(JobID jobId) to cancel a job, I can't get a result from the returned future, although the job stops normally.

future.get() fails with:
Number of retries has been exhausted.

future.get(10, TimeUnit.SECONDS) fails with a timeout.

Calling #cancelWithSavepoint(...) and #stopWithSavepoint(...) works fine: both return a result, with no errors.


The job runs on a CDH YARN cluster, version 2.6.0.
Deployment mode: per-job.

The code looks like this:
try (ClusterClient clusterClient = new RestClusterClient<>(configuration, clusterId)) {
    clusterClient
        .cancel(jobId)
        .get(20, TimeUnit.SECONDS);
} catch (Exception e) {
    // ignored
}

Does anyone know how to solve this?
Thanks a lot!




 Original message 
| From | Yun Tang |
| Date | 2021-12-09 10:57 |
| To | user-zh |
| Cc | |
| Subject | Re: fine-grained state TTL configuration in Flink SQL |
Hi,

I think this is a good feature request. For the DataStream and Python APIs, state
TTL is configured per state through the API, so your requirement can be met directly. For SQL, however, the same statement may be compiled into very different execution plans by different optimizers, which makes it hard to configure a TTL for the state inside a specific operator. One possible approach would be SQL optimizer hints, so that the join and the groupBy count in your example could be given different TTLs, but Flink SQL does not support this yet.


Best
Yun Tang
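
For illustration, per-state TTL in the DataStream API looks roughly like this (a sketch with made-up state names, assuming a one-day TTL):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// The TTL is attached to an individual state descriptor, so different
// states in the same job can be given different TTLs.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<Long> countState = new ValueStateDescriptor<>("count", Long.class);
countState.enableTimeToLive(ttlConfig);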


From: gygz...@163.com 
Sent: Tuesday, December 7, 2021 18:38
To: user-zh 
Subject: fine-grained state TTL configuration in Flink SQL

Hi all

In production we have found that configuring a state TTL for SQL makes that TTL take effect globally.

Suppose I have the following SQL:

select count(1),region from (select * from A join B on a.uid = b.uid)  group by 
region

With a global TTL, the state of the count's GroupAggFunction gets evicted, e.g. the accumulated count is cleared after one day.

Without a TTL, the state of the regular join keeps growing.

This is just one scenario, given as an example.

My question: how can a TTL be configured for part of the state within one SQL statement, or a different TTL per SQL statement within the same job?



gygz...@163.com


Re: Cancel job error ! Interrupted while waiting for buffer

2021-06-28 Thread Piotr Nowojski
.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72)
> ~[testFlink-1.0.jar:?]
> at
> com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:28)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> ... 32 more
>
>
> My question :
>
> 1. what can I do to deal with  this error ?
> 2. if I cancel job with savepoint ,  will  this error  affect  savepoint ?
>
>
> Best !
>
>
>
>
>


Cancel job error ! Interrupted while waiting for buffer

2021-06-25 Thread SmileSmile
Hi 


   I use Flink 1.12.4 on YARN. The job topology is: kafka -> source -> 
flatmap -> window 1 min agg -> sink -> kafka. Checkpointing is enabled with a 
20 s interval. When I cancel the job, some TMs cancel successfully, while 
others get stuck in cancelling status until they kill themselves after 
task.cancellation.timeout = 18. The TM log shows:


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:114)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:74)
 [testFlink-1.0.jar:?]
at 
com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:29)
 [testFlink-1.0.jar:?]
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
 [flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
 [flink-dist_2.11-1.12.4.jar:1.12.4]


Caused by: java.io.IOException: Interrupted while waiting for buffer
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
 ~[testFlink-1.0.jar:?]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
 ~[testFlink-1.0.jar:?]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
 ~[testFlink-1.0.jar:?]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
 ~[testFlink-1.0.jar:?]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
 ~[testFlink-1.0.jar:?]
at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
 ~[testFlink-1.0.jar:?]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72) 
~[testFlink-1.0.jar:?]
at 
com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:28) 
~[testFlink-1.0.jar:?]
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
 ~[flink-dist_2.11-1.12.4.jar:1.12.4]
... 32 more




My question:


1. What can I do to deal with this error? 
2. If I cancel the job with a savepoint, will this error affect the savepoint?




Best!
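
As a side note on the watchdog mentioned above: the cancellation behavior is controlled by two flink-conf.yaml keys; a sketch with example values (not recommendations):

task.cancellation.interval: 30000   # ms between repeated cancellation attempts
task.cancellation.timeout: 180000   # ms a cancelling task may take before the TM kills itself (0 disables the watchdog)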



Re: flink 1.12 Cancel Job memory not released (question)

2021-01-05 Thread 徐州州
[This message was garbled by an encoding error; the recoverable fragments are: flink-sql, with upsert, MemoryStateBackend, env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION).]
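
For reference, the externalized-checkpoint cleanup mode named in those fragments is configured like this in the Flink 1.12 API (the checkpoint interval is an example value):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(20_000L); // example interval in ms
// Delete externalized checkpoints on a graceful cancel; note that a hard
// "yarn application -kill" bypasses this cleanup.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);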

Re: Re: flink 1.12 Cancel Job memory not released (question)

2021-01-04 Thread Yun Tang
Hi 徐州州,

Please check the overview section of the checkpoint UI and see whether the "restored" count is empty, i.e. whether the job was restored from a checkpoint; you can also check the job 
manager log for a checkpoint resume.
If the job was not restored from a checkpoint/savepoint, it effectively starts from scratch, and unless it accesses some external system it should not see any historical data.

Best
Yun Tang

From: 徐州州 <25977...@qq.com>
Sent: Tuesday, January 5, 2021 10:34
To: user-zh@flink.apache.org 
Subject: Re: flink 1.12 Cancel Job memory not released (question)

This is my complete configuration; no state backend or savepoint is set. The job is killed with /opt/module/hadoop3.2.1/bin/yarn 
application -kill jobid, and started with /opt/module/flink1.12/bin/flink run -d -m 
yarn-cluster -yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm 
App_Bs_Drainage_Launch_200105. I wonder whether this could be a queue issue: my cluster has only one queue.

-- Original message --
From: "user-zh" ;
Sent: Tuesday, January 5, 2021, 10:03 AM
To: "user-zh@flink.apache.org";
Subject: Re: flink 1.12 Cancel Job memory not released (question)

This looks related to checkpoints, savepoints and the state backend; it is worth checking those. If they are configured, then resuming the restarted job on top of yesterday's accumulated result is correct behavior; the mismatch is just that the state saved when you killed the running job yesterday may not match your actual final result.


刘海
liuha...@163.com

On 2021-01-05 09:04, 徐州州 <25977...@qq.com> wrote:
I have a flink-sql job whose daily result keeps accumulating on top of the previous day's. I submit it as a code jar, with MemoryStateBackend set in the code. A scheduled task kills the running job at 23:57, and the next day at 00:30 an azkaban script resubmits it. Yet the result computed at 00:30 still accumulates on top of yesterday's, even though I saw the NodeManager memory released in the window when the job was killed. Oddly, the accumulation is not even exact: if yesterday's result was 1000, today it may accumulate on top of, say, 900. Why does this happen? I have tried many things and nothing has solved it.

|insert into app_bs_drainage_place
|SELECT
| do.GrouporgName,
| du.Name,
| COUNT(DISTINCT dooi.Code) AS TotalSingular,
|md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND 
dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON 
dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name 
IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND 
left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status < 60 AND dooi.Status 
< 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




-- Original message --
From: "赵一旦"


Re: flink 1.12 Cancel Job memory not released (question)

2021-01-04 Thread 赵一旦
Please share the exact SQL. I don't really understand the problem you are describing.
What does "releasing memory" have to do with "accumulating on top of the previous result"? They are unrelated things: the former is about memory, the latter reflects state. If the job is restarted from a checkpoint/savepoint, the state is of course retained and accumulation continues.

徐州州 <25977...@qq.com> wrote on Mon, Jan 4, 2021 at 08:45:

> Even after switching to yarn-cluster mode, killing the job at 23:50 via /opt/module/hadoop3.2.1/bin/yarn
> application -kill
> application_1609656886263_0043 and restarting it at 1:30 the next day, the result still accumulates on top of yesterday's. Killing the job does not seem to release the state. Is there really nothing that can be done?
>
>
>
> -- Original message --
> From: "赵一旦" Sent: Tuesday, December 29, 2020, 9:35 PM
> To: "user-zh" Subject: Re: flink 1.12 Cancel Job memory not released (question)
>
>
>
> I don't think so. A job is a job, and a taskManager is a taskManager. The taskManager is a long-running process started in advance;
> submitted jobs are executed by it, and after a cancel the taskManager goes back to its own business (e.g. waiting for new jobs).
> Or consider the yarn deployment, e.g. per-job mode.
>
> 徐州州 <25977...@qq.com wrote on Tue, Dec 29, 2020 at 9:00 AM:
>
>  A question: after my flink
> 
> sql job is cancelled and restarted an hour later, it continues accumulating from the point where it was cancelled. I develop in IDEA and have not set any checkpoints in the code. How can I release the TaskManager memory used by the job when it is cancelled?



Re: flink 1.12 Cancel Job memory not released (question)

2020-12-29 Thread 徐州州
[This message was garbled by an encoding error; the recoverable fragment indicates the job runs on a StandaloneSessionCluster.]

Re: flink 1.12 Cancel Job memory not released (question)

2020-12-29 Thread 徐州州
[This message was garbled by an encoding error; the recoverable fragments are: IDEA, flink sql, job cancel, Checkpoint, TaskManager, Slot.]

Re: flink 1.12 Cancel Job memory not released (question)

2020-12-29 Thread 赵一旦
I don't think so. A job is a job, and a taskManager is a taskManager. The taskManager is a long-running process started in advance;
submitted jobs are executed by it, and after a cancel the taskManager goes back to its own business (e.g. waiting for new jobs).
Or consider the yarn deployment, e.g. per-job mode.

徐州州 <25977...@qq.com> wrote on Tue, Dec 29, 2020 at 9:00 AM:

> A question: after my flink
> sql job is cancelled and restarted an hour later, it continues accumulating from the point where it was cancelled. I develop in IDEA and have not set any checkpoints in the code. How can I release the TaskManager memory used by the job when it is cancelled?


flink 1.12 Cancel Job memory not released (question)

2020-12-28 Thread 徐州州
After my flink sql job is cancelled and restarted an hour later, it continues accumulating from the point where it was cancelled. I develop in IDEA and have not set any checkpoints in the code. How can I release the TaskManager memory used by the job when it is cancelled?

Re: Flink rest api cancel job

2020-07-21 Thread Fabian Hueske
Hi White,

Can you describe your problem in more detail?

* What is your Flink version?
* How do you deploy the job (application / session cluster), (Kubernetes,
Docker, YARN, ...)
* What kind of job are you running (DataStream, Table/SQL, DataSet)?

Best, Fabian

On Mon, Jul 20, 2020 at 08:42, snack white <
amazingu...@gmail.com> wrote:

> Hi,
>   When I use the REST API to cancel my job, 9 of the TMs are
> canceled quickly, but the remaining TM stays in cancelling status.
> Can someone show me how to solve this?
> Thanks,
> White


Flink rest api cancel job

2020-07-20 Thread snack white
Hi, 
  When I use the REST API to cancel my job, 9 of the TMs are canceled 
quickly, but the remaining TM stays in cancelling status. Can someone show 
me how to solve this? 
Thanks,
White 

Re: org.apache.flink.util.FlinkException: Could not cancel job

2018-09-06 Thread Chang Liu
You are correct. Thanks! I used the wrong job ID. Sorry for bothering you guys.

Best regards,
Chang 

from iPhone

> On 4 Sep 2018, at 18:06, Chesnay Schepler  wrote:
> 
> Please check that the job ID is correct.
> 
>> On 04.09.2018 15:48, Chang Liu wrote:
>> Dear All,
>> 
>> I had the following issue when trying to cancel a job from the CLI. I am 
>> wondering: am I canceling the job in the proper way? Or is there a more 
>> elegant way to do this, either in code or via the CLI? Many Thanks!
>> 
>> BTW, I have a stream coming from Kafka and producing to another Kafka 
>> topic.
>> 
>> ./bin/flink cancel b89f45024cf2e45914eaa920df95907f
>> Cancelling job b89f45024cf2e45914eaa920df95907f.
>> 
>> 
>>  The program finished with the following exception:
>> 
>> org.apache.flink.util.FlinkException: Could not cancel job 
>> b89f45024cf2e45914eaa920df95907f.
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:603)
>> at 
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:955)
>> at 
>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:596)
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>> at 
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at 
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>> Caused by: java.util.concurrent.ExecutionException: 
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
>> complete the operation. Exception is not retryable.
>> at 
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at 
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at 
>> org.apache.flink.client.program.rest.RestClusterClient.cancel(RestClusterClient.java:380)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:601)
>> ... 6 more
>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
>> Could not complete the operation. Exception is not retryable.
>> at 
>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
>> at 
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>> at 
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>> at 
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at 
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>> at 
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>> at 
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.CompletionException: 
>> org.apache.flink.runtime.rest.util.RestClientException: [Job could not be 
>> found.]
>> at 
>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>> at 
>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>> at 
>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>> at 
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
>> at 
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job 
>> could not be found.]
>> at 
>> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225)
>> at 
>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:209)
>> at 
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>> ... 5 more
>> 
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
> 


Re: org.apache.flink.util.FlinkException: Could not cancel job

2018-09-04 Thread Chesnay Schepler

Please check that the job ID is correct.
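
To double-check the id, the running jobs can be listed first, e.g.:

./bin/flink list -r    # lists running jobs together with their job ids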

On 04.09.2018 15:48, Chang Liu wrote:

Dear All,

I had the following issue when trying to cancel a job from the CLI. I am 
wondering: am I canceling the job in the proper way? Or is there a more 
elegant way to do this, either in code or via the CLI? Many Thanks!


BTW, I have a stream coming from Kafka and producing to another 
Kafka topic.


./bin/flink cancel b89f45024cf2e45914eaa920df95907f
Cancelling job b89f45024cf2e45914eaa920df95907f.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not cancel job 
b89f45024cf2e45914eaa920df95907f.
at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:603)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:955)
at 
org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:596)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could 
not complete the operation. Exception is not retryable.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at 
org.apache.flink.client.program.rest.RestClusterClient.cancel(RestClusterClient.java:380)
at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:601)

... 6 more
Caused by: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could 
not complete the operation. Exception is not retryable.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Job could not 
be found.]
at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at 
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)

... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[Job could not be found.]
at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:209)
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)

... 5 more


Best regards/祝好,

Chang Liu 刘畅





org.apache.flink.util.FlinkException: Could not cancel job

2018-09-04 Thread Chang Liu
Dear All,

I had the following issue when trying to cancel a job from the CLI. I am
wondering: am I canceling the job in the proper way? Or is there a more
elegant way to do this, either in code or via the CLI? Many Thanks!

BTW, I have a stream coming from Kafka and producing to another Kafka
topic.

./bin/flink cancel b89f45024cf2e45914eaa920df95907f
Cancelling job b89f45024cf2e45914eaa920df95907f.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not cancel job
b89f45024cf2e45914eaa920df95907f.
at
org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:603)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:955)
at
org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:596)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Exception is not retryable.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at
org.apache.flink.client.program.rest.RestClusterClient.cancel(RestClusterClient.java:380)
at
org.apache.flink.client.cli.CliFrontend.lambda$cancel$5(CliFrontend.java:601)
... 6 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Exception is not retryable.
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.rest.util.RestClientException: [Job could not be
found.]
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job
could not be found.]
at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225)
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:209)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 5 more


Best regards/祝好,

Chang Liu 刘畅


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-22 Thread Juho Autio
I see, thanks. Looks like it's better for us to switch to triggering
savepoint & cancel separately.

On Wed, Aug 22, 2018 at 1:26 PM Till Rohrmann  wrote:

> Calling cancel-with-savepoint multiple times will trigger multiple
> savepoints. The first issued savepoint will complete first and then cancel
> the job. Thus, the later savepoints might complete or not depending on the
> correct timing. Since savepoint can flush results to external systems, I
> would recommend not calling the API multiple times.
>
> Cheers,
> Till
>
> On Wed, Aug 22, 2018 at 10:40 AM Juho Autio  wrote:
>
>> What I meant to ask was, does it do any harm to keep calling
>> cancel-with-savepoint until the job exits? If the job is already cancelling
>> with savepoint, I would assume that another cancel-with-savepoint call is
>> just ignored.
>>
>> On Tue, Aug 21, 2018 at 1:18 PM Till Rohrmann 
>> wrote:
>>
>>> Just a small addition. Concurrent cancel call will interfere with the
>>> cancel-with-savepoint command and directly cancel the job. So it is better
>>> to use the cancel-with-savepoint call in order to take savepoint and then
>>> cancel the job automatically.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Aug 9, 2018 at 9:53 AM vino yang  wrote:
>>>
 Hi Juho,

 We use REST client API : triggerSavepoint(), this API returns a
 CompletableFuture, then we call it's get() API.

 You can understand that I am waiting for it to complete in sync.
 Because cancelWithSavepoint is actually waiting for savepoint to
 complete synchronization, and then execute the cancel command.

 We do not use CLI. I think since you are through the CLI, you can
 observe whether the savepoint is complete by combining the log or the web
 UI.

 Thanks, vino.


 Juho Autio  wrote on Thu, Aug 9, 2018 at 3:07 PM:

> Thanks for the suggestion. Is the separate savepoint triggering async?
> Would you then separately poll for the savepoint's completion before
> executing cancel? If additional polling is needed, then I would say that
> for my purpose it's still easier to call cancel with savepoint and simply
> ignore the result of the call. I would assume that it won't do any harm if
> I keep retrying cancel with savepoint until the job stops – I expect that
> an overlapping cancel request is ignored if the job is already creating a
> savepoint. Please correct if my assumption is wrong.
>
> On Thu, Aug 9, 2018 at 5:04 AM vino yang 
> wrote:
>
>> Hi Juho,
>>
>> This problem does exist, I suggest you separate these two steps to
>> temporarily deal with this problem:
>> 1) Trigger Savepoint separately;
>> 2) execute the cancel command;
>>
>> Hi Till, Chesnay:
>>
>> Our internal environment and multiple users on the mailing list have
>> encountered similar problems.
>>
>> In our environment, it seems that JM shows that the save point is
>> complete and JM has stopped itself, but the client will still connect to
>> the old JM and report a timeout exception.
>>
>> Thanks, vino.
>>
>>
>> Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:
>>
>>> I was trying to cancel a job with savepoint, but the CLI command
>>> failed with "akka.pattern.AskTimeoutException: Ask timed out".
>>>
>>> The stack trace reveals that ask timeout is 10 seconds:
>>>
>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
>>> Sender[null] sent message of type
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>>
>>> Indeed it's documented that the default value
>>> for akka.ask.timeout="10 s" in
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>>>
>>> Behind the scenes the savepoint creation & job cancellation
>>> succeeded, that was to be expected, kind of. So my problem is just 
>>> getting
>>> a proper response back from the CLI call instead of timing out so 
>>> eagerly.
>>>
>>> To be exact, what I ran was:
>>>
>>> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
>>> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>>>
>>> Should I change the akka.ask.timeout to have a longer timeout? If
>>> yes, can I override it just for the CLI call somehow? Maybe it might 
>>> have
>>> undesired side-effects if set globally for the actual flink jobs to use?
>>>
>>> What about akka.client.timeout? The default for it is also rather
>>> low: "60 s". Should it also be increased accordingly if I want to accept
>>> longer than 60 s for savepoint creation?
>>>
>>> Finally, that default timeout is so low that I would expect this to
>>> be a common problem. I would say that Flink CLI should have higher 
>>> default

Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-22 Thread Till Rohrmann
Calling cancel-with-savepoint multiple times will trigger multiple
savepoints. The first issued savepoint will complete first and then cancel
the job. Thus, the later savepoints might complete or not depending on the
correct timing. Since savepoint can flush results to external systems, I
would recommend not calling the API multiple times.

Cheers,
Till

On Wed, Aug 22, 2018 at 10:40 AM Juho Autio  wrote:

> What I meant to ask was, does it do any harm to keep calling
> cancel-with-savepoint until the job exits? If the job is already cancelling
> with savepoint, I would assume that another cancel-with-savepoint call is
> just ignored.
>
> On Tue, Aug 21, 2018 at 1:18 PM Till Rohrmann 
> wrote:
>
>> Just a small addition. Concurrent cancel call will interfere with the
>> cancel-with-savepoint command and directly cancel the job. So it is better
>> to use the cancel-with-savepoint call in order to take savepoint and then
>> cancel the job automatically.
>>
>> Cheers,
>> Till
>>
>> On Thu, Aug 9, 2018 at 9:53 AM vino yang  wrote:
>>
>>> Hi Juho,
>>>
>>> We use REST client API : triggerSavepoint(), this API returns a
>>> CompletableFuture, then we call it's get() API.
>>>
>>> You can understand that I am waiting for it to complete in sync.
>>> Because cancelWithSavepoint is actually waiting for savepoint to
>>> complete synchronization, and then execute the cancel command.
>>>
>>> We do not use CLI. I think since you are through the CLI, you can
>>> observe whether the savepoint is complete by combining the log or the web
>>> UI.
>>>
>>> Thanks, vino.
>>>
>>>
>>> Juho Autio  wrote on Thu, Aug 9, 2018 at 3:07 PM:
>>>
 Thanks for the suggestion. Is the separate savepoint triggering async?
 Would you then separately poll for the savepoint's completion before
 executing cancel? If additional polling is needed, then I would say that
 for my purpose it's still easier to call cancel with savepoint and simply
 ignore the result of the call. I would assume that it won't do any harm if
 I keep retrying cancel with savepoint until the job stops – I expect that
 an overlapping cancel request is ignored if the job is already creating a
 savepoint. Please correct if my assumption is wrong.

 On Thu, Aug 9, 2018 at 5:04 AM vino yang  wrote:

> Hi Juho,
>
> This problem does exist, I suggest you separate these two steps to
> temporarily deal with this problem:
> 1) Trigger Savepoint separately;
> 2) execute the cancel command;
>
> Hi Till, Chesnay:
>
> Our internal environment and multiple users on the mailing list have
> encountered similar problems.
>
> In our environment, it seems that JM shows that the save point is
> complete and JM has stopped itself, but the client will still connect to
> the old JM and report a timeout exception.
>
> Thanks, vino.
>
>
> Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:
>
>> I was trying to cancel a job with savepoint, but the CLI command
>> failed with "akka.pattern.AskTimeoutException: Ask timed out".
>>
>> The stack trace reveals that ask timeout is 10 seconds:
>>
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>
>> Indeed it's documented that the default value
>> for akka.ask.timeout="10 s" in
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>>
>> Behind the scenes the savepoint creation & job cancellation
>> succeeded, that was to be expected, kind of. So my problem is just 
>> getting
>> a proper response back from the CLI call instead of timing out so 
>> eagerly.
>>
>> To be exact, what I ran was:
>>
>> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
>> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>>
>> Should I change the akka.ask.timeout to have a longer timeout? If
>> yes, can I override it just for the CLI call somehow? Maybe it might have
>> undesired side-effects if set globally for the actual flink jobs to use?
>>
>> What about akka.client.timeout? The default for it is also rather
>> low: "60 s". Should it also be increased accordingly if I want to accept
>> longer than 60 s for savepoint creation?
>>
>> Finally, that default timeout is so low that I would expect this to
>> be a common problem. I would say that Flink CLI should have higher 
>> default
>> timeout for cancel and savepoint creation ops.
>>
>> Thanks!
>>
>

>


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-22 Thread Juho Autio
What I meant to ask was, does it do any harm to keep calling
cancel-with-savepoint until the job exits? If the job is already cancelling
with savepoint, I would assume that another cancel-with-savepoint call is
just ignored.

On Tue, Aug 21, 2018 at 1:18 PM Till Rohrmann  wrote:

> Just a small addition. Concurrent cancel call will interfere with the
> cancel-with-savepoint command and directly cancel the job. So it is better
> to use the cancel-with-savepoint call in order to take savepoint and then
> cancel the job automatically.
>
> Cheers,
> Till
>
> On Thu, Aug 9, 2018 at 9:53 AM vino yang  wrote:
>
>> Hi Juho,
>>
>> We use REST client API : triggerSavepoint(), this API returns a
>> CompletableFuture, then we call it's get() API.
>>
>> You can understand that I am waiting for it to complete in sync.
>> Because cancelWithSavepoint is actually waiting for savepoint to complete
>> synchronization, and then execute the cancel command.
>>
>> We do not use CLI. I think since you are through the CLI, you can observe
>> whether the savepoint is complete by combining the log or the web UI.
>>
>> Thanks, vino.
>>
>>
>> Juho Autio  wrote on Thu, Aug 9, 2018 at 3:07 PM:
>>
>>> Thanks for the suggestion. Is the separate savepoint triggering async?
>>> Would you then separately poll for the savepoint's completion before
>>> executing cancel? If additional polling is needed, then I would say that
>>> for my purpose it's still easier to call cancel with savepoint and simply
>>> ignore the result of the call. I would assume that it won't do any harm if
>>> I keep retrying cancel with savepoint until the job stops – I expect that
>>> an overlapping cancel request is ignored if the job is already creating a
>>> savepoint. Please correct if my assumption is wrong.
>>>
>>> On Thu, Aug 9, 2018 at 5:04 AM vino yang  wrote:
>>>
 Hi Juho,

 This problem does exist, I suggest you separate these two steps to
 temporarily deal with this problem:
 1) Trigger Savepoint separately;
 2) execute the cancel command;

 Hi Till, Chesnay:

 Our internal environment and multiple users on the mailing list have
 encountered similar problems.

 In our environment, it seems that JM shows that the save point is
 complete and JM has stopped itself, but the client will still connect to
 the old JM and report a timeout exception.

 Thanks, vino.


 Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:

> I was trying to cancel a job with savepoint, but the CLI command
> failed with "akka.pattern.AskTimeoutException: Ask timed out".
>
> The stack trace reveals that ask timeout is 10 seconds:
>
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>
> Indeed it's documented that the default value for akka.ask.timeout="10
> s" in
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>
> Behind the scenes the savepoint creation & job cancellation succeeded,
> that was to be expected, kind of. So my problem is just getting a proper
> response back from the CLI call instead of timing out so eagerly.
>
> To be exact, what I ran was:
>
> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>
> Should I change the akka.ask.timeout to have a longer timeout? If yes,
> can I override it just for the CLI call somehow? Maybe it might have
> undesired side-effects if set globally for the actual flink jobs to use?
>
> What about akka.client.timeout? The default for it is also rather
> low: "60 s". Should it also be increased accordingly if I want to accept
> longer than 60 s for savepoint creation?
>
> Finally, that default timeout is so low that I would expect this to be
> a common problem. I would say that Flink CLI should have higher default
> timeout for cancel and savepoint creation ops.
>
> Thanks!
>

>>>


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-21 Thread Till Rohrmann
Just a small addition: a concurrent cancel call will interfere with the
cancel-with-savepoint command and directly cancel the job. So it is better
to use the cancel-with-savepoint call in order to take the savepoint and then
cancel the job automatically.

Cheers,
Till

On Thu, Aug 9, 2018 at 9:53 AM vino yang  wrote:

> Hi Juho,
>
> We use REST client API : triggerSavepoint(), this API returns a
> CompletableFuture, then we call it's get() API.
>
> You can understand that I am waiting for it to complete in sync.
> Because cancelWithSavepoint is actually waiting for savepoint to complete
> synchronization, and then execute the cancel command.
>
> We do not use CLI. I think since you are through the CLI, you can observe
> whether the savepoint is complete by combining the log or the web UI.
>
> Thanks, vino.
>
>
> Juho Autio  wrote on Thu, Aug 9, 2018 at 3:07 PM:
>
>> Thanks for the suggestion. Is the separate savepoint triggering async?
>> Would you then separately poll for the savepoint's completion before
>> executing cancel? If additional polling is needed, then I would say that
>> for my purpose it's still easier to call cancel with savepoint and simply
>> ignore the result of the call. I would assume that it won't do any harm if
>> I keep retrying cancel with savepoint until the job stops – I expect that
>> an overlapping cancel request is ignored if the job is already creating a
>> savepoint. Please correct if my assumption is wrong.
>>
>> On Thu, Aug 9, 2018 at 5:04 AM vino yang  wrote:
>>
>>> Hi Juho,
>>>
>>> This problem does exist, I suggest you separate these two steps to
>>> temporarily deal with this problem:
>>> 1) Trigger Savepoint separately;
>>> 2) execute the cancel command;
>>>
>>> Hi Till, Chesnay:
>>>
>>> Our internal environment and multiple users on the mailing list have
>>> encountered similar problems.
>>>
>>> In our environment, it seems that JM shows that the save point is
>>> complete and JM has stopped itself, but the client will still connect to
>>> the old JM and report a timeout exception.
>>>
>>> Thanks, vino.
>>>
>>>
>>> Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:
>>>
 I was trying to cancel a job with savepoint, but the CLI command failed
 with "akka.pattern.AskTimeoutException: Ask timed out".

 The stack trace reveals that ask timeout is 10 seconds:

 Caused by: akka.pattern.AskTimeoutException: Ask timed out on
 [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
 Sender[null] sent message of type
 "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

 Indeed it's documented that the default value for akka.ask.timeout="10
 s" in

 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka

 Behind the scenes the savepoint creation & job cancellation succeeded,
 that was to be expected, kind of. So my problem is just getting a proper
 response back from the CLI call instead of timing out so eagerly.

 To be exact, what I ran was:

 flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
 yarn-cluster -yid application_1533676784032_0001 --withSavepoint

 Should I change the akka.ask.timeout to have a longer timeout? If yes,
 can I override it just for the CLI call somehow? Maybe it might have
 undesired side-effects if set globally for the actual flink jobs to use?

 What about akka.client.timeout? The default for it is also rather
 low: "60 s". Should it also be increased accordingly if I want to accept
 longer than 60 s for savepoint creation?

 Finally, that default timeout is so low that I would expect this to be
 a common problem. I would say that Flink CLI should have higher default
 timeout for cancel and savepoint creation ops.

 Thanks!

>>>
>>


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-09 Thread vino yang
Hi Juho,

We use the REST client API triggerSavepoint(); it returns a
CompletableFuture, and then we call its get() method.

In other words, we wait for it to complete synchronously.
cancelWithSavepoint likewise waits for the savepoint to complete
before executing the cancel command.

We do not use the CLI. Since you are going through the CLI, you can observe
whether the savepoint is complete from the logs or the web UI.

Thanks, vino.
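
A rough sketch of that two-step approach (method shapes assumed from the newer ClusterClient API; the directory is a placeholder):

// 1) Trigger a savepoint and wait for it synchronously.
String savepointPath = restClusterClient
        .triggerSavepoint(jobId, "hdfs:///savepoints")
        .get();
// 2) Only then cancel the job.
restClusterClient.cancel(jobId).get();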


Juho Autio  wrote on Thu, Aug 9, 2018 at 3:07 PM:

> Thanks for the suggestion. Is the separate savepoint triggering async?
> Would you then separately poll for the savepoint's completion before
> executing cancel? If additional polling is needed, then I would say that
> for my purpose it's still easier to call cancel with savepoint and simply
> ignore the result of the call. I would assume that it won't do any harm if
> I keep retrying cancel with savepoint until the job stops – I expect that
> an overlapping cancel request is ignored if the job is already creating a
> savepoint. Please correct if my assumption is wrong.
>
> On Thu, Aug 9, 2018 at 5:04 AM vino yang  wrote:
>
>> Hi Juho,
>>
>> This problem does exist, I suggest you separate these two steps to
>> temporarily deal with this problem:
>> 1) Trigger Savepoint separately;
>> 2) execute the cancel command;
>>
>> Hi Till, Chesnay:
>>
>> Our internal environment and multiple users on the mailing list have
>> encountered similar problems.
>>
>> In our environment, it seems that JM shows that the save point is
>> complete and JM has stopped itself, but the client will still connect to
>> the old JM and report a timeout exception.
>>
>> Thanks, vino.
>>
>>
>>> Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:
>>
>>> I was trying to cancel a job with savepoint, but the CLI command failed
>>> with "akka.pattern.AskTimeoutException: Ask timed out".
>>>
>>> The stack trace reveals that ask timeout is 10 seconds:
>>>
>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
>>> Sender[null] sent message of type
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>>
>>> Indeed it's documented that the default value for akka.ask.timeout="10
>>> s" in
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>>>
>>> Behind the scenes the savepoint creation & job cancellation succeeded,
>>> that was to be expected, kind of. So my problem is just getting a proper
>>> response back from the CLI call instead of timing out so eagerly.
>>>
>>> To be exact, what I ran was:
>>>
>>> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
>>> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>>>
>>> Should I change the akka.ask.timeout to have a longer timeout? If yes,
>>> can I override it just for the CLI call somehow? Maybe it might have
>>> undesired side-effects if set globally for the actual flink jobs to use?
>>>
>>> What about akka.client.timeout? The default for it is also rather
>>> low: "60 s". Should it also be increased accordingly if I want to accept
>>> longer than 60 s for savepoint creation?
>>>
>>> Finally, that default timeout is so low that I would expect this to be a
>>> common problem. I would say that Flink CLI should have higher default
>>> timeout for cancel and savepoint creation ops.
>>>
>>> Thanks!
>>>
>>
>


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-09 Thread Juho Autio
Thanks for the suggestion. Is the separate savepoint triggering async?
Would you then separately poll for the savepoint's completion before
executing cancel? If additional polling is needed, then I would say that
for my purpose it's still easier to call cancel with savepoint and simply
ignore the result of the call. I would assume that it won't do any harm if
I keep retrying cancel with savepoint until the job stops – I expect that
an overlapping cancel request is ignored if the job is already creating a
savepoint. Please correct if my assumption is wrong.

On Thu, Aug 9, 2018 at 5:04 AM vino yang  wrote:

> Hi Juho,
>
> This problem does exist, I suggest you separate these two steps to
> temporarily deal with this problem:
> 1) Trigger Savepoint separately;
> 2) execute the cancel command;
>
> Hi Till, Chesnay:
>
> Our internal environment and multiple users on the mailing list have
> encountered similar problems.
>
> In our environment, it seems that JM shows that the save point is complete
> and JM has stopped itself, but the client will still connect to the old JM
> and report a timeout exception.
>
> Thanks, vino.
>
>
> Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:
>
>> I was trying to cancel a job with savepoint, but the CLI command failed
>> with "akka.pattern.AskTimeoutException: Ask timed out".
>>
>> The stack trace reveals that ask timeout is 10 seconds:
>>
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>
>> Indeed it's documented that the default value for akka.ask.timeout="10
>> s" in
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>>
>> Behind the scenes the savepoint creation & job cancellation succeeded,
>> that was to be expected, kind of. So my problem is just getting a proper
>> response back from the CLI call instead of timing out so eagerly.
>>
>> To be exact, what I ran was:
>>
>> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
>> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>>
>> Should I change the akka.ask.timeout to have a longer timeout? If yes,
>> can I override it just for the CLI call somehow? Maybe it might have
>> undesired side-effects if set globally for the actual flink jobs to use?
>>
>> What about akka.client.timeout? The default for it is also rather
>> low: "60 s". Should it also be increased accordingly if I want to accept
>> longer than 60 s for savepoint creation?
>>
>> Finally, that default timeout is so low that I would expect this to be a
>> common problem. I would say that Flink CLI should have higher default
>> timeout for cancel and savepoint creation ops.
>>
>> Thanks!
>>
>


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-08 Thread vino yang
Hi Juho,

This problem does exist. As a temporary workaround, I suggest separating
the operation into two steps:
1) trigger the savepoint separately;
2) then execute the cancel command.

Hi Till, Chesnay:

Our internal environment and multiple users on the mailing list have
encountered similar problems.

In our environment, it seems that the JM shows the savepoint as complete
and has stopped itself, but the client still connects to the old JM
and reports a timeout exception.

Thanks, vino.


Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:

> I was trying to cancel a job with savepoint, but the CLI command failed
> with "akka.pattern.AskTimeoutException: Ask timed out".
>
> The stack trace reveals that ask timeout is 10 seconds:
>
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>
> Indeed it's documented that the default value for akka.ask.timeout="10
> s" in
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>
> Behind the scenes the savepoint creation & job cancellation succeeded,
> that was to be expected, kind of. So my problem is just getting a proper
> response back from the CLI call instead of timing out so eagerly.
>
> To be exact, what I ran was:
>
> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>
> Should I change the akka.ask.timeout to have a longer timeout? If yes, can
> I override it just for the CLI call somehow? Maybe it might have undesired
> side-effects if set globally for the actual flink jobs to use?
>
> What about akka.client.timeout? The default for it is also rather low: "60
> s". Should it also be increased accordingly if I want to accept longer than
> 60 s for savepoint creation?
>
> Finally, that default timeout is so low that I would expect this to be a
> common problem. I would say that Flink CLI should have higher default
> timeout for cancel and savepoint creation ops.
>
> Thanks!
>


Could not cancel job (with savepoint) "Ask timed out"

2018-08-08 Thread Juho Autio
I was trying to cancel a job with savepoint, but the CLI command failed
with "akka.pattern.AskTimeoutException: Ask timed out".

The stack trace reveals that ask timeout is 10 seconds:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

Indeed it's documented that the default value for akka.ask.timeout="10 s" in
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka

Behind the scenes the savepoint creation & job cancellation succeeded, that
was to be expected, kind of. So my problem is just getting a proper
response back from the CLI call instead of timing out so eagerly.

To be exact, what I ran was:

flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
yarn-cluster -yid application_1533676784032_0001 --withSavepoint

Should I change the akka.ask.timeout to have a longer timeout? If yes, can
I override it just for the CLI call somehow? Maybe it might have undesired
side-effects if set globally for the actual flink jobs to use?

What about akka.client.timeout? The default for it is also rather low: "60
s". Should it also be increased accordingly if I want to accept longer than
60 s for savepoint creation?

Finally, that default timeout is so low that I would expect this to be a
common problem. I would say that Flink CLI should have higher default
timeout for cancel and savepoint creation ops.

Thanks!
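
For what it's worth, both timeouts can be raised in flink-conf.yaml; a sketch with example values (not recommendations):

akka.ask.timeout: 120 s
akka.client.timeout: 300 s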


Re: Cannot cancel job with savepoint due to timeout

2017-02-02 Thread Till Rohrmann
Hi Bruno,

the lacking documentation for akka.client.timeout is an oversight on our
part [1]. I'll update it asap.

Unfortunately, at the moment there is no other way than to specify the
akka.client.timeout in the flink-conf.yaml file.

[1] https://issues.apache.org/jira/browse/FLINK-5700

Cheers,
Till
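
A sketch of that flink-conf.yaml entry (the value is only an example):

akka.client.timeout: 300 s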

On Wed, Feb 1, 2017 at 9:47 AM, Bruno Aranda  wrote:

> Maybe, though it could be good to be able to override in the command line
> somehow, though I guess I could just change the flink config.
>
> Many thanks Yuri,
>
> Bruno
>
> On Wed, 1 Feb 2017 at 07:40 Yury Ruchin  wrote:
>
>> Hi Bruno,
>>
>> From the code I conclude that "akka.client.timeout" setting is what
>> affects this. It defaults to 60 seconds.
>>
>> I'm not sure why this setting is not documented though as well as many
>> other "akka.*" settings - maybe there are some good reasons behind.
>>
>> Regards,
>> Yury
>>
>> 2017-01-31 17:47 GMT+03:00 Bruno Aranda :
>>
>> Hi there,
>>
>> I am trying to cancel a job and create a savepoint (ie flink cancel -s)
>> but it takes more than a minute to do that and then it fails due to the
>> timeout. However, it seems that the job will be cancelled successfully and
>> the savepoint made, but I can only see that through the dashboard.
>>
>> Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to
>> default savepoint directory.
>>
>> 
>>  The program finished with the following exception:
>>
>> java.util.concurrent.TimeoutException: Futures timed out after [6
>> milliseconds]
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(
>> BlockContext.scala:53)
>> at scala.concurrent.Await$.result(package.scala:190)
>> at scala.concurrent.Await.result(package.scala)
>> at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618)
>> at org.apache.flink.client.CliFrontend.parseParameters(
>> CliFrontend.java:1079)
>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(
>> HadoopSecurityContext.java:43)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at org.apache.hadoop.security.UserGroupInformation.doAs(
>> UserGroupInformation.java:1698)
>> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
>> HadoopSecurityContext.java:40)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
>>
>> Is there any way to configure this timeout, so that we can depend on the
>> outcome of this execution in scripts, etc.?
>>
>> Thanks!
>>
>> Bruno
>>
>>
>>


Re: Cannot cancel job with savepoint due to timeout

2017-02-01 Thread Bruno Aranda
Maybe, though it would be good to be able to override it on the command
line somehow; I guess I can just change the Flink config for now.

Many thanks Yuri,

Bruno

On Wed, 1 Feb 2017 at 07:40 Yury Ruchin  wrote:

> Hi Bruno,
>
> From the code I conclude that the "akka.client.timeout" setting is what
> affects this. It defaults to 60 seconds.
>
> I'm not sure why this setting, like many other "akka.*" settings, is not
> documented - maybe there are good reasons behind that.
>
> Regards,
> Yury
>
> 2017-01-31 17:47 GMT+03:00 Bruno Aranda :
>
> Hi there,
>
> I am trying to cancel a job and create a savepoint (i.e. flink cancel -s)
> but it takes more than a minute to do that and then it fails due to the
> timeout. However, it seems that the job will be cancelled successfully and
> the savepoint made, but I can only see that through the dashboard.
>
> Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to default
> savepoint directory.
>
> 
>  The program finished with the following exception:
>
> java.util.concurrent.TimeoutException: Futures timed out after [60000
> milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at scala.concurrent.Await.result(package.scala)
> at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
>
> Is there any way to configure this timeout, so that we can depend on the
> outcome of this execution in scripts, etc.?
>
> Thanks!
>
> Bruno
>
>
>


Re: Cannot cancel job with savepoint due to timeout

2017-01-31 Thread Yury Ruchin
Hi Bruno,

From the code I conclude that the "akka.client.timeout" setting is what
affects this. It defaults to 60 seconds.

I'm not sure why this setting, like many other "akka.*" settings, is not
documented - maybe there are good reasons behind that.

Regards,
Yury

2017-01-31 17:47 GMT+03:00 Bruno Aranda :

> Hi there,
>
> I am trying to cancel a job and create a savepoint (i.e. flink cancel -s)
> but it takes more than a minute to do that and then it fails due to the
> timeout. However, it seems that the job will be cancelled successfully and
> the savepoint made, but I can only see that through the dashboard.
>
> Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to default
> savepoint directory.
>
> 
>  The program finished with the following exception:
>
> java.util.concurrent.TimeoutException: Futures timed out after [60000
> milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(
> BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at scala.concurrent.Await.result(package.scala)
> at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618)
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1079)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(
> HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1698)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
>
> Is there any way to configure this timeout, so that we can depend on the
> outcome of this execution in scripts, etc.?
>
> Thanks!
>
> Bruno
>


Re: Diff between stop and cancel job

2016-05-09 Thread Ufuk Celebi
On Thu, May 5, 2016 at 1:59 AM, Bajaj, Abhinav 
wrote:

> Or can we resume a stopped streaming job?


You can use savepoints [1] to take a snapshot of a streaming program from
which you can restart the job at a later point in time. This is independent
of whether you cancel or stop the program after taking the savepoint.

Currently, you can effectively only use savepoint + cancel.

Stopping is currently only implemented for the TwitterSource. I expect this
to be implemented for other, more common sources like Kafka in future
versions. Furthermore, I expect a convenience feature like
stop-and-savepoint, which gracefully stops your streaming program and
automatically takes a savepoint. But as of now, you probably need to go with
savepoint + cancel.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
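
As a concrete sketch of the savepoint + cancel workflow (the job ID and
paths below are placeholders, not values from this thread):

    # take a savepoint (in this Flink version the target directory
    # typically comes from configuration), then cancel the job
    bin/flink savepoint <jobId>
    bin/flink cancel <jobId>
    # later, resume the job from the reported savepoint path
    bin/flink run -s <savepointPath> your-job.jar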


Re: Cancel Job

2016-01-18 Thread Matthias J. Sax
Hi,

currently, messages in flight will be dropped if a streaming job gets
canceled.

There is already WIP to add a STOP signal which allows for a clean
shutdown of a streaming job. This should get merged soon and will be
available in Flink 1.0.

You can follow the JIRA and PR here:
https://issues.apache.org/jira/browse/FLINK-2111
https://github.com/apache/flink/pull/750
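
Once that lands, my expectation - an assumption based on the proposal, not
the final interface - is that stopping will work analogously to cancel:

    bin/flink stop <jobId>   # graceful shutdown instead of a hard cancel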

-Matthias


On 01/18/2016 08:26 PM, Don Frascuchon wrote:
> Hi,
> 
> When a streaming job is manually canceled, what happens to the messages
> still being processed? Does Flink's engine wait for tasks to finish
> processing the in-flight messages (like Apache Storm does)? If not, is
> there a safe way to stop streaming jobs?
> 
> Thanks in advance!
> Best regards





Re: Cancel Job

2016-01-18 Thread Don Frascuchon
Thanks Matthias !

El lun., 18 ene. 2016 a las 20:51, Matthias J. Sax ()
escribió:

> Hi,
>
> currently, messages in flight will be dropped if a streaming job gets
> canceled.
>
> There is already WIP to add a STOP signal which allows for a clean
> shutdown of a streaming job. This should get merged soon and will be
> available in Flink 1.0.
>
> You can follow the JIRA and PR here:
> https://issues.apache.org/jira/browse/FLINK-2111
> https://github.com/apache/flink/pull/750
>
> -Matthias
>
>
> On 01/18/2016 08:26 PM, Don Frascuchon wrote:
> > Hi,
> >
> > When a streaming job is manually canceled, what happens to the messages
> > still being processed? Does Flink's engine wait for tasks to finish
> > processing the in-flight messages (like Apache Storm does)? If not, is
> > there a safe way to stop streaming jobs?
> >
> > Thanks in advance!
> > Best regards
>
>