Hello,

We are using Kafka version 0.11.0.1. with KafkaStream.
Using the leftJoin API in the Kafka Stream:

<VO,VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream,
                             ValueJoiner<? super V,? super VO,? extends VR>
joiner,
                             JoinWindows windows)

https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin(org.apache.kafka.streams.kstream.KStream,%20org.apache.kafka.streams.kstream.ValueJoiner,%20org.apache.kafka.streams.kstream.JoinWindows)

with partitions = 30
brokers = 3 and replicas =1.

We run the stream application on 3 instances machines.  When one of the
machine the machine automatically re-started, we got exception because
autogenerated stores name is not available anymore.
and the state of the instance is changing from rebalancing to error.

Hereby is the exception:

org.apache.kafka.streams.errors.StreamsException: Store
KSTREAM-JOINTHIS-0000000056-store's change log
(joinService1-KSTREAM-JOINTHIS-0000000056-store-changelog) does not contain
partition 21
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:91)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:158)
at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
at
org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:110)
at
org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:72)
at
org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:72)
at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:95)
at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:220)
at
org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:566)
at
org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:89)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:493)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)


Do you have any ideas to mitigate this issue?

Thanks,
Danny

Reply via email to