We discovered and fixed some bugs in the upcoming 1.0.1 and 1.1.0 releases.

Maybe you can try those out?

A ProducerFencedException should actually be self-healing and resolve
over time. How long did the application keep retrying the rebalance?

Without logs, it's hard to tell what might be causing the issue, though. EOS
can be subtle and it could be something different from what was reported before.
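
For reference, here is a minimal sketch of the setting under discussion,
assuming the app is configured through a plain java.util.Properties object.
The keys "application.id", "bootstrap.servers" and "processing.guarantee"
and the value "exactly_once" are the standard Kafka Streams config names; the
class name, the helper method, and the example values are hypothetical and
only for illustration. It does not reproduce the reported application.

```java
import java.util.Properties;

public class EosConfigSketch {

    // Hypothetical helper: builds the properties a Streams app would pass to
    // KafkaStreams, with exactly-once processing switched on.
    static Properties buildConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // The default is "at_least_once"; "exactly_once" makes Streams use
        // transactional producers, which is where fencing can occur.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // Example values only; replace with the real application id and brokers.
        Properties props = buildConfig("my-streams-app", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee"));
    }
}
```

Setting "processing.guarantee" back to "at_least_once" is one way to check
whether the rebalancing is tied to EOS at all.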


-Matthias


On 2/8/18 9:54 AM, dan bress wrote:
> Hi,
> 
> I recently switched my Kafka Streams 1.0.0 app to use exactly_once
> semantics and since then my cluster has been stuck in rebalancing.  Is
> there an explanation as to what is going on, or how I can resolve it?
> 
> I saw a similar issue discussed on the mailing list, but I don't know if a
> ticket was created or there was a resolution.
> 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201711.mbox/%3CCAKkfnUY0C311Yq%3Drt8kyna4cyucV8HbgWpiYj%3DfnYMt9%2BAb8Mw%40mail.gmail.com%3E
> 
> This is the exception I'm seeing:
> 2018-02-08 17:09:20,763 ERR [kafka-producer-network-thread |
> dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4-0_414-producer]
> Sender [Producer
> clientId=dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4-0_414-producer,
> transactionalId=dp-app-devel-dbress-0_414] Aborting producer batches due to
> fatal error
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted
> an operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by the
> broker.
> 2018-02-08 17:09:20,764 ERR
> [dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4]
> ProcessorStateManager task [0_414] Failed to flush state store
> summarykey-to-summary:
> org.apache.kafka.common.KafkaException: Cannot perform send because at
> least one previous transactional or idempotent request has failed with
> errors.
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:278)
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:263)
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:804)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
> at
> org.apache.kafka.streams.state.internals.StoreChangeFlushingLogger.flush(StoreChangeFlushingLogger.java:92)
> at
> org.apache.kafka.streams.state.internals.InMemoryKeyValueFlushingLoggedStore.flush(InMemoryKeyValueFlushingLoggedStore.java:139)
> at
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:126)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
> at
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> 
