[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787678#comment-17787678
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15834:
------------------------------------------------

I just checked the current code and it looks like we do still respect the 
guarantee of invoking #onPartitionsAssigned in all cases. So I don't think step 
#4 is correct. Did you happen to see anything in the logs that would suggest 
the StreamThread was continuing in its regular loop and never stopping due to 
the rebalanceInProgress flag? Or is it possible that it's hanging somewhere in 
the shutdown process (or even in the rebalance itself)?

I'm just wondering if it might be related to the Producer, not the Consumer. I 
know we had some issues with the Producer#close hanging in the past, and that 
it was related to users deleting topics from under the app, which would be a 
similar situation to what you found here. I'm not sure if we ever fixed that, 
maybe [~mjsax] will remember the ticket for the Producer issue?

> Subscribing to non-existent topic blocks StreamThread from stopping
> -------------------------------------------------------------------
>
>                 Key: KAFKA-15834
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15834
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.0
>            Reporter: Greg Harris
>            Priority: Major
>
> In 
> NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics
>  a topology is created which references an input topic which does not exist. 
> The test as-written passes, but the KafkaStreams#close(Duration) at the end 
> times out, and leaves StreamsThreads running.
> From some cursory investigation it appears that this is happening:
> 1. The consumer calls the StreamsPartitionAssignor, which calls 
> TaskManager#handleRebalanceStart as a side-effect
> 2. handleRebalanceStart sets the rebalanceInProgress flag
> 3. This flag is checked by StreamThread.runLoop, and causes the loop to 
> remain running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
> because the topic does not exist
> 5. Because no partitions are ever assigned, the 
> TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
>  
> This log message is printed in a tight loop while the close is ongoing and 
> the consumer is being polled with zero duration:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer 
> clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer,
>  
> groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics]
>  Received unknown topic or partition error in fetch for partition 
> unique_topic_prefix-topology-1-store-repartition-0 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> Practically, this means that this test leaks two StreamsThreads and the 
> associated clients and sockets, and delays the completion of the test until 
> the KafkaStreams#close(Duration) call times out.
> Either we should change the rebalanceInProgress flag to avoid getting stuck 
> in this rebalance state, or figure out a way to shut down a StreamsThread 
> that is in an extended rebalance state during shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to