Repository: samza
Updated Branches:
  refs/heads/master 6dc33a850 -> c6c10d31e
SAMZA-1077: SamzaContainer should catch all Throwables instead of only exceptions

Author: vjagadish1989 <jvenk...@linkedin.com>

Reviewers: Jake Maes <jacob.m...@gmail.com>

Closes #30 from vjagadish1989/samza-1077

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c6c10d31
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c6c10d31
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c6c10d31

Branch: refs/heads/master
Commit: c6c10d31e37be3c9b07d202d17bcba6cb1213c3b
Parents: 6dc33a8
Author: vjagadish1989 <jvenk...@linkedin.com>
Authored: Mon Jan 30 14:41:57 2017 -0800
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Jan 30 14:44:10 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |  4 +-
 .../samza/container/TestSamzaContainer.scala    | 59 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/c6c10d31/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f1d62c5..e49da57 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -668,8 +668,8 @@ class SamzaContainer(
       addShutdownHook
       runLoop.run
     } catch {
-      case e: Exception =>
-        error("Caught exception in process loop.", e)
+      case e: Throwable =>
+        error("Caught exception/error in process loop.", e)
         throw e
     } finally {
       info("Shutting down.")
http://git-wip-us.apache.org/repos/asf/samza/blob/c6c10d31/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 5895037..0d86833 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -238,6 +238,65 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
+  def testErrorInTaskInitShutsDownTask {
+    val task = new StreamTask with InitableTask with ClosableTask {
+      var wasShutdown = false
+
+      def init(config: Config, context: TaskContext) {
+        throw new NoSuchMethodError("Trigger a shutdown, please.")
+      }
+
+      def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+      }
+
+      def close {
+        wasShutdown = true
+      }
+    }
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val taskInstance: TaskInstance[StreamTask] = new TaskInstance[StreamTask](
+      task,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
+    )
+    val runLoop = new RunLoop(
+      taskInstances = Map(taskName -> taskInstance),
+      consumerMultiplexer = consumerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = runLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null
+    )
+    try {
+      container.run
+      fail("Expected error to be thrown in run method.")
+    } catch {
+      case e: Throwable => // Expected
+    }
+    assertTrue(task.wasShutdown)
+  }
+
+  @Test
   def testStartStoresIncrementsCounter {
     val task = new StreamTask {
       def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {