[ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588030#comment-16588030
 ] 

Guozhang Wang commented on KAFKA-7290:
--------------------------------------

The metadata requests sent to the brokers look normal to me.

I think the root cause is that the RocksDB file is corrupted. Hence, whenever 
a rebalance happens and the task tries to flush its state to the state 
directory, it hits the same error:

{code}
Manager.java:335) ~[firechief.jar:?]
        ... 8 more
Caused by: org.rocksdb.RocksDBException: _
        at org.rocksdb.RocksDB.flush(Native Method) ~[firechief.jar:?]
        at org.rocksdb.RocksDB.flush(RocksDB.java:1642) ~[firechief.jar:?]
        at 
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
 ~[firechief.jar:?]
{code}

Note that when you change the application id, the state directory changes as 
well, which would explain why that got the application running again.

To validate whether this is the case, the next time you hit the issue you can 
wipe out the corresponding task's state directory and restart to see if the 
issue goes away.
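
As a rough sketch of that check, the snippet below locates a task's state 
directory and deletes it recursively. It assumes the usual Kafka Streams layout 
of <state.dir>/<application.id>/<task id> (for example 
/tmp/kafka-streams/my-app/1_1 with the default state.dir); please verify the 
layout for your version and configuration. The class name TaskStateDirWiper and 
its methods are hypothetical helpers, not part of Kafka. Only run this while 
the Streams instance on that machine is stopped.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration; not part of Kafka Streams.
public class TaskStateDirWiper {

    // Assumed layout: <state.dir>/<application.id>/<task id>, e.g. .../my-app/1_1
    public static Path taskStateDir(String stateDir, String applicationId, String taskId) {
        return Paths.get(stateDir, applicationId, taskId);
    }

    // Recursively deletes the directory. Returns false if it did not exist.
    public static boolean wipe(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order so that children are deleted before their parents.
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.err.println("usage: TaskStateDirWiper <state.dir> <application.id> <task id>");
            System.exit(1);
        }
        Path dir = taskStateDir(args[0], args[1], args[2]);
        System.out.println((wipe(dir) ? "Deleted " : "Nothing to delete at ") + dir);
    }
}
```

For the task in the stack trace this would be invoked with the task id 1_1. 
After the restart, the store should be rebuilt from its changelog topic, so 
expect the task to take longer than usual to start.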

> Kafka Streams application fails to rebalance and is stuck in "Updated cluster 
> metadata version"
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7290
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7290
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1, 0.10.2.2, 0.11.0.3
>            Reporter: Tim Van Laer
>            Priority: Major
>         Attachments: cg_metadata_failure.txt
>
>
> Our Kafka Streams application crashed due to a RocksDBException; after that 
> the consumer group basically became unusable. Every consumer in the group 
> went from RUNNING to REBALANCING and was stuck in that state. 
> The application was still on an older version of Kafka Streams (0.10.2.1), 
> but upgrading the library didn't get the consumer group active again.
> We tried:
> * adding and removing consumers to the group, no luck, none of the consumers 
> starts processing
> * stopping all consumers and restarting the application, no luck
> * stopping all consumers and resetting the consumer group (using the 
> kafka-streams-application-reset tool), no luck
> * replacing the underlying machines, no luck
> * upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and 
> 0.11.0.3 after it got stuck, no luck
> We finally got the application back running by changing the applicationId (we 
> could afford to lose the state in this particular case). 
> See attachment for debug logs of the application. The application can reach 
> the Kafka cluster but fails to join the group. 
> The RocksDBException that triggered this state (I lost the container, so 
> unfortunately I don't have more logging):
> {code}
> 2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] 
> Failed to commit StreamTask 1_1 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to 
> flush state store firehose_subscriptions
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>  [firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>  [firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>  [firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>  [firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [firechief.jar:?]
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> while executing flush from store firehose_subscriptions
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:113)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>  ~[firechief.jar:?]
>         ... 8 more
> Caused by: org.rocksdb.RocksDBException: _
>         at org.rocksdb.RocksDB.flush(Native Method) ~[firechief.jar:?]
>         at org.rocksdb.RocksDB.flush(RocksDB.java:1642) ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:113)
>  ~[firechief.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>  ~[firechief.jar:?]
>         ... 8 more
> {code}
> Any ideas on what is wrong or what we can do to work around this issue? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
