[ 
https://issues.apache.org/jira/browse/KAFKA-8062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16786882#comment-16786882
 ] 

Bill Bejeck commented on KAFKA-8062:
------------------------------------

Hi [~andrey.v.volkov]

Thanks for reporting this, can you share your topology to re-create the error 
locally?

Thanks,

Bill

> StateListener is not notified when StreamThread dies
> ----------------------------------------------------
>
>                 Key: KAFKA-8062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8062
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>         Environment: Kafka 2.1.1, kafka-streams-scala version 2.1.1
>            Reporter: Andrey Volkov
>            Priority: Minor
>
> I want my application to react when streams die. Trying to use 
> KafkaStreams.setStateListener. Also checking KafkaStreams.state() from time 
> to time.
> The test scenario: Kafka is available, but there are no topics that my 
> Topology is supposed to use.
> I expect streams to crash and the state listener to be notified about that, 
> with the new state ERROR. KafkaStreams.state() should also return ERROR.
> In reality the streams crash, but the KafkaStreams.state() method always 
> returns REBALANCING and the last time the StateListener was called, the new 
> state was also REBALANCING. 
>  
> I believe the reason for this is in the methods:
> org.apache.kafka.streams.KafkaStreams.StreamStateListener.onChange() which 
> does not react on the state StreamsThread.State.PENDING_SHUTDOWN
> and
> org.apache.kafka.streams.processor.internals.StreamThread.RebalanceListener.onPartitionsAssigned,
>  which calls shutdown() setting the state to PENDING_SHUTDOWN and then
> streamThread.setStateListener(null) effectively removing the state listener, 
> so that the DEAD state of the thread never reaches KafkaStreams object.
> Here is an extract from the logs:
> {{14:57:03.272 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> ERROR o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer] 
> test-input-topic is unknown yet during rebalance, please make sure they have 
> been pre-created before starting the Streams application.}}
> {{14:57:03.283 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer 
> clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, 
> groupId=Test] Successfully joined group with generation 1}}
> {{14:57:03.284 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, 
> groupId=Test] Setting newly assigned partitions []}}
> {{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Informed to shut 
> down}}
> {{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition 
> from PARTITIONS_REVOKED to PENDING_SHUTDOWN}}
> {{14:57:03.316 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutting down}}
> {{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-restore-consumer,
>  groupId=] Unsubscribed all topics or patterns and assigned partitions}}
> {{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.c.p.KafkaProducer - [Producer 
> clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.}}
> {{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition 
> from PENDING_SHUTDOWN to DEAD}}
> {{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutdown complete}}
> After this calls to KafkaStreams.state() still return REBALANCING
> There is a workaround with requesting KafkaStreams.localThreadsMetadata() and 
> checking each thread's state manually, but that seems very wrong.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to