[ 
https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17145917#comment-17145917
 ] 

Sophie Blee-Goldman commented on KAFKA-10191:
---------------------------------------------

[~mhmdchebbi] There's something called the "application reset tool" for Streams 
applications. It just resets some of the internal metadata and state for the 
application and its consumer group. You can read up on it here: 
[https://kafka.apache.org/25/documentation/streams/developer-guide/app-reset-tool.html]

Let me know if you have any questions about it or how to use it

> fix flaky StreamsOptimizedTest - call KafkaStreams#cleanUp before starting 
> the application up the second time
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10191
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10191
>             Project: Kafka
>          Issue Type: Test
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> {quote}Exception in thread 
> "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 
> 2_0Exception in thread 
> "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0 at 
> org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
>  at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) 
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) 
> at java.util.TreeMap.put(TreeMap.java:552) at 
> java.util.TreeSet.add(TreeSet.java:255) at 
> java.util.AbstractCollection.addAll(AbstractCollection.java:344) at 
> java.util.TreeSet.addAll(TreeSet.java:312) at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:622)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:508)
> {quote}
>  
> this issue may be related to 
> [https://github.com/apache/kafka/commit/0f68dc7a640b26a8edea154ea4ea2b6d93b5104b]
>  since the test passes If  the commit is reverted



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to