Repository: samza Updated Branches: refs/heads/master c51693bcd -> 002e13178
SAMZA-1572: Add fixed retries on failure in KafkaCheckpointManager. KafkaCheckpointManager.writeCheckpoint currently goes into an infinite loop when an unrecoverable failure happens; this blocks the commit phase indefinitely (thereby preventing processing). Added a finite number of retries (50), so a failed write is retried for a bounded amount of time before giving up. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Prateek M<prate...@cs.utexas.edu> Closes #420 from shanthoosh/add_fixed_retries_in_kafka_checkpoint_manager Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/002e1317 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/002e1317 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/002e1317 Branch: refs/heads/master Commit: 002e131784d52e02f6d4b46453b31d47f20cad31 Parents: c51693b Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Tue Feb 13 11:22:34 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Tue Feb 13 11:22:34 2018 -0800 ---------------------------------------------------------------------- .../kafka/KafkaCheckpointManager.scala | 10 +++++++- .../kafka/TestKafkaCheckpointManager.scala | 25 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/002e1317/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 50d22b1..965209d 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -53,6 +53,9 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde, checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging { + // Retry duration is approximately 83 minutes. + var MaxRetriesOnFailure = 50 + info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " + s"validateCheckpoints:$validateCheckpoint") @@ -159,7 +162,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, }, (exception, loop) => { - warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception) + if (loop.sleepCount >= MaxRetriesOnFailure) { + error(s"Exhausted $MaxRetriesOnFailure retries when writing checkpoint: $checkpoint for task: $taskName.") + throw new SamzaException(s"Exception when writing checkpoint: $checkpoint for task: $taskName.", exception) + } else { + warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception) + } } ) } http://git-wip-us.apache.org/repos/asf/samza/blob/002e1317/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index c4e57f7..3761ea1 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -33,6 +33,7 @@ import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.config._ import 
org.apache.samza.container.TaskName import org.apache.samza.container.grouper.stream.GroupByPartitionFactory +import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system._ import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory} @@ -40,6 +41,7 @@ import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util} import org.apache.samza.{Partition, SamzaException} import org.junit.Assert._ import org.junit._ +import org.mockito.Mockito class TestKafkaCheckpointManager extends KafkaServerTestHarness { @@ -96,6 +98,29 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName)) } + @Test(expected = classOf[SamzaException]) + def testWriteCheckpointShouldRetryFiniteTimesOnFailure: Unit = { + val checkpointTopic = "checkpoint-topic-2" + val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer]) + + class MockSystemFactory extends KafkaSystemFactory { + override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = { + mockKafkaProducer + } + } + + Mockito.doThrow(new RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName) + + val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties() + val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props) + val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry) + checkPointManager.MaxRetriesOnFailure = 1 + + checkPointManager.register(taskName) + checkPointManager.start + checkPointManager.writeCheckpoint(taskName, new Checkpoint(ImmutableMap.of())) + } + @Test def testFailOnTopicValidation { // By default, should fail if there is a topic validation error