Andrey Volkov created KAFKA-8062:
------------------------------------

             Summary: StateListener is not notified when StreamThread dies
                 Key: KAFKA-8062
                 URL: https://issues.apache.org/jira/browse/KAFKA-8062
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.1.1
         Environment: Kafka 2.1.1, kafka-streams-scala version 2.1.1
            Reporter: Andrey Volkov


I want my application to react when streams die. Trying to use 
KafkaStreams.setStateListener. Also checking KafkaStreams.state() from time to 
time.

The test scenario: Kafka is available, but there are no topics that my Topology 
is supposed to use.

I expect streams to crash and the state listener to be notified about that, 
with the new state ERROR. KafkaStreams.state() should also return ERROR.

In reality the streams crash, but the KafkaStreams.state() method always 
returns REBALANCING and the last time the StateListener was called, the new 
state was also REBALANCING. 

 

I believe the reason for this is in the methods:

org.apache.kafka.streams.KafkaStreams.StreamStateListener.onChange() which does 
not react on the state StreamsThread.State.PENDING_SHUTDOWN

and

org.apache.kafka.streams.processor.internals.StreamThread.RebalanceListener.onPartitionsAssigned,
 which calls shutdown() setting the state to PENDING_SHUTDOWN and then

streamThread.setStateListener(null) effectively removing the state listener, so 
that the DEAD state of the thread never reaches KafkaStreams object.

Here is an extract from the logs:

{{14:57:03.272 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] ERROR 
o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread 
[Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer] 
test-input-topic is unknown yet during rebalance, please make sure they have 
been pre-created before starting the Streams application.}}
{{14:57:03.283 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.c.c.i.AbstractCoordinator - [Consumer 
clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, 
groupId=Test] Successfully joined group with generation 1}}
{{14:57:03.284 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, 
groupId=Test] Setting newly assigned partitions []}}
{{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.s.p.i.StreamThread - stream-thread 
[Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Informed to shut 
down}}
{{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.s.p.i.StreamThread - stream-thread 
[Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition 
from PARTITIONS_REVOKED to PENDING_SHUTDOWN}}
{{14:57:03.316 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.s.p.i.StreamThread - stream-thread 
[Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutting down}}
{{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.c.c.KafkaConsumer - [Consumer 
clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-restore-consumer,
 groupId=] Unsubscribed all topics or patterns and assigned partitions}}
{{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.c.p.KafkaProducer - [Producer 
clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-producer] 
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.}}
{{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.s.p.i.StreamThread - stream-thread 
[Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition 
from PENDING_SHUTDOWN to DEAD}}
{{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO 
o.a.k.s.p.i.StreamThread - stream-thread 
[Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutdown complete}}

After this calls to KafkaStreams.state() still return REBALANCING

There is a workaround with requesting KafkaStreams.localThreadsMetadata() and 
checking each thread's state manually, but that seems very wrong.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to