Thanks Matthias for the information!

2017-11-20 19:23 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:

> Sounds like Streams can't fetch the metadata completely.
>
> You can increase the consumer config `REQUEST_TIMEOUT_MS_CONFIG` to give
> the cluster more time to broadcast the information to all brokers.
>
> https://docs.confluent.io/current/streams/developer-guide/config-streams.html#kafka-consumers-and-producer-configuration-parameters
>
> Please let us know if this resolved the issue.
>
>
> -Matthias
>
> On 11/20/17 7:22 AM, D Stephan wrote:
> > Hello,
> >
> > We are using Kafka version 0.11.0.1 with Kafka Streams.
> > We use the leftJoin API in Kafka Streams:
> >
> > <VO,VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream,
> >                                ValueJoiner<? super V,? super VO,? extends VR> joiner,
> >                                JoinWindows windows)
> >
> > https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin(org.apache.kafka.streams.kstream.KStream,%20org.apache.kafka.streams.kstream.ValueJoiner,%20org.apache.kafka.streams.kstream.JoinWindows)
> >
> > with partitions = 30,
> > brokers = 3, and replicas = 1.
> >
> > We run the stream application on 3 machines. When one of the
> > machines automatically restarted, we got an exception because the
> > autogenerated store name was no longer available,
> > and the state of the instance changed from rebalancing to error.
> >
> > Here is the exception:
> >
> > org.apache.kafka.streams.errors.StreamsException: Store
> > KSTREAM-JOINTHIS-0000000056-store's change log
> > (joinService1-KSTREAM-JOINTHIS-0000000056-store-changelog) does not contain
> > partition 21
> > at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:91)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:158)
> > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
> > at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:110)
> > at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:72)
> > at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:72)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:95)
> > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:220)
> > at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:566)
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:89)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:493)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> >
> >
> > Do you have any ideas to mitigate this issue?
> >
> > Thanks,
> > Danny
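
For anyone finding this thread later: the suggestion quoted above, raising the consumer's `REQUEST_TIMEOUT_MS_CONFIG`, could be set up roughly as in the sketch below. This is only an illustration under a few assumptions. The class name `StreamsTimeoutConfig`, the broker address `localhost:9092`, and the timeout value of 600000 ms are hypothetical placeholders, and the application id `joinService1` is inferred from the changelog topic name in the exception. Plain string keys are used so the snippet compiles without the Kafka jars; the comments name the constants they are believed to correspond to.

```java
import java.util.Properties;

// Sketch only: builds the Properties that would be handed to the
// KafkaStreams constructor. Names and values here are illustrative.
class StreamsTimeoutConfig {

    static Properties build(String applicationId, String bootstrapServers, int requestTimeoutMs) {
        Properties props = new Properties();
        // StreamsConfig.APPLICATION_ID_CONFIG
        props.setProperty("application.id", applicationId);
        // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.setProperty("bootstrap.servers", bootstrapServers);
        // "consumer." prefix + ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, so the
        // setting applies to the consumers that Streams creates internally.
        props.setProperty("consumer.request.timeout.ms", Integer.toString(requestTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; choose a timeout larger than the current setting.
        Properties props = build("joinService1", "localhost:9092", 600_000);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the Kafka Streams dependency on the classpath, the same key can be written as `StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)` rather than a string literal. Whether a longer timeout actually resolves the missing changelog partition is not confirmed anywhere in this thread.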