This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 330a7e8 Cleanup ThreadRuntime Exception Management (#1673) 330a7e8 is described below commit 330a7e879e7123fe25b13be4d5db3f0b5abea7fb Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Apr 27 13:44:45 2018 -0700 Cleanup ThreadRuntime Exception Management (#1673) * Cleanup ThreadRuntime Exception Management * More fixes * Make JavaInstance keep track and restarting its thread --- .../functions/instance/JavaInstanceRunnable.java | 19 +++++-------------- .../pulsar/functions/runtime/JavaInstanceMain.java | 2 +- .../pulsar/functions/runtime/ProcessRuntime.java | 14 ++------------ .../pulsar/functions/runtime/RuntimeSpawner.java | 2 ++ .../pulsar/functions/runtime/ThreadRuntime.java | 15 ++------------- 5 files changed, 12 insertions(+), 40 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index c16bc10..d3853e2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -89,7 +89,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Table<ByteBuf, ByteBuf> stateTable; private JavaInstance javaInstance; - private AtomicBoolean running = new AtomicBoolean(true); + @Getter + private Exception deathException; @Getter(AccessLevel.PACKAGE) private Map<String, SerDe> inputSerDe; @@ -172,7 +173,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { public void run() { try { javaInstance = setupJavaInstance(); - while (running.get()) { + while (true) { currentRecord = processor.recieveMessage(); @@ -227,7 +228,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } catch 
(Exception ex) { log.error("Uncaught exception in Java Instance", ex); - throw new RuntimeException(ex); + deathException = ex; + return; } finally { close(); } @@ -343,19 +345,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { processor.sendOutputMessage(srcRecord, msgBuilder); } - /** - * Stop java instance runnable - */ - public void stop() { - this.running.set(false); - } - @Override public void close() { - if (!running.get()) { - return; - } - processor.close(); javaInstance.close(); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 77d8e97..cf5238f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -189,7 +189,7 @@ public class JavaInstanceMain { instanceConfig, jarFile, containerFactory, - 0); + 30000); server = ServerBuilder.forPort(port) .addService(new InstanceControlImpl(runtimeSpawner)) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index ba510cc..20407f8 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -213,6 +213,8 @@ class ProcessRuntime implements Runtime { public void stop() { process.destroy(); channel.shutdown(); + channel = null; + stub = null; } @Override @@ -311,18 +313,6 @@ class ProcessRuntime implements Runtime { } return false; } - FunctionStatus status; - try { - status = getFunctionStatus().get(); - } catch (Exception ex) { - return false; - } - if (!status.getRunning()) { - 
if (status.getFailureException() != null && !status.getFailureException().isEmpty()) { - deathException = new Exception(status.getFailureException()); - } - return false; - } return true; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java index 2b020df..ede9056 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java @@ -72,6 +72,8 @@ public class RuntimeSpawner implements AutoCloseable { if (!runtime.isAlive()) { log.error("Function Container is dead with exception", runtime.getDeathException()); log.error("Restarting..."); + // Just for the sake of sanity, just destroy the runtime + runtime.stop(); runtimeDeathException = runtime.getDeathException(); runtime.start(); numRestarts++; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index a0d5fb8..4bec173 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -44,7 +44,6 @@ class ThreadRuntime implements Runtime { @Getter private InstanceConfig instanceConfig; private JavaInstanceRunnable javaInstanceRunnable; - private Exception startupException; private ThreadGroup threadGroup; ThreadRuntime(InstanceConfig instanceConfig, @@ -72,16 +71,8 @@ class ThreadRuntime implements Runtime { @Override public void start() { log.info("ThreadContainer starting function with instance config {}", instanceConfig); - startupException = null; this.fnThread = new Thread(threadGroup, javaInstanceRunnable, 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())); - this.fnThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - startupException = new Exception(e); - log.error("Error occured in java instance:", e); - } - }); this.fnThread.start(); } @@ -95,8 +86,6 @@ class ThreadRuntime implements Runtime { @Override public void stop() { if (fnThread != null) { - // Stop instance thread - javaInstanceRunnable.stop(); // interrupt the instance thread fnThread.interrupt(); try { @@ -138,8 +127,8 @@ class ThreadRuntime implements Runtime { public Exception getDeathException() { if (isAlive()) { return null; - } else if (null != startupException) { - return startupException; + } else if (null != javaInstanceRunnable) { + return javaInstanceRunnable.getDeathException(); } else { return null; } -- To stop receiving notification emails like this one, please contact si...@apache.org.