Greg Harris created KAFKA-15834:
-----------------------------------

             Summary: Subscribing to non-existent topic blocks StreamThread from stopping
                 Key: KAFKA-15834
                 URL: https://issues.apache.org/jira/browse/KAFKA-15834
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.6.0
            Reporter: Greg Harris


In NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics,
a topology is created that references an input topic that does not exist.
The test as written passes, but the KafkaStreams#close(Duration) call at the
end times out and leaves StreamThreads running.

From some cursory investigation, it appears that this is what happens:
1. The consumer calls the StreamsPartitionAssignor, which calls 
TaskManager#handleRebalanceStart as a side-effect
2. handleRebalanceStart sets the rebalanceInProgress flag
3. This flag is checked by StreamThread#runLoop, and causes the loop to remain 
running.
4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
because the topic does not exist
5. Because no partitions are ever assigned, 
TaskManager#handleRebalanceComplete never runs, so the rebalanceInProgress 
flag is never cleared
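
The interaction in steps 1-5 can be sketched with a toy model. This is not Kafka's code: the class RebalanceFlagSketch, the shutdownRequested flag, and the iteration cap are hypothetical, and the loop condition ("keep going until shutdown is requested and no rebalance is in progress") is an assumption based on the investigation above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model of the flag interaction described above; hypothetical, not Kafka's code. */
class RebalanceFlagSketch {
    // Stands in for the rebalanceInProgress flag set by handleRebalanceStart.
    private final AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
    // Stands in for the shutdown request made by KafkaStreams#close(Duration).
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    public void handleRebalanceStart() {
        rebalanceInProgress.set(true);
    }

    public void handleRebalanceComplete() {
        rebalanceInProgress.set(false);
    }

    public void requestShutdown() {
        shutdownRequested.set(true);
    }

    /**
     * Simulates the run loop for at most maxIterations polls.
     * Returns true if the loop exited on its own, false if it was still
     * spinning when the iteration cap (a stand-in for the close timeout) hit.
     */
    public boolean runLoop(boolean topicExists, int maxIterations) {
        handleRebalanceStart(); // steps 1-2: the assignor marks the rebalance as started
        for (int i = 0; i < maxIterations; i++) {
            if (topicExists) {
                // Partitions are assigned, so the "assigned" callback clears the flag.
                handleRebalanceComplete();
            }
            // Step 3: the loop may only exit once no rebalance is in progress.
            if (shutdownRequested.get() && !rebalanceInProgress.get()) {
                return true;
            }
            // Steps 4-5: with a missing topic the flag is never cleared.
        }
        return false;
    }

    public static void main(String[] args) {
        RebalanceFlagSketch existing = new RebalanceFlagSketch();
        existing.requestShutdown();
        System.out.println("existing topic, loop exited: " + existing.runLoop(true, 1000));

        RebalanceFlagSketch missing = new RebalanceFlagSketch();
        missing.requestShutdown();
        System.out.println("missing topic, loop exited: " + missing.runLoop(false, 1000));
    }
}
```

In this model the missing-topic case only ends because of the iteration cap, which mirrors the close call timing out rather than the thread stopping.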
 
This log message is printed in a tight loop while the close is ongoing and the 
consumer is being polled with zero duration:
{noformat}
[2023-11-15 11:42:43,661] WARN [Consumer clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer, groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics] Received unknown topic or partition error in fetch for partition unique_topic_prefix-topology-1-store-repartition-0 (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
{noformat}
Practically, this means that this test leaks two StreamThreads and the 
associated clients and sockets, and delays the completion of the test until the 
KafkaStreams#close(Duration) call times out.

We should either change how the rebalanceInProgress flag is handled to avoid 
getting stuck in this rebalance state, or find a way to shut down a 
StreamThread that is in an extended rebalance state during shutdown.
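
As one possible shape for the second option, the loop condition could stop honoring an in-progress rebalance once a grace period after the shutdown request has elapsed. This is only a sketch under stated assumptions: BoundedRebalanceWaitSketch, shouldKeepRunning, and the grace-period parameter are hypothetical names and do not exist in Kafka Streams.

```java
/** Hypothetical loop condition that bounds how long shutdown waits on a rebalance. */
class BoundedRebalanceWaitSketch {
    /**
     * Decides whether a run loop should keep iterating.
     *
     * @param running               true while no shutdown has been requested
     * @param rebalanceInProgress   current value of the rebalance flag
     * @param nowMs                 current time in milliseconds
     * @param shutdownRequestedAtMs time at which shutdown was requested
     * @param graceMs               how long a pending rebalance may delay shutdown
     */
    public static boolean shouldKeepRunning(boolean running,
                                            boolean rebalanceInProgress,
                                            long nowMs,
                                            long shutdownRequestedAtMs,
                                            long graceMs) {
        if (running) {
            return true; // normal operation, no shutdown requested
        }
        if (!rebalanceInProgress) {
            return false; // shutdown requested and nothing left to wait for
        }
        // Shutdown requested during a rebalance: wait, but only for the grace period.
        return nowMs - shutdownRequestedAtMs < graceMs;
    }

    public static void main(String[] args) {
        // A rebalance that is still pending 50 ms into a 100 ms grace period.
        System.out.println(shouldKeepRunning(false, true, 50L, 0L, 100L));
        // The same rebalance once the grace period has elapsed.
        System.out.println(shouldKeepRunning(false, true, 100L, 0L, 100L));
    }
}
```

Whether abandoning a rebalance this way is safe for task state is a separate question that this sketch does not address.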



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
