Hello Eno,
So, if I change the input topic partitions, it affects the ability of kafka
streams to find partitions for the state store changelog? I think I'm
missing something here.
In my case, the application was new, so it's for sure that there were no
changes.
Also, if I have regex for the input topic on kafka streams and a new topic
is added to kafka matching the regex, the application will break?

On Fri, Aug 4, 2017, 8:33 PM Eno Thereska <eno.there...@gmail.com> wrote:

> Hi,
>
> Could you check if this helps:
>
> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
> <
> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
> >
>
> Thanks
> Eno
> > On Aug 4, 2017, at 12:48 PM, Anish Mashankar <an...@systeminsights.com>
> wrote:
> >
> > Hello Eno,
> > Thanks for considering the question.
> >
> > How I am creating the state stores:
> >
> > StateStoreSupplier stateStoreSupplier =
> >
> StateStorStores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
> > TopologyBuilder builder = ...
> > builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
> >
> > The Error Message with stack trace is as follows:
> >
> > 2017-08-04 17:11:23,184 53205
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
> > o.a.k.s.p.internals.StreamThread - stream-thread
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created
> > active task -727063541_0 with assigned partitions [testing-topic-0]
> >
> > 2017-08-04 17:11:23,185 53206
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
> > o.a.k.s.p.internals.StreamThread - stream-thread
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition
> > assignment took 41778 ms.
> > current active tasks: []
> > current standby tasks: []
> >
> > 2017-08-04 17:11:23,187 53208
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR
> > o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> >
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> > for group testing-2 failed on partition assignment
> > org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's
> > change log (testing-2-testing-2-store-changelog) does not contain
> partition
> > 0
> > at
> >
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
> > at
> >
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
> > at
> >
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
> > at
> >
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
> > at
> >
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > at
> >
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
> > at
> >
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> > at
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> > at
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> > at
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> > at
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> >
> > I hope this shares more light on the situation.
> > Thanks
> >
> > On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com
> <mailto:eno.there...@gmail.com>> wrote:
> >
> >> Hi Anish,
> >>
> >> Could you give more info on how you create the state stores in your
> code?
> >> Also could you copy-paste the exact error message from the log?
> >>
> >> Thanks
> >> Eno
> >>> On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com>
> >> wrote:
> >>>
> >>> I have a new application, call it streamsApp with state stores S1 and
> S2.
> >>> So, according to the documentation, upon the first time startup, the
> >>> application should've created the changelog topics
> >> streamsApp-S1-changelog
> >>> and streamsApp-S2-changelog. But I see that these topics are not
> created.
> >>> Also, the application throws an error that it couldn't find any
> partition
> >>> for topics *streamsApp-S1-changelog and streamsApp-S2-changelog *and
> then
> >>> exits*. *To get it working, I manually created the topics, but I am
> >>> skeptical because the docs say that this convention might change any
> >> time.
> >>> I am using Kafka Streams v0.11, with a Kafka Broker v0.11, but message
> >>> protocol set to v0.10.0. Am I missing something?
> >>> --
> >>>
> >>> Regards,
> >>> Anish Samir Mashankar
> >>> R&D Engineer
> >>> System Insights
> >>> +91-9789870733 <+91%2097898%2070733>
> >>
> >> --
> >
> > Regards,
> > Anish Samir Mashankar
> > R&D Engineer
> > System Insights
> > +91-9789870733
>
> --

Regards,
Anish Samir Mashankar
R&D Engineer
System Insights
+91-9789870733

Reply via email to