Repository: samza
Updated Branches:
  refs/heads/master 6dc33a850 -> c6c10d31e


SAMZA-1077: SamzaContainer should catch all Throwables instead of only 
exceptions

Author: vjagadish1989 <jvenk...@linkedin.com>

Reviewers: Jake Maes <jacob.m...@gmail.com>

Closes #30 from vjagadish1989/samza-1077


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c6c10d31
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c6c10d31
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c6c10d31

Branch: refs/heads/master
Commit: c6c10d31e37be3c9b07d202d17bcba6cb1213c3b
Parents: 6dc33a8
Author: vjagadish1989 <jvenk...@linkedin.com>
Authored: Mon Jan 30 14:41:57 2017 -0800
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Jan 30 14:44:10 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |  4 +-
 .../samza/container/TestSamzaContainer.scala    | 59 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c6c10d31/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f1d62c5..e49da57 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -668,8 +668,8 @@ class SamzaContainer(
       addShutdownHook
       runLoop.run
     } catch {
-      case e: Exception =>
-        error("Caught exception in process loop.", e)
+      case e: Throwable =>
+        error("Caught exception/error in process loop.", e)
         throw e
     } finally {
       info("Shutting down.")

http://git-wip-us.apache.org/repos/asf/samza/blob/c6c10d31/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 5895037..0d86833 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -238,6 +238,65 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
   }
 
   @Test
+  def testErrorInTaskInitShutsDownTask {
+    val task = new StreamTask with InitableTask with ClosableTask {
+      var wasShutdown = false
+
+      def init(config: Config, context: TaskContext) {
+        throw new NoSuchMethodError("Trigger a shutdown, please.")
+      }
+
+      def process(envelope: IncomingMessageEnvelope, collector: 
MessageCollector, coordinator: TaskCoordinator) {
+      }
+
+      def close {
+        wasShutdown = true
+      }
+    }
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, 
Set[TaskName](taskName))
+    val taskInstance: TaskInstance[StreamTask] = new TaskInstance[StreamTask](
+      task,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
+    )
+    val runLoop = new RunLoop(
+      taskInstances = Map(taskName -> taskInstance),
+      consumerMultiplexer = consumerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = runLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null
+    )
+    try {
+      container.run
+      fail("Expected error to be thrown in run method.")
+    } catch {
+      case e: Throwable => // Expected
+    }
+    assertTrue(task.wasShutdown)
+  }
+
+  @Test
   def testStartStoresIncrementsCounter {
     val task = new StreamTask {
       def process(envelope: IncomingMessageEnvelope, collector: 
MessageCollector, coordinator: TaskCoordinator) {

Reply via email to