akalash commented on a change in pull request #15820:
URL: https://github.com/apache/flink/pull/15820#discussion_r623948875



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -609,42 +615,54 @@ private void ensureNotCanceled() {
 
     @Override
     public final void invoke() throws Exception {
-        try {
-            // Allow invoking method 'invoke' without having to call 'restore' before it.
-            if (!isRunning) {
-                LOG.debug("Restoring during invoke will be called.");
-                restore();
-            }
+        runWithCleanUpOnFail(this::executeInvoke);
+
+        cleanUpInvoke();
+    }
+
+    private void executeInvoke() throws Exception {
+        // Allow invoking method 'invoke' without having to call 'restore' before it.
+        if (!isRunning) {
+            LOG.debug("Restoring during invoke will be called.");
+            restore();
+        }
 
-            // final check to exit early before starting to run
-            ensureNotCanceled();
+        // final check to exit early before starting to run
+        ensureNotCanceled();
 
-            // let the task do its work
-            runMailboxLoop();
+        // let the task do its work
+        runMailboxLoop();
 
-            // if this left the run() method cleanly despite the fact that this was canceled,
-            // make sure the "clean shutdown" is not attempted
-            ensureNotCanceled();
+        // if this left the run() method cleanly despite the fact that this was canceled,
+        // make sure the "clean shutdown" is not attempted
+        ensureNotCanceled();
 
-            afterInvoke();
+        afterInvoke();
+    }
+
+    private void runWithCleanUpOnFail(RunnableWithException run) throws Exception {
+        try {
+            run.run();
         } catch (Throwable invokeException) {
             failing = !canceled;
             try {
                 if (!canceled) {
-                    cancelTask();
+                    try {
+                        cancelTask();
+                    } catch (Throwable ex) {
+                        invokeException = firstOrSuppressed(ex, invokeException);
+                    }

Review comment:
   1. Done.
   2. Now it has.
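
For readers following the hunk above, here is a minimal, self-contained sketch of the pattern it introduces: if the cancel step itself throws while handling a failure, the original exception stays primary and the cancel failure is attached as suppressed. The class name `CleanUpOnFailSketch`, the `ThrowingRunnable` interface, and the two-argument `runWithCleanUpOnFail` are hypothetical stand-ins, and the local `firstOrSuppressed` only assumes the behaviour its name suggests; none of this is the actual `StreamTask` code.

```java
public class CleanUpOnFailSketch {

    /** Hypothetical stand-in for the RunnableWithException type seen in the diff. */
    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    /**
     * Local helper (an assumption, not the Flink utility itself): keeps the
     * earlier exception as primary and attaches the newer one as suppressed.
     */
    public static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null || previous == newException) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /** Runs the action; on failure, runs the cancel step without losing the original error. */
    public static void runWithCleanUpOnFail(ThrowingRunnable action, ThrowingRunnable cancel)
            throws Exception {
        try {
            action.run();
        } catch (Throwable invokeException) {
            try {
                cancel.run();
            } catch (Throwable ex) {
                // A failing cancel must not mask the original failure.
                invokeException = firstOrSuppressed(ex, invokeException);
            }
            // The catch parameter was reassigned, so rethrow with explicit casts.
            if (invokeException instanceof Exception) {
                throw (Exception) invokeException;
            }
            if (invokeException instanceof Error) {
                throw (Error) invokeException;
            }
            throw new Exception(invokeException);
        }
    }

    public static void main(String[] args) {
        try {
            runWithCleanUpOnFail(
                    () -> { throw new IllegalStateException("invoke failed"); },
                    () -> { throw new IllegalArgumentException("cancel failed"); });
        } catch (Exception e) {
            System.out.println("primary: " + e.getMessage());
            for (Throwable suppressed : e.getSuppressed()) {
                System.out.println("suppressed: " + suppressed.getMessage());
            }
        }
    }
}
```

In this sketch the caller sees the exception from the action, with the cancel failure reachable through `getSuppressed()`.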




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

