[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-20 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268495#comment-17268495
 ] 

Dawid Wysakowicz commented on FLINK-20992:
--

I'm afraid this change broke the 1.12 branch: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12274=results

> Checkpoint cleanup can kill JobMaster
> -
>
>              Key: FLINK-20992
>              URL: https://issues.apache.org/jira/browse/FLINK-20992
>          Project: Flink
>       Issue Type: Bug
>       Components: Runtime / Checkpointing
> Affects Versions: 1.12.0
>         Reporter: Till Rohrmann
>         Assignee: Roman Khachatryan
>         Priority: Critical
>           Labels: pull-request-available
>          Fix For: 1.13.0, 1.12.2
>
>
> A user reported that cancelling a job can lead to an uncaught exception which 
> kills the {{JobMaster}}. The problem seems to be that the 
> {{CheckpointsCleaner}} might trigger {{CheckpointCoordinator}} actions after 
> the job has reached a terminal state and, thus, is shut down. Apparently, we 
> do not properly manage the lifecycles of {{CheckpointCoordinator}} and 
> checkpoint post clean up actions.
> The uncaught exception is 
> {code}
> java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 25977]
>     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>     at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
>     at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>     at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>     at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
>     at org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
>     at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> cc [~roman_khachatryan].
> https://lists.apache.org/thread.html/r75901008d88163560aabb8ab6fc458cd9d4f19076e03ae85e00f9a3a%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-20 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268491#comment-17268491
 ] 

Roman Khachatryan commented on FLINK-20992:
---

I see. I created FLINK-21053 where I described both approaches.



[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-19 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268019#comment-17268019
 ] 

Till Rohrmann commented on FLINK-20992:
---

Yes, I am also not saying that we should change it. I just wanted to document 
that a different solution approach would also be possible.



[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-19 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267951#comment-17267951
 ] 

Roman Khachatryan commented on FLINK-20992:
---

I see what you're saying. But the current code mostly uses async versions. For 
example, in CheckpointCoordinator.startTriggeringCheckpoint there are 4 
submissions of future callbacks using timer.



[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-19 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267853#comment-17267853
 ] 

Till Rohrmann commented on FLINK-20992:
---

If you call {{handleAsync}}, then this is the case. However, if you use

{code}
coordinatorCheckpointsComplete.handle((result, failure) -> {
    if (executor.isShutdown()) {
        // already shut down: do not enqueue anything
        // ...
    }
    return null;
});
{code}

then you are running in the thread which completed the 
{{coordinatorCheckpointsComplete}}. That's what I meant with using {{handle}}.
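
To illustrate the difference, here is a minimal sketch (not Flink code; the class name, the thread names "completer" and "timer", and both helper methods are made up for this example) that records which thread runs a callback registered with {{handle}} versus {{handleAsync}}:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandleVsHandleAsync {

    /** Returns the name of the thread that ran the handle() callback. */
    static String threadRunningHandle() throws Exception {
        CompletableFuture<String> source = new CompletableFuture<>();
        // Registered before completion, so the completing thread runs the callback.
        CompletableFuture<String> seen =
                source.handle((result, failure) -> Thread.currentThread().getName());
        Thread completer = new Thread(() -> source.complete("done"), "completer");
        completer.start();
        completer.join();
        return seen.get();
    }

    /** Returns the name of the thread that ran the handleAsync() callback. */
    static String threadRunningHandleAsync() throws Exception {
        ExecutorService timer = Executors.newSingleThreadExecutor(r -> new Thread(r, "timer"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            // The callback is submitted to the timer executor once the source completes.
            CompletableFuture<String> seen = source.handleAsync(
                    (result, failure) -> Thread.currentThread().getName(), timer);
            Thread completer = new Thread(() -> source.complete("done"), "completer");
            completer.start();
            completer.join();
            return seen.get();
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("handle ran in:      " + threadRunningHandle());
        System.out.println("handleAsync ran in: " + threadRunningHandleAsync());
    }
}
```

Note that {{handle}} only runs in the completing thread if the callback is registered before the future completes; if the future is already complete, the callback runs in the thread that calls {{handle}}.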



[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-19 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267826#comment-17267826
 ] 

Roman Khachatryan commented on FLINK-20992:
---

{quote}one could use a handle call in which one first checks whether the 
CheckpointCoordinator is still running
{quote}
But checking *inside* handle(Async) means checking inside an action that has 
already been *submitted*, i.e. too late:
{code:java}
coordinatorCheckpointsComplete.handleAsync((result, failure) -> {
    if (executor.isShutdown()) { // too late
        // ...
    }
    return null;
}, timer);
{code}
 Or am I missing something?
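
A small standalone sketch of the "too late" case, assuming a plain JDK executor stands in for the timer (the class and method names are hypothetical, not Flink code). The callback is registered with {{handleAsync}}, the executor is shut down, and only then does the source future complete. The callback body never runs, and the dependent future fails with the rejection instead:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LateCheck {

    /** Returns the failure of the dependent future, or null if the callback ran normally. */
    static Throwable failureOfLateCallback() {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Void> dependent = source.handleAsync((result, failure) -> {
            // Any isShutdown() check placed here is never reached in this scenario.
            return null;
        }, timer);

        timer.shutdownNow();     // the executor goes away first ...
        source.complete("done"); // ... then completion tries to submit the callback

        try {
            dependent.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause(); // the rejection raised by the terminated executor
        }
    }

    public static void main(String[] args) {
        System.out.println(failureOfLateCallback());
    }
}
```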



[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-19 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267775#comment-17267775
 ] 

Till Rohrmann commented on FLINK-20992:
---

I don't think that FLINK-18290 is impossible to solve differently 
[~roman_khachatryan]. For example, one could use a {{handle}} call in which one 
first checks whether the {{CheckpointCoordinator}} is still running before 
enqueuing a new runnable into the {{timer}} {{Executor}}.



[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-18 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267470#comment-17267470
 ] 

Roman Khachatryan commented on FLINK-20992:
---

{quote}To me it makes more sense that the CheckpointCoordinator which knows 
about its state is responsible for making sure that we only execute a Runnable 
if we are still running.
{quote}
I think the current issue can be solved this way by adding a check *and 
synchronization* in scheduleTriggerRequest.
 But for cases like FLINK-18290 that is not possible, because the callback is 
scheduled by java.util.concurrent itself (e.g. handleAsync).

 

I've created FLINK-21015 to implement waiting for cleanup completion (and added 
your comment there).
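
A minimal sketch of the "check and synchronization" idea, assuming the component owns its executor and is the only one that shuts it down (the class, field, and method names are hypothetical and do not mirror the actual CheckpointCoordinator code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GuardedScheduler {
    private final Object lock = new Object();
    private final ExecutorService timer = Executors.newSingleThreadExecutor();
    private boolean shutdown; // guarded by lock

    /** Enqueues the action unless shutdown has begun; returns whether it was enqueued. */
    boolean scheduleIfRunning(Runnable action) {
        synchronized (lock) {
            if (shutdown) {
                return false; // skip quietly instead of hitting a rejected execution
            }
            // Not rejected for being terminated: shutdown() needs the same lock.
            timer.execute(action);
            return true;
        }
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
            timer.shutdownNow();
        }
    }
}
```

The check and the enqueue have to happen under the same lock as the shutdown; an unsynchronized check could still race with a concurrent shutdown.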
 



[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-18 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267436#comment-17267436
 ] 

Till Rohrmann commented on FLINK-20992:
---

Thanks for creating this PR [~roman_khachatryan].

I agree that there is a lifecycle problem with the {{CheckpointCoordinator}} and 
its services. However, I am not sure whether we should harden the exception 
handling logic or rather make the {{CheckpointCoordinator}} stop enqueuing 
anything into the {{Executor}} after it is shut down. To me it makes more 
sense that the {{CheckpointCoordinator}}, which knows about its state, is 
responsible for making sure that we only execute a {{Runnable}} if we are still 
running.

I think you are also right that we currently don't wait for the checkpoint 
cleanup to complete. I think the component responsible for the clean up tasks 
should make sure that they are completed before shutting down or hand them over 
to a new owner who is responsible for them. Hence, if the 
{{CheckpointCoordinator}} is responsible, then the 
{{CheckpointCoordinator.shutdown}} method should make sure that all checkpoints 
are cleaned up. Alternatively, {{CheckpointCoordinator.shutdown}} could return 
a {{CompletableFuture}} which is completed once everything is cleaned up. 
Consequently, I wouldn't make it the responsibility of the executors to make 
sure that all checkpoints are properly cleaned up by waiting on the completion 
of all enqueued tasks.

Both things should be fixed imo.
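
As a rough sketch of the second alternative, a shutdown method can return a {{CompletableFuture}} that completes once all pending cleanups are done. The class below is a hypothetical stand-in, not the real {{CheckpointCoordinator}}; it assumes the component owns its I/O executor and tracks every cleanup it starts:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CleanupOwner {
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
    private final List<CompletableFuture<Void>> pendingCleanups = new ArrayList<>();
    private boolean shutdown;

    /** Starts an asynchronous cleanup and remembers it so that shutdown can wait for it. */
    synchronized void cleanAsync(Runnable cleanup) {
        if (shutdown) {
            throw new IllegalStateException("already shut down");
        }
        pendingCleanups.add(CompletableFuture.runAsync(cleanup, ioExecutor));
    }

    /** Stops accepting work; the returned future completes once all cleanups have finished. */
    synchronized CompletableFuture<Void> shutdown() {
        shutdown = true;
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(pendingCleanups.toArray(new CompletableFuture<?>[0]));
        // Release the thread pool only after the last cleanup has run.
        return allDone.whenComplete((ignored, failure) -> ioExecutor.shutdown());
    }
}
```

A caller can then chain further shutdown steps on the returned future rather than relying on the executor to drain its queue. For brevity the sketch never prunes finished cleanups from the list.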



[jira] [Commented] (FLINK-20992) Checkpoint cleanup can kill JobMaster

2021-01-18 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267188#comment-17267188
 ] 

Roman Khachatryan commented on FLINK-20992:
---

I've published a simple PR to address the issue directly.

 

However, this is not the first time we hit this RejectedExecutionException 
problem (e.g. FLINK-18290).

I think the reason is that the executors used by the coordinator aren't aware of 
its lifecycle.

So I propose to:
 # Create executors inside CheckpointCoordinator (both io & timer thread pools)
 # Check isShutdown() in their error handlers (if yes and it's 
RejectedExecutionException then just log; otherwise delegate to 
FatalExitExceptionHandler)
 # (this will allow removing such RejectedExecutionException checks from 
coordinator code)
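
The error-handler part of this proposal could look roughly like the sketch below. The class is hypothetical; the {{isShutdown}} supplier stands in for the coordinator's state and {{fatalHandler}} stands in for FatalExitExceptionHandler:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BooleanSupplier;

/** Hypothetical handler; not an existing Flink class. */
public class ShutdownAwareExceptionHandler implements Thread.UncaughtExceptionHandler {
    private final BooleanSupplier isShutdown;
    private final Thread.UncaughtExceptionHandler fatalHandler;

    public ShutdownAwareExceptionHandler(
            BooleanSupplier isShutdown, Thread.UncaughtExceptionHandler fatalHandler) {
        this.isShutdown = isShutdown;
        this.fatalHandler = fatalHandler;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        if (error instanceof RejectedExecutionException && isShutdown.getAsBoolean()) {
            // Expected after shutdown: log and carry on.
            System.err.println("Ignoring rejected execution after shutdown: " + error);
        } else {
            // Anything else is still treated as fatal.
            fatalHandler.uncaughtException(thread, error);
        }
    }
}
```

Such a handler would be installed through the thread factory of the executors. Uncaught-exception handlers only see failures of tasks submitted with {{execute}}; tasks submitted with {{submit}} capture the exception in the returned future.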

 

Additionally, I found that during shutdown we don't wait for 
checkpoint cleanup to complete (or for any other tasks submitted to the executors):
{code:java}
checkpointCoordinatorTimer.shutdownNow(); // in ExecutionGraph
scheduledExecutorService.shutdownNow(); // in JobManagerSharedServices
{code}
So only the actions that are currently executing will complete, not the queued ones.

I think we SHOULD complete cleanup on shutdown and propose the following:
 # Replace shutdownNow with shutdown to allow cleanup to finish
 # Add awaitTermination (with timeout)
 # At least log the result of shutdownNow (list of runnables)
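
These three steps can be sketched with plain {{java.util.concurrent}} calls as follows. The helper class and method name are made up for illustration; the timeout is an assumption to be chosen by the caller:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class GracefulShutdown {

    /** Returns the tasks that never started because the timeout expired (empty if all ran). */
    static List<Runnable> shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks, but let queued ones run
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return Collections.emptyList();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, then force shutdown
        }
        List<Runnable> dropped = executor.shutdownNow();
        if (!dropped.isEmpty()) {
            System.err.println("Dropped " + dropped.size() + " queued tasks on shutdown: " + dropped);
        }
        return dropped;
    }
}
```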

 

 WDYT [~trohrmann]?

I'd create separate tickets for the latter two issues.
