Repository: samza
Updated Branches:
  refs/heads/master c51693bcd -> 002e13178


SAMZA-1572: Add fixed retries on failure in KafkaCheckpointManager.

KafkaCheckpointManager.writeCheckpoint currently goes into an infinite loop when 
an irrecoverable failure happens; this indefinitely blocks the commit phase 
(thereby preventing processing). Added a finite number of retries (50), so a 
failed checkpoint write is retried for a bounded time (approximately 83 minutes) 
before giving up and throwing a SamzaException.

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Prateek M<prate...@cs.utexas.edu>

Closes #420 from shanthoosh/add_fixed_retries_in_kafka_checkpoint_manager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/002e1317
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/002e1317
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/002e1317

Branch: refs/heads/master
Commit: 002e131784d52e02f6d4b46453b31d47f20cad31
Parents: c51693b
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Tue Feb 13 11:22:34 2018 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Feb 13 11:22:34 2018 -0800

----------------------------------------------------------------------
 .../kafka/KafkaCheckpointManager.scala          | 10 +++++++-
 .../kafka/TestKafkaCheckpointManager.scala      | 25 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/002e1317/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 50d22b1..965209d 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -53,6 +53,9 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
                              checkpointMsgSerde: Serde[Checkpoint] = new 
CheckpointSerde,
                              checkpointKeySerde: Serde[KafkaCheckpointLogKey] 
= new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {
 
+  // Retry duration is approximately 83 minutes.
+  var MaxRetriesOnFailure = 50
+
   info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, 
systemName:$checkpointSystem " +
     s"validateCheckpoints:$validateCheckpoint")
 
@@ -159,7 +162,12 @@ class KafkaCheckpointManager(checkpointSpec: 
KafkaStreamSpec,
       },
 
       (exception, loop) => {
-        warn(s"Retrying failed checkpoint write to key: $key, checkpoint: 
$checkpoint for task: $taskName", exception)
+        if (loop.sleepCount >= MaxRetriesOnFailure) {
+          error(s"Exhausted $MaxRetriesOnFailure retries when writing 
checkpoint: $checkpoint for task: $taskName.")
+          throw new SamzaException(s"Exception when writing checkpoint: 
$checkpoint for task: $taskName.", exception)
+        } else {
+          warn(s"Retrying failed checkpoint write to key: $key, checkpoint: 
$checkpoint for task: $taskName", exception)
+        }
       }
     )
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/002e1317/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index c4e57f7..3761ea1 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -33,6 +33,7 @@ import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system._
 import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
@@ -40,6 +41,7 @@ import org.apache.samza.util.{KafkaUtilException, 
NoOpMetricsRegistry, Util}
 import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
 import org.junit._
+import org.mockito.Mockito
 
 class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
@@ -96,6 +98,29 @@ class TestKafkaCheckpointManager extends 
KafkaServerTestHarness {
     assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
   }
 
+  @Test(expected = classOf[SamzaException])
+  def testWriteCheckpointShouldRetryFiniteTimesOnFailure: Unit = {
+    val checkpointTopic = "checkpoint-topic-2"
+    val mockKafkaProducer: SystemProducer = 
Mockito.mock(classOf[SystemProducer])
+
+    class MockSystemFactory extends KafkaSystemFactory {
+      override def getProducer(systemName: String, config: Config, registry: 
MetricsRegistry): SystemProducer = {
+        mockKafkaProducer
+      }
+    }
+
+    Mockito.doThrow(new 
RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName)
+
+    val props = new 
org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
+    val spec = new KafkaStreamSpec("id", checkpointTopic, 
checkpointSystemName, 1, 1, false, props)
+    val checkPointManager = new KafkaCheckpointManager(spec, new 
MockSystemFactory, false, config, new NoOpMetricsRegistry)
+    checkPointManager.MaxRetriesOnFailure = 1
+
+    checkPointManager.register(taskName)
+    checkPointManager.start
+    checkPointManager.writeCheckpoint(taskName, new 
Checkpoint(ImmutableMap.of()))
+  }
+
   @Test
   def testFailOnTopicValidation {
     // By default, should fail if there is a topic validation error

Reply via email to