[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612273#comment-15612273
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2652


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.1.4
>
>
> In case of a failed cancellation, e.g. if the task cannot be cancelled within a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety net against faulty user code.
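The following is a minimal Java sketch of the safety net described in the issue. It rests on two assumptions. First, the task runs on a plain `Thread`. Second, a hypothetical `onFatalError` callback stands in for notifying the TaskManager, which would then kill its process. `CancellationWatchdog` is an illustrative name, not Flink's API. The watchdog interrupts the task thread once per interval and escalates if the thread is still alive when the timeout expires.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical watchdog: interrupts a stuck task thread and escalates after a deadline. */
final class CancellationWatchdog implements Runnable {
    private final Thread executor;                // thread running the task
    private final long intervalMillis;            // pause between interrupts
    private final long timeoutMillis;             // escalate after this long
    private final Consumer<String> onFatalError;  // stands in for "notify the TaskManager"

    CancellationWatchdog(Thread executor, long intervalMillis, long timeoutMillis,
                         Consumer<String> onFatalError) {
        this.executor = executor;
        this.intervalMillis = intervalMillis;
        this.timeoutMillis = timeoutMillis;
        this.onFatalError = onFatalError;
    }

    @Override
    public void run() {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (executor.isAlive()) {
            if (System.nanoTime() - deadline >= 0) {
                // The task ignored every interrupt: escalate instead of leaking the thread.
                onFatalError.accept("Task thread " + executor.getName()
                        + " did not terminate within " + timeoutMillis + " ms");
                return;
            }
            executor.interrupt();
            try {
                // Wait up to one interval; returns early if the task thread exits.
                executor.join(intervalMillis);
            } catch (InterruptedException e) {
                // The watchdog itself was interrupted by its owner: stop watching.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

Waiting with `join(interval)` rather than a fixed sleep lets the watchdog return as soon as the task thread exits. Comparing `System.nanoTime() - deadline` against zero keeps the check correct even if the nanosecond clock wraps.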



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611336#comment-15611336
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
OK, thanks. I addressed one last issue with shutting down the watchdog 
thread. It lingered in its sleep and only noticed that the task had 
terminated once that sleep was over. Now the task canceler interrupts the 
watchdog when everything works as expected, so the watchdog thread terminates early.
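The shutdown issue described above can be reduced to a small sketch. These classes are hypothetical and are not the PR's code. The watchdog sleeps for its interval, and the canceler interrupts it as soon as the task thread has exited. As a result, the watchdog does not outlive the task by a whole interval.

```java
/** Hypothetical sketch: a canceler that stops its watchdog once the task has exited. */
final class CancelerWithWatchdog {

    /** Starts a watchdog that would otherwise sleep for the full interval. */
    static Thread startWatchdog(long intervalMillis) {
        Thread watchdog = new Thread(() -> {
            try {
                // Without an interrupt, this sleep keeps the watchdog alive
                // long after the task has already terminated.
                Thread.sleep(intervalMillis);
            } catch (InterruptedException expected) {
                // Woken up early by the canceler: nothing left to watch.
            }
        }, "cancellation-watchdog");
        watchdog.start();
        return watchdog;
    }

    /** Cancels the task, waits for it to exit, then releases the watchdog. */
    static void cancel(Thread task, Thread watchdog) throws InterruptedException {
        task.interrupt();      // ask the task to stop
        task.join();           // happy path: the task reacts to the interrupt
        watchdog.interrupt();  // do not let the watchdog sleep out its interval
        watchdog.join();
    }
}
```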

I've also backported this to the `release-1.1` branch.




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591567#comment-15591567
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
+1 go ahead




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15588166#comment-15588166
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
OK, I would like to go ahead and merge this if there are no objections.




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586082#comment-15586082
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Concerning the Kafka test: according to the logs, it fails because a topic 
cannot be deleted. The ZooKeeper operation blocks and the test times out. I am 
pretty sure that this is unrelated, as Flink is not running at that point.




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585948#comment-15585948
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Travis is green except for a Kafka failure. @StephanEwen 
@rmetzger, do you know whether this is a known issue, or might it be a 
regression from this change? 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/168613643/log.txt




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585425#comment-15585425
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Haha, that picture convinced me to actually add the test :smile: 




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585403#comment-15585403
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Yeah, this more or less covers it. I'm just afraid of a situation like this one: 
https://twitter.com/thepracticaldev/status/687672086152753152




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585380#comment-15585380
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
We have `TaskManagerProcessReapingTest`, which tests that the 
TaskManager process properly exits when the TaskManager actor dies. In 
addition, `TaskManagerTest#testTerminationOnFatalError` tests 
that the `FatalError` message terminates the actor. 

Do you think this already covers it? We can certainly add a variant of the 
process reaper test that sends a `FatalError` message instead of the 
`PoisonPill`. 
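The following sketches the reaper idea under discussion. It assumes an injectable exit hook so that the behaviour can be tested without terminating the JVM. In production, the hook could be `System::exit`. `WatchedComponent` and `ProcessReaper` are hypothetical stand-ins for the TaskManager actor and a process reaper, not Flink's implementations. The reaper waits until the component has terminated and then exits the process. It uses a non-zero code when a fatal error caused the termination.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.IntConsumer;

/** Hypothetical component that terminates itself when it receives a fatal error. */
final class WatchedComponent {
    private final CountDownLatch terminated = new CountDownLatch(1);
    private volatile String fatalError;

    /** Mirrors a FatalError message: record the cause, then terminate. */
    void notifyFatalError(String message) {
        fatalError = message;
        terminated.countDown();
    }

    void awaitTermination() throws InterruptedException {
        terminated.await();
    }

    String fatalError() {
        return fatalError;
    }
}

/** Hypothetical reaper: exits the process once the watched component is gone. */
final class ProcessReaper implements Runnable {
    private final WatchedComponent component;
    private final IntConsumer exitHook; // System::exit in production, a recorder in tests

    ProcessReaper(WatchedComponent component, IntConsumer exitHook) {
        this.component = component;
        this.exitHook = exitHook;
    }

    @Override
    public void run() {
        try {
            component.awaitTermination();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // Non-zero exit code if the component died because of a fatal error.
        exitHook.accept(component.fatalError() == null ? 0 : 1);
    }
}
```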




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585352#comment-15585352
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
What do you think about a follow-up test that ensures a fatal error 
notification on the TaskManager actually results in the process being killed?




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585346#comment-15585346
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
I renamed the `TaskOptions` class to `TaskManagerOptions` so that we can 
easily migrate the existing TaskManager options to it in a follow-up. 
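The following is a minimal sketch of a single options holder. It assumes a simplified `ConfigOption` type and a map-backed `SimpleConfiguration`, both written only for this example. The key strings and default values are assumptions for illustration and may differ from what the PR defines. The usage mirrors the `config.setLong(... .key(), 5)` calls in the tests quoted in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical stand-in for a typed configuration option. */
final class ConfigOption<T> {
    private final String key;
    private final T defaultValue;

    ConfigOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() { return key; }
    T defaultValue() { return defaultValue; }
}

/** One holder class for TaskManager settings instead of several small ones. */
final class TaskManagerOptions {
    // Assumed keys and defaults (milliseconds), for illustration only.
    static final ConfigOption<Long> CANCELLATION_INTERVAL =
            new ConfigOption<>("task.cancellation.interval", 30_000L);
    static final ConfigOption<Long> CANCELLATION_TIMEOUT =
            new ConfigOption<>("task.cancellation.timeout", 180_000L);

    private TaskManagerOptions() {}
}

/** Minimal map-backed configuration that falls back to the option's default. */
final class SimpleConfiguration {
    private final Map<String, Long> values = new HashMap<>();

    void setLong(String key, long value) { values.put(key, value); }

    long getLong(ConfigOption<Long> option) {
        return values.getOrDefault(option.key(), option.defaultValue());
    }
}
```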




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585344#comment-15585344
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83842005
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception {
verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after acquisition)
+* and cancel cannot complete because it also tries to acquire the same lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
--- End diff --

You can also pass a message to `assertFalse`. I like `assertEquals` for 
printing the expected value, but when the expected value is `false`, the former 
seems more natural to me.
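Both points can be reconciled by putting the offending message into the failure text. The following sketch does not use a test framework. `FatalError` is a stand-in for `TaskManagerMessages.FatalError`, and `assertNoFatalError` is a hypothetical helper. With JUnit, `assertFalse(String message, boolean condition)` can carry the same text.

```java
import java.util.List;

/** Stand-in for TaskManagerMessages.FatalError, for this example only. */
final class FatalError {
    final String description;

    FatalError(String description) { this.description = description; }

    @Override
    public String toString() { return "FatalError(" + description + ")"; }
}

final class TaskManagerAssertions {
    private TaskManagerAssertions() {}

    /** Fails with a message naming the unexpected FatalError, not a bare AssertionError. */
    static void assertNoFatalError(List<?> taskManagerMessages) {
        for (Object msg : taskManagerMessages) {
            if (msg instanceof FatalError) {
                throw new AssertionError("Unexpected fatal error notification: " + msg);
            }
        }
    }
}
```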




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585332#comment-15585332
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83841451
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1299,124 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   logger.error("Error in the task canceler", t);
+   }
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /**
+* Pass logger in order to prevent that the compiler needs to inject static bridge methods
+* to access it.
+*/
+   private final Logger logger;
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
+
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Logger logger,
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.logger = checkNotNull(logger);
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   taskCancellerLatch.await();
+   } catch (Exception e) {
+   String msg = String.format("Exception while waiting on task " +
+   "canceller to cancel task '%s'.", taskName);
+   taskManager.notifyFatalError(msg, e);
+   return;
+   }
+
+   long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+   long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+   long deadline = System.nanoTime() + timeoutNanos;
+
+   try {
+   // Initial wait before interrupting periodically
+   Thread.sleep(interruptInterval);
+   } catch (InterruptedException ignored) {
+   }
+
+   // It is possible that the user code does not react to the task canceller.
+   // for that reason, we spawn this separate thread that repeatedly interrupts
+   // the user code until it exits. If the suer user code does not exit within
+   // the timeout, we notify the job manager about a fatal error.
+   while (executor.isAlive()) {
+   long now = System.nanoTime();
+
+   // build the stack trace of where the thread is stuck, for the log
+   StringBuilder bld = new StringBuilder();
+   

[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585325#comment-15585325
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83840872
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception {
verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after acquisition)
+* and cancel cannot complete because it also tries to acquire the same lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
--- End diff --

`assertFalse` only fails with a plain `AssertionError`, so you have to 
check the stack trace, whereas `assertEquals` produces a message with the 
expected and actual values.
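`testInterruptableSharedLockInInvokeAndCancel` in the diff above targets a scenario in which invoke() holds a lock that cancel() also needs. That scenario can be reproduced outside Flink. In this hypothetical sketch, `SharedLockScenario` is illustrative only. The canceling thread blocks on the lock until the task thread is interrupted. Interrupting the task thread is the role the cancellation watchdog plays.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical reduction of "invoke() holds a lock that cancel() also needs". */
final class SharedLockScenario {
    final ReentrantLock lock = new ReentrantLock();
    final CountDownLatch lockAcquired = new CountDownLatch(1);

    /** Task body: takes the lock and blocks; only an interrupt makes it release the lock. */
    void invoke() {
        lock.lock();
        try {
            lockAcquired.countDown();
            Thread.sleep(Long.MAX_VALUE); // blocks while holding the lock
        } catch (InterruptedException e) {
            // Interrupt (e.g. from a watchdog) ends the task.
        } finally {
            lock.unlock();
        }
    }

    /** Cancel path: needs the same lock, so it proceeds only after invoke() exits. */
    void cancel() {
        lock.lock();
        try {
            // cleanup that must not race with invoke()
        } finally {
            lock.unlock();
        }
    }
}
```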




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585320#comment-15585320
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83840545
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception {
verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after acquisition)
+* and cancel cannot complete because it also tries to acquire the same lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
--- End diff --

How about using `task.getExecutingThread().join()` instead of 
the trigger latch? That seems more intuitive and safer.
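The following sketches the suggested pattern as a small, hypothetical test helper. Waiting on the executing thread itself replaces the trigger latch. The bounded `join(timeout)` followed by an `isAlive()` check is an addition here. It makes a hung task fail the test instead of blocking the build indefinitely.

```java
/** Hypothetical test helper: wait for a thread to exit, with an upper bound. */
final class ThreadJoins {
    private ThreadJoins() {}

    static void awaitTermination(Thread thread, long timeoutMillis) throws InterruptedException {
        // join() returns once the thread has terminated, so no extra latch is needed;
        // the timeout keeps a stuck thread from hanging the whole test run.
        thread.join(timeoutMillis);
        if (thread.isAlive()) {
            throw new AssertionError("Thread " + thread.getName()
                    + " still alive after " + timeoutMillis + " ms");
        }
    }
}
```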




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585309#comment-15585309
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83839720
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1299,124 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   logger.error("Error in the task canceler", t);
+   }
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /**
+* Pass logger in order to prevent that the compiler needs to inject static bridge methods
+* to access it.
+*/
+   private final Logger logger;
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
+
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Logger logger,
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.logger = checkNotNull(logger);
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   taskCancellerLatch.await();
+   } catch (Exception e) {
+   String msg = String.format("Exception while waiting on task " +
+   "canceller to cancel task '%s'.", taskName);
+   taskManager.notifyFatalError(msg, e);
+   return;
+   }
+
+   long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+   long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+   long deadline = System.nanoTime() + timeoutNanos;
+
+   try {
+   // Initial wait before interrupting periodically
+   Thread.sleep(interruptInterval);
+   } catch (InterruptedException ignored) {
+   }
+
+   // It is possible that the user code does not react to the task canceller.
+   // for that reason, we spawn this separate thread that repeatedly interrupts
+   // the user code until it exits. If the suer user code does not exit within
+   // the timeout, we notify the job manager about a fatal error.
+   while (executor.isAlive()) {
+   long now = System.nanoTime();
+
+   // build the stack trace of where the thread is stuck, for the log
+   StringBuilder bld = new StringBuilder();
+   

[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585283#comment-15585283
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Should we have a single `TaskManagerOptions` class, so that the configuration 
is not spread over too many classes?




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585275#comment-15585275
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Thanks for the valuable feedback. Some of the errors were a bit sloppy 
on my part; sorry about that. I addressed all your comments.




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585118#comment-15585118
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83824787
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1289,113 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   LOG.error("Error in the task canceler", t);
+   }
 
-   // it is possible that the user code does not react immediately. for that
-   // reason, we spawn a separate thread that repeatedly interrupts the user code until
-   // it exits
-   while (executer.isAlive()) {
-   // build the stack trace of where the thread is stuck, for the log
-   StringBuilder bld = new StringBuilder();
-   StackTraceElement[] stack = executer.getStackTrace();
-   for (StackTraceElement e : stack) {
-   bld.append(e).append('\n');
-   }
+   System.out.println("Canceler done");
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
 
-   logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) {
+   return; // Did not return
+   }
+   } catch (InterruptedException e) {
+   return;
+   }
+
+   long deadline = System.currentTimeMillis() + interruptTimeout;
--- End diff --

OK, will change



[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585117#comment-15585117
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83824750
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1289,113 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   LOG.error("Error in the task canceler", t);
+   }
 
-   // it is possible that the user code does not react immediately. for that
-   // reason, we spawn a separate thread that repeatedly interrupts the user code until
-   // it exits
-   while (executer.isAlive()) {
-   // build the stack trace of where the thread is stuck, for the log
-   StringBuilder bld = new StringBuilder();
-   StackTraceElement[] stack = executer.getStackTrace();
-   for (StackTraceElement e : stack) {
-   bld.append(e).append('\n');
-   }
+   System.out.println("Canceler done");
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
 
-   logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) {
+   return; // Did not return
--- End diff --

Leftover; I changed it to trigger a fatal error as well.
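Here is a minimal sketch of the changed branch in plain Java. `CancelerSync` is a hypothetical stand-in for the watchdog. The `Consumer<String>` callback is a hypothetical stand-in for the `TaskManagerConnection` notification. A latch timeout now reports a fatal error instead of returning silently. An interrupt of the watchdog itself still ends it quietly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper isolating the watchdog's first step. */
final class CancelerSync {
    private CancelerSync() {}

    /**
     * Waits for the task canceler to count down the latch. Returns true if the watchdog should
     * go on to interrupt the task. On timeout it reports a fatal error instead of just returning.
     */
    static boolean awaitCanceler(CountDownLatch cancelerDone, long timeoutMillis,
                                 Consumer<String> fatalErrorHandler) {
        try {
            if (cancelerDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            // A canceler that is itself stuck (e.g. blocked in cancel()) is a cancellation failure.
            fatalErrorHandler.accept("Task canceler did not finish within " + timeoutMillis + " ms");
        } catch (InterruptedException e) {
            // The watchdog itself was stopped: keep the interrupt flag and end quietly.
            Thread.currentThread().interrupt();
        }
        return false;
    }
}
```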




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585120#comment-15585120
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83824898
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---
@@ -565,6 +568,50 @@ public void testOnPartitionStateUpdate() throws Exception {
verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Task cancellation blocks the task canceller. Interrupt after cancel via
+* cancellation watch dog.
+*/
+   @Test
+   public void testTaskCancelWatchDog() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 100);
+   config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 1000);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+   }
+
+   @Test
+   public void testReportFatalErrorAfterCancellationTimeout() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 10);
+   config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 200);
+
+   Task task = createTask(InvokableBlockingInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   for (int i = 0; i < 10; i++) {
+   Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS);
+   if (msg instanceof TaskManagerMessages.FatalError) {
+   System.out.println(msg);
+   return; // success
+   }
+   }
+
+   fail("Did not receive expected task manager message");
--- End diff --

Good catch. I will let the test thread shut it down.
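Here is a sketch of the cleanup pattern. It assumes the task that blocks in both `invoke()` and `cancel()` can be modelled as a plain thread that swallows interrupts until a `stop` flag is set. `StubbornTask` and `LingeringThreadCheck` are hypothetical names. The `finally` block lets the test thread shut the executor down even if the test body fails.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative stand-in for a task that ignores interrupts in invoke() and cancel(). */
final class StubbornTask {
    private final AtomicBoolean stop = new AtomicBoolean(false);
    private final Thread thread = new Thread(() -> {
        while (!stop.get()) {
            Thread.interrupted(); // clear and ignore interrupts, like faulty user code
        }
    }, "stubborn-task");

    void start() {
        thread.start();
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    /** Lets the test thread terminate the executor; waits up to five seconds. */
    void shutDown() {
        stop.set(true);
        try {
            thread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class LingeringThreadCheck {
    private LingeringThreadCheck() {}

    /** Returns whether the task was still running after start; always shuts it down. */
    static boolean runAndCleanUp() {
        StubbornTask task = new StubbornTask();
        task.start();
        try {
            // A real test would cancel the task and wait for the fatal error here.
            return task.isAlive();
        } finally {
            task.shutDown(); // without this, the thread would spin forever
        }
    }
}
```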




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585115#comment-15585115
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83824704
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1289,113 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   LOG.error("Error in the task canceler", t);
+   }
 
-   // it is possible that the user code does not react immediately. for that
-   // reason, we spawn a separate thread that repeatedly interrupts the user code until
-   // it exits
-   while (executer.isAlive()) {
-   // build the stack trace of where the thread is stuck, for the log
-   StringBuilder bld = new StringBuilder();
-   StackTraceElement[] stack = executer.getStackTrace();
-   for (StackTraceElement e : stack) {
-   bld.append(e).append('\n');
-   }
+   System.out.println("Canceler done");
--- End diff --

Yes, removed




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584853#comment-15584853
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83800757
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1289,113 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   LOG.error("Error in the task canceler", t);
+   }
 
-   // it is possible that the user code does not react immediately. for that
-   // reason, we spawn a separate thread that repeatedly interrupts the user code until
-   // it exits
-   while (executer.isAlive()) {
-   // build the stack trace of where the thread is stuck, for the log
-   StringBuilder bld = new StringBuilder();
-   StackTraceElement[] stack = executer.getStackTrace();
-   for (StackTraceElement e : stack) {
-   bld.append(e).append('\n');
-   }
+   System.out.println("Canceler done");
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
 
-   logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) {
+   return; // Did not return
--- End diff --

I think returning early here sort of defeats the idea of the watch dog...




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584854#comment-15584854
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83803660
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---
@@ -565,6 +568,50 @@ public void testOnPartitionStateUpdate() throws Exception {
verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Task cancellation blocks the task canceller. Interrupt after cancel via
+* cancellation watch dog.
+*/
+   @Test
+   public void testTaskCancelWatchDog() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 100);
+   config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 1000);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+   }
+
+   @Test
+   public void testReportFatalErrorAfterCancellationTimeout() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 10);
+   config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 200);
+
+   Task task = createTask(InvokableBlockingInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   for (int i = 0; i < 10; i++) {
+   Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS);
+   if (msg instanceof TaskManagerMessages.FatalError) {
+   System.out.println(msg);
+   return; // success
+   }
+   }
+
+   fail("Did not receive expected task manager message");
--- End diff --

Does this test leave a lingering endlessly looped execution thread?




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584852#comment-15584852
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83800396
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1183,47 +1220,48 @@ public String toString() {
 */
private static class TaskCanceler implements Runnable {
 
-   private final Logger logger;
private final AbstractInvokable invokable;
private final Thread executer;
-   private final String taskName;
private final long taskCancellationIntervalMillis;
private final ResultPartition[] producedPartitions;
private final SingleInputGate[] inputGates;
+   private final CountDownLatch watchDogLatch;
 
public TaskCanceler(
-   Logger logger,
--- End diff --

I think the logger was passed in so that the compiler does not need to inject 
static bridge methods to access it. Why not keep it?
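This is a hedged illustration of the point above. Before Java 11, when a nested class reads a `private static` field of its enclosing class, javac generates a synthetic accessor method on the enclosing class, named like `access$000`. Nest-based access control (JEP 181) in Java 11 removed the need for it. Passing the logger through the constructor avoids that accessor on older compilers. The sketch uses `java.util.logging` rather than SLF4J to stay self-contained. `OuterTask` and `Canceler` are hypothetical names.

```java
import java.util.logging.Logger;

/** Hypothetical outer class standing in for Task. */
final class OuterTask {
    private static final Logger LOG = Logger.getLogger(OuterTask.class.getName());

    private OuterTask() {}

    /** Creates a canceler that receives the logger instead of reading OuterTask.LOG itself. */
    static Runnable newCanceler(String taskName) {
        return new Canceler(LOG, taskName);
    }

    private static final class Canceler implements Runnable {
        // Held in a field: the nested class never touches the outer class's private LOG.
        private final Logger logger;
        private final String taskName;

        Canceler(Logger logger, String taskName) {
            this.logger = logger;
            this.taskName = taskName;
        }

        @Override
        public void run() {
            logger.info("Cancelling task " + taskName);
        }
    }
}
```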




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584855#comment-15584855
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83800845
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1289,113 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   LOG.error("Error in the task canceler", t);
+   }
 
-   // it is possible that the user code does not react immediately. for that
-   // reason, we spawn a separate thread that repeatedly interrupts the user code until
-   // it exits
-   while (executer.isAlive()) {
-   // build the stack trace of where the thread is stuck, for the log
-   StringBuilder bld = new StringBuilder();
-   StackTraceElement[] stack = executer.getStackTrace();
-   for (StackTraceElement e : stack) {
-   bld.append(e).append('\n');
-   }
+   System.out.println("Canceler done");
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
 
-   logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) {
+   return; // Did not return
+   }
+   } catch (InterruptedException e) {
+   return;
+   }
+
+   long deadline = System.currentTimeMillis() + interruptTimeout;
--- End diff --

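`System.nanoTime()` is more stable than `System.currentTimeMillis()` for 
measuring elapsed time, because it is not tied to the wall clock, which can 
jump when the system time is adjusted. It would be good to use it here, 
especially since we are dealing with timeouts that kill the process.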
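Here is a small sketch of a deadline based on `System.nanoTime()`. The `Deadline` helper is hypothetical. `nanoTime` is not tied to wall-clock time, so a system clock adjustment cannot stretch or shrink the timeout. Per its Javadoc, values should be compared by subtraction (`t1 - t0 < 0`) rather than with `<`, because of possible numerical overflow.

```java
import java.util.concurrent.TimeUnit;

/** Deadline measured on the nanoTime clock (illustrative helper). */
final class Deadline {
    private final long deadlineNanos;

    private Deadline(long deadlineNanos) {
        this.deadlineNanos = deadlineNanos;
    }

    static Deadline fromNow(long timeout, TimeUnit unit) {
        return new Deadline(System.nanoTime() + unit.toNanos(timeout));
    }

    /** Remaining time; computed as a difference so that nanoTime overflow is harmless. */
    long remainingNanos() {
        return deadlineNanos - System.nanoTime();
    }

    boolean hasPassed() {
        return remainingNanos() <= 0;
    }
}
```

In the watchdog, `Deadline.fromNow(interruptTimeout, TimeUnit.MILLISECONDS)` would replace the `currentTimeMillis()` arithmetic. The interrupt loop would then check `hasPassed()`.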



[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584851#comment-15584851
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83801277
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---
@@ -565,6 +568,50 @@ public void testOnPartitionStateUpdate() throws Exception {
verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Task cancellation blocks the task canceller. Interrupt after cancel via
+* cancellation watch dog.
+*/
+   @Test
+   public void testTaskCancelWatchDog() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 100);
+   config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 1000);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+   }
+
+   @Test
+   public void testReportFatalErrorAfterCancellationTimeout() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 10);
+   config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 200);
+
+   Task task = createTask(InvokableBlockingInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   for (int i = 0; i < 10; i++) {
+   Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS);
+   if (msg instanceof TaskManagerMessages.FatalError) {
+   System.out.println(msg);
--- End diff --

Leftover?




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584857#comment-15584857
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83800264
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -289,6 +294,10 @@ public Task(
ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
 
+   this.taskCancellationTimeout = jobConfiguration.getLong(
--- End diff --

I think the `jobConfiguration` is the wrong configuration here. It should 
probably be the TaskManager's configuration in `taskManagerConfig`. The same 
probably holds for the cancellation interval above.

It seems neither was ever tested?
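Here is a sketch of one possible resolution order, under stated assumptions. Both configurations are modelled as plain `Map<String, Long>` lookups instead of Flink's `Configuration`. The key name is hypothetical. A per-job value of `-1`, for example from the `ExecutionConfig` mentioned in the PR description, is assumed to mean "not set". The point is only that the job configuration is not consulted.

```java
import java.util.Map;

/** Illustrative resolution of the cancellation timeout; not Flink's actual logic. */
final class CancellationSettings {
    static final String TIMEOUT_KEY = "task.cancellation.timeout"; // hypothetical key
    static final long DEFAULT_TIMEOUT_MILLIS = 180_000L;            // 3 min, as stated in the PR

    private CancellationSettings() {}

    /**
     * Uses an explicit per-job value if one is set (assumed: -1 means unset), otherwise the
     * TaskManager configuration, otherwise the default. The job configuration is not read.
     */
    static long resolveTimeout(long perJobValue, Map<String, Long> taskManagerConfig) {
        if (perJobValue >= 0) {
            return perJobValue;
        }
        return taskManagerConfig.getOrDefault(TIMEOUT_KEY, DEFAULT_TIMEOUT_MILLIS);
    }
}
```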




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584856#comment-15584856
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83800506
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1289,113 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   LOG.error("Error in the task canceler", t);
+   }
 
-   // it is possible that the user code does not react immediately. for that
-   // reason, we spawn a separate thread that repeatedly interrupts the user code until
-   // it exits
-   while (executer.isAlive()) {
-   // build the stack trace of where the thread is stuck, for the log
-   StringBuilder bld = new StringBuilder();
-   StackTraceElement[] stack = executer.getStackTrace();
-   for (StackTraceElement e : stack) {
-   bld.append(e).append('\n');
-   }
+   System.out.println("Canceler done");
--- End diff --

Leftover?




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582735#comment-15582735
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2652

[FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck

- Splits the cancellation up into two threads:
   * The `TaskCanceler` calls `cancel` on the invokable and `interrupt` on the 
executing Thread. It then exits.
   * The `TaskCancellationWatchDog` kicks in after the task cancellation 
interval (current default: 30 secs) and periodically calls `interrupt` on the 
executing Thread. If the Thread does not terminate within the task cancellation 
timeout (new config value, default 3 mins), the task manager is notified about 
a fatal error, leading to termination of the JVM.
- The new configuration is exposed via 
`ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS` (default: 3 mins) and the 
`ExecutionConfig` (similar to the cancellation interval).
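Here is a compact sketch of the two-thread split described above, in plain Java. `CancellableTask` stands in for the invokable. The `Consumer<String>` callback stands in for the fatal-error notification to the TaskManager. Neither is Flink's API, and the interval and timeout values are left to the caller. Because the watchdog runs on its own thread, it still fires if `cancel()` never returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for the invokable's cancel() hook. */
interface CancellableTask {
    void cancel() throws Exception;
}

final class TwoThreadCancellation {
    private TwoThreadCancellation() {}

    /** Starts the canceler and the watchdog; returns the watchdog thread. */
    static Thread cancel(CancellableTask task, Thread executor, long intervalMillis,
                         long timeoutMillis, Consumer<String> onFatalError) {
        CountDownLatch cancelerDone = new CountDownLatch(1);

        // Thread 1: call cancel(), interrupt the executor once, then exit.
        Thread canceler = new Thread(() -> {
            try {
                try {
                    task.cancel();
                } catch (Throwable t) {
                    // Swallowed in this sketch; real code would log it and still interrupt.
                }
                executor.interrupt();
            } finally {
                cancelerDone.countDown();
            }
        }, "task-canceler");
        canceler.setDaemon(true);

        // Thread 2: wait for the canceler, then interrupt periodically until the deadline.
        Thread watchDog = new Thread(() -> {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            try {
                if (!cancelerDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    onFatalError.accept("canceler stuck");
                    return;
                }
                while (executor.isAlive()) {
                    if (deadline - System.nanoTime() <= 0) {
                        onFatalError.accept("task stuck");
                        return;
                    }
                    executor.interrupt();
                    executor.join(Math.max(1, intervalMillis));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // watchdog was shut down early
            }
        }, "task-cancellation-watchdog");
        watchDog.setDaemon(true);

        canceler.start();
        watchDog.start();
        return watchDog;
    }
}
```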

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4715-suicide

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2652








[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-17 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582048#comment-15582048
 ] 

Stephan Ewen commented on FLINK-4715:
-

I think we should do the following:

Split the cancellation up into two threads:
  # The first thread calls {{cancel()}} on the task and {{interrupt()}} on the 
main thread. It then exits.
  # The second thread is a watchdog that kicks in after {{n}} seconds (default 
is 10, I think) and periodically calls {{interrupt()}} every {{n}} seconds. 
After a maximum duration (let's say 1 minute), it notifies the {{TaskManager}} of 
a fatal error. In most setups, this leads to a process kill.

The reason to separate this into two threads is that we have seen cases where 
{{cancel()}} blocks waiting on a lock held by the main thread. In that case, 
the {{interrupt()}} call would never be made, and the "task manager exit" 
safety net would never kick in either.
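Why the split matters can be reproduced with a small, self-contained Java demo of the lock scenario described above. It is an illustrative sketch, not Flink code: an empty `synchronized` block stands in for a hypothetical `cancel()` that needs a lock held by the task's main thread, and the watchdog simply interrupts every 20 ms without escalating to a fatal error. With only the canceler thread, the task never receives its interrupt; with an independent watchdog, the interrupt arrives, the task releases the lock, and the blocked `cancel()` completes.

```java
import java.util.concurrent.CountDownLatch;

public class BlockingCancelDemo {

    /**
     * Returns true if the task thread terminates within waitMillis after cancellation starts.
     * withWatchDog = false models "cancel() then interrupt()" in a single thread only.
     */
    public static boolean cancelStopsTask(boolean withWatchDog, long waitMillis) {
        Object lock = new Object(); // lock shared by the task code and cancel()
        CountDownLatch lockHeld = new CountDownLatch(1);

        Thread executer = new Thread(() -> {
            synchronized (lock) {
                lockHeld.countDown();
                try {
                    Thread.sleep(Long.MAX_VALUE); // blocks while holding the lock
                } catch (InterruptedException e) {
                    // interrupted: leave the synchronized block, releasing the lock
                }
            }
        });
        executer.setDaemon(true); // a stuck demo thread must not keep the JVM alive
        executer.start();

        try {
            lockHeld.await(); // make sure the task holds the lock first

            // First thread: cancel() needs the same lock, so its interrupt is never reached.
            Thread canceler = new Thread(() -> {
                synchronized (lock) {
                    // hypothetical cancel() body
                }
                executer.interrupt();
            });
            canceler.setDaemon(true);
            canceler.start();

            if (withWatchDog) {
                // Second thread: independent of cancel(), so it can always interrupt the task.
                Thread watchDog = new Thread(() -> {
                    while (executer.isAlive()) {
                        executer.interrupt();
                        try {
                            executer.join(20);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                });
                watchDog.setDaemon(true);
                watchDog.start();
            }

            executer.join(waitMillis);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return !executer.isAlive();
    }
}
```

`cancelStopsTask(false, 200)` is expected to return `false` and `cancelStopsTask(true, 2000)` to return `true`. The demo threads are daemons, so the deliberately stuck threads of the first case do not keep the JVM alive.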



[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-09-29 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15534797#comment-15534797
 ] 

Zhijiang Wang commented on FLINK-4715:
--

Yes, we have already experienced this problem in production many times, 
because user code cannot be controlled. If the thread is waiting on a 
synchronized lock (or is blocked in a similar way), it cannot be cancelled. If 
the job master fails to cancel the task many times, the job master will make 
the task manager exit itself.
