This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 330a7e8  Cleanup ThreadRuntime Exception Management (#1673)
330a7e8 is described below

commit 330a7e879e7123fe25b13be4d5db3f0b5abea7fb
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Fri Apr 27 13:44:45 2018 -0700

    Cleanup ThreadRuntime Exception Management (#1673)
    
    * Cleanup ThreadRuntime Exception Management
    
    * More fixes
    
    * Make JavaInstance keep track and restarting its thread
---
 .../functions/instance/JavaInstanceRunnable.java      | 19 +++++--------------
 .../pulsar/functions/runtime/JavaInstanceMain.java    |  2 +-
 .../pulsar/functions/runtime/ProcessRuntime.java      | 14 ++------------
 .../pulsar/functions/runtime/RuntimeSpawner.java      |  2 ++
 .../pulsar/functions/runtime/ThreadRuntime.java       | 15 ++-------------
 5 files changed, 12 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c16bc10..d3853e2 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -89,7 +89,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private Table<ByteBuf, ByteBuf> stateTable;
 
     private JavaInstance javaInstance;
-    private AtomicBoolean running = new AtomicBoolean(true);
+    @Getter
+    private Exception deathException;
 
     @Getter(AccessLevel.PACKAGE)
     private Map<String, SerDe> inputSerDe;
@@ -172,7 +173,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     public void run() {
         try {
             javaInstance = setupJavaInstance();
-            while (running.get()) {
+            while (true) {
 
                 currentRecord = processor.recieveMessage();
 
@@ -227,7 +228,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             }
         } catch (Exception ex) {
             log.error("Uncaught exception in Java Instance", ex);
-            throw new RuntimeException(ex);
+            deathException = ex;
+            return;
         } finally {
             close();
         }
@@ -343,19 +345,8 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         processor.sendOutputMessage(srcRecord, msgBuilder);
     }
 
-    /**
-     * Stop java instance runnable
-     */
-    public void stop() {
-        this.running.set(false);
-    }
-
     @Override
     public void close() {
-        if (!running.get()) {
-            return;
-        }
-
         processor.close();
         javaInstance.close();
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 77d8e97..cf5238f 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -189,7 +189,7 @@ public class JavaInstanceMain {
                 instanceConfig,
                 jarFile,
                 containerFactory,
-                0);
+                30000);
 
         server = ServerBuilder.forPort(port)
                 .addService(new InstanceControlImpl(runtimeSpawner))
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index ba510cc..20407f8 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -213,6 +213,8 @@ class ProcessRuntime implements Runtime {
     public void stop() {
         process.destroy();
         channel.shutdown();
+        channel = null;
+        stub = null;
     }
 
     @Override
@@ -311,18 +313,6 @@ class ProcessRuntime implements Runtime {
             }
             return false;
         }
-        FunctionStatus status;
-        try {
-            status = getFunctionStatus().get();
-        } catch (Exception ex) {
-            return false;
-        }
-        if (!status.getRunning()) {
-            if (status.getFailureException() != null && 
!status.getFailureException().isEmpty()) {
-                deathException = new Exception(status.getFailureException());
-            }
-            return false;
-        }
         return true;
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 2b020df..ede9056 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -72,6 +72,8 @@ public class RuntimeSpawner implements AutoCloseable {
                     if (!runtime.isAlive()) {
                         log.error("Function Container is dead with exception", 
runtime.getDeathException());
                         log.error("Restarting...");
+                        // Just for the sake of sanity, just destroy the 
runtime
+                        runtime.stop();
                         runtimeDeathException = runtime.getDeathException();
                         runtime.start();
                         numRestarts++;
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index a0d5fb8..4bec173 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -44,7 +44,6 @@ class ThreadRuntime implements Runtime {
     @Getter
     private InstanceConfig instanceConfig;
     private JavaInstanceRunnable javaInstanceRunnable;
-    private Exception startupException;
     private ThreadGroup threadGroup;
 
     ThreadRuntime(InstanceConfig instanceConfig,
@@ -72,16 +71,8 @@ class ThreadRuntime implements Runtime {
     @Override
     public void start() {
         log.info("ThreadContainer starting function with instance config {}", 
instanceConfig);
-        startupException = null;
         this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
                 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
-        this.fnThread.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                startupException = new Exception(e);
-                log.error("Error occured in java instance:", e);
-            }
-        });
         this.fnThread.start();
     }
 
@@ -95,8 +86,6 @@ class ThreadRuntime implements Runtime {
     @Override
     public void stop() {
         if (fnThread != null) {
-            // Stop instance thread
-            javaInstanceRunnable.stop();
             // interrupt the instance thread
             fnThread.interrupt();
             try {
@@ -138,8 +127,8 @@ class ThreadRuntime implements Runtime {
     public Exception getDeathException() {
         if (isAlive()) {
             return null;
-        } else if (null != startupException) {
-            return startupException;
+        } else if (null != javaInstanceRunnable) {
+            return javaInstanceRunnable.getDeathException();
         } else {
             return null;
         }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to