[GitHub] [kafka] chia7712 commented on a change in pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest
chia7712 commented on a change in pull request #8913: URL: https://github.com/apache/kafka/pull/8913#discussion_r448711625 ## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ## @@ -168,7 +168,7 @@ public int run(final String[] args, consumerConfig.putAll(properties); exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun); maybeDeleteInternalTopics(adminClient, dryRun); - +System.out.println("succeed to reset stream application: " + groupId); Review comment: copy that This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest
chia7712 commented on a change in pull request #8913: URL: https://github.com/apache/kafka/pull/8913#discussion_r447488640 ## File path: tests/kafkatest/services/streams.py ## @@ -445,6 +449,40 @@ def __init__(self, test_context, kafka, configs): "org.apache.kafka.streams.tests.StreamsStandByReplicaTest", configs) +class StreamsResetter(StreamsTestBaseService): Review comment: @ableegoldman Please take a look at look :) ## File path: tests/kafkatest/services/streams.py ## @@ -445,6 +449,40 @@ def __init__(self, test_context, kafka, configs): "org.apache.kafka.streams.tests.StreamsStandByReplicaTest", configs) +class StreamsResetter(StreamsTestBaseService): Review comment: @ableegoldman Please take a look :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest - call KafkaStreams#cleanU…
chia7712 commented on a change in pull request #8913: URL: https://github.com/apache/kafka/pull/8913#discussion_r444374011 ## File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java ## @@ -130,6 +130,8 @@ public static void main(final String[] args) throws Exception { } }); +if (streamsProperties.containsKey("streams.cleanup") +&& Boolean.parseBoolean(streamsProperties.getProperty("streams.cleanup"))) streams.cleanUp(); Review comment: Copy that. one line change for this PR :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest - call KafkaStreams#cleanU…
chia7712 commented on a change in pull request #8913: URL: https://github.com/apache/kafka/pull/8913#discussion_r444373556 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ## @@ -303,7 +303,8 @@ public void computeTaskLags(final UUID uuid, final Map allTaskEndO public long lagFor(final TaskId task) { final Long totalLag = taskLagTotals.get(task); if (totalLag == null) { -throw new IllegalStateException("Tried to lookup lag for unknown task " + task); +throw new IllegalStateException("Tried to lookup lag for unknown task: " + task ++ " (This exception may be caused by that you don't call KafkaStreams#cleanUp when topology optimization is enabled)"); Review comment: you are right. I will revert it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org