Repository: samza
Updated Branches:
  refs/heads/master e3f85871c -> 5a88b9e47


SAMZA-1112; BrokerProxy does not log fatal errors

Add an UncaughtExceptionHandler to the broker proxy thread so
failures there get logged.

Author: Tommy Becker <tobec...@tivo.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #80 from twbecker/SAMZA-1112


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5a88b9e4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5a88b9e4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5a88b9e4

Branch: refs/heads/master
Commit: 5a88b9e47cd9b2a2aba742ff4fe8eeefb7a87e92
Parents: e3f8587
Author: Tommy Becker <tobec...@tivo.com>
Authored: Sat Mar 11 06:56:48 2017 -0800
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Sat Mar 11 06:56:48 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/job/local/ThreadJob.scala    |  4 +++-
 .../apache/samza/system/kafka/BrokerProxy.scala   | 18 ++++++++++++------
 2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5a88b9e4/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
index 63754c5..e0522b1 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.job.local
 
+import java.lang.Thread.UncaughtExceptionHandler
+
 import org.apache.samza.util.Logging
 import org.apache.samza.job.StreamJob
 import org.apache.samza.job.ApplicationStatus
@@ -42,7 +44,7 @@ class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
           runnable.run
           jobStatus = Some(SuccessfulFinish)
         } catch {
-          case e: Exception => {
+          case e: Throwable => {
             error("Failing job with exception.", e)
             jobStatus = Some(UnsuccessfulFinish)
             throw e

http://git-wip-us.apache.org/repos/asf/samza/blob/5a88b9e4/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index cbb8881..539a439 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -21,17 +21,20 @@
 
 package org.apache.samza.system.kafka
 
+import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.channels.ClosedByInterruptException
 import java.util.Map.Entry
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
+
 import kafka.api._
-import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
+import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException}
 import kafka.consumer.ConsumerConfig
 import kafka.message.MessageSet
 import org.apache.samza.SamzaException
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
 import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
+
 import scala.collection.JavaConversions._
 import scala.collection.concurrent
 import scala.collection.mutable
@@ -198,8 +201,8 @@ class BrokerProxy(
   }
 
   /**
-   * Releases ownership for a single TopicAndPartition. The 
-   * KafkaSystemConsumer will try and find a new broker for the 
+   * Releases ownership for a single TopicAndPartition. The
+   * KafkaSystemConsumer will try and find a new broker for the
    * TopicAndPartition.
    */
   def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
@@ -209,8 +212,8 @@ class BrokerProxy(
   }
 
   /**
-   * Releases all TopicAndPartition ownership for this BrokerProxy thread. The 
-   * KafkaSystemConsumer will try and find a new broker for the 
+   * Releases all TopicAndPartition ownership for this BrokerProxy thread. The
+   * KafkaSystemConsumer will try and find a new broker for the
    * TopicAndPartition.
    */
   def abdicateAll {
@@ -295,6 +298,9 @@ class BrokerProxy(
       info("Starting " + toString)
       thread.setDaemon(true)
       thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
+      thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
+        override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
+      })
       thread.start
     } else {
       debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
@@ -330,4 +336,4 @@ class BrokerProxy(
       }
     }
   }
-}
\ No newline at end of file
+}

Reply via email to