wcarlson5 commented on a change in pull request #11812: URL: https://github.com/apache/kafka/pull/11812#discussion_r815316387
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java ########## @@ -73,11 +73,11 @@ public boolean canProcess() { } public boolean canProcessTask(final Task task, final long now) { - // TODO: implement exponential backoff, for now we just wait 15s + // TODO: implement exponential backoff, for now we just wait 5s final Long errorTime = tasksToErrorTime.get(task.id()); if (errorTime == null) { return true; - } else if (now - errorTime > 15000L) { + } else if (now - errorTime > 5000L) { Review comment: Nit: Could you make this a static var? ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java ########## @@ -248,14 +252,94 @@ public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throw StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); - produceMessages(0L, inputTopic, "A"); + produceMessages(0L, inputTopic2, "A"); waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(1)); } } + @Test + public void shouldEmitSameRecordAfterFailover() throws Exception { Review comment: Nice move, this really does belong in this class ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ########## @@ -775,6 +781,70 @@ public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception { } } + @Test + public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception { + final AtomicInteger noOutputExpected = new AtomicInteger(0); + final AtomicInteger outputExpected = new AtomicInteger(0); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L); + props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + streams = new KafkaStreamsNamedTopologyWrapper(props); + streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD); + + final NamedTopologyBuilder builder = streams.newNamedTopologyBuilder("topology_A"); + builder.stream(DELAYED_INPUT_STREAM_1).peek((k, v) -> outputExpected.incrementAndGet()).to(OUTPUT_STREAM_1); Review comment: would it be useful to have a second named topology as well to process data and make sure it makes progress? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org