wcarlson5 commented on a change in pull request #11812:
URL: https://github.com/apache/kafka/pull/11812#discussion_r815316387



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -73,11 +73,11 @@ public boolean canProcess() {
         }
 
         public boolean canProcessTask(final Task task, final long now) {
-            // TODO: implement exponential backoff, for now we just wait 15s
+            // TODO: implement exponential backoff, for now we just wait 5s
             final Long errorTime = tasksToErrorTime.get(task.id());
             if (errorTime == null) {
                 return true;
-            } else if (now - errorTime > 15000L) {
+            } else if (now - errorTime > 5000L) {

Review comment:
       Nit: Could you make this a static var?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -248,14 +252,94 @@ public void 
shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throw
 
             
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
-            produceMessages(0L, inputTopic, "A");
+            produceMessages(0L, inputTopic2, "A");
             waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.ERROR, DEFAULT_DURATION);
 
             assertThat(processorValueCollector.size(), equalTo(1));
         }
 
     }
 
+    @Test
+    public void shouldEmitSameRecordAfterFailover() throws Exception {

Review comment:
       Nice move, this really does belong in this class

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -775,6 +781,70 @@ public void shouldWaitForMissingInputTopicsToBeCreated() 
throws Exception {
         }
     }
 
+    @Test
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws 
Exception {
+        final AtomicInteger noOutputExpected = new AtomicInteger(0);
+        final AtomicInteger outputExpected = new AtomicInteger(0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId).getPath());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+
+        streams = new KafkaStreamsNamedTopologyWrapper(props);
+        streams.setUncaughtExceptionHandler(exception -> 
StreamThreadExceptionResponse.REPLACE_THREAD);
+
+        final NamedTopologyBuilder builder = 
streams.newNamedTopologyBuilder("topology_A");
+        builder.stream(DELAYED_INPUT_STREAM_1).peek((k, v) -> 
outputExpected.incrementAndGet()).to(OUTPUT_STREAM_1);

Review comment:
       would it be useful to have a second named topology as well to process 
data and make sure it makes progress?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to