Hi Damian - Yes, it exists .. It's actually the changelog topic corresponding to the state store log-counts:

$ dcos confluent-kafka topic describe kstream-log-count-log-counts-changelog
{
  "partitions": [
    {
      "0": {
        "leader": -1,
        "controller_epoch": 3,
        "isr": [],
        "leader_epoch": 3,
        "version": 1
      }
    }
  ]
}
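In case it helps to cross-check what the brokers themselves report (rather than what the DC/OS CLI shows), here is a minimal sketch that fetches the changelog topic's partition metadata through the plain consumer API. The bootstrap.servers value is an assumption for a DC/OS Confluent Kafka install; substitute whatever your brokers advertise.

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.collection.JavaConverters._

// Ask the brokers directly for the partition metadata (leader / ISR) of the
// changelog topic, independently of `dcos confluent-kafka topic describe`.
object DescribeChangelog extends App {
  val props = new Properties()
  // assumed DC/OS address for the confluent-kafka package; replace as appropriate
  props.put("bootstrap.servers", "broker.confluent-kafka.l4lb.thisdcos.directory:9092")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](
    props, new ByteArrayDeserializer, new ByteArrayDeserializer)
  try {
    val infos = Option(consumer.partitionsFor("kstream-log-count-log-counts-changelog"))
      .map(_.asScala.toList).getOrElse(Nil)
    if (infos.isEmpty) println("brokers returned no partition metadata for the changelog topic")
    else infos.foreach { p =>
      val isr = p.inSyncReplicas.map(_.id).mkString(", ")
      println(s"partition=${p.partition} leader=${p.leader} isr=[$isr]")
    }
  } finally consumer.close()
}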
Also one point to note: when Mesos restarts the process, it starts on a different node, so the local state store will not exist there. But I expect Kafka Streams to recreate it from the corresponding backing changelog topic. Hence the exception looks a bit confusing to me.

Thoughts ?

regards.

On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com> wrote:

> The exception indicates that streams was unable to find that
> topic-partition on the kafka brokers. Can you verify that it exists?
> Also, I'm assuming you are on 0.10.2.x?
>
> On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
> > Thanks Damian .. this worked. But now after the application restarts, I
> > see the following exception ..
> >
> > 09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
> > org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
> >   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> >
> > I found this thread ..
> > https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
> > but unlike that use case I don't change the partitions of any topic
> > between the restarts. BTW my application uses stateful streaming, and
> > hence Kafka Streams creates the internal topics. Not sure if that's
> > related to this exception though. But the store name mentioned in the
> > exception (log-counts) is the one for stateful streaming.
> >
> > regards.
> >
> > On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> >> Hi Debasish,
> >>
> >> It might be that it is blocked in `streams.close()`.
> >> You might want to try the overload that takes a long and a TimeUnit as
> >> params, i.e., `streams.close(1, TimeUnit.MINUTES)`.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> >>
> >>> Hi -
> >>>
> >>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster.
> >>> While the application was running, Kafka suddenly reported to be
> >>> unhealthy and the application got an exception ..
> >>>
> >>> 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
> >>> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> >>>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
> >>>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
> >>>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
> >>>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >>>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >>>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
> >>> 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
> >>> 07:45:16.606 TKD [StreamThread-1] INFO c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
> >>> 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
> >>> 07:45:16.608 TKD [StreamThread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
> >>> 07:45:16.608 TKD [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
> >>> 07:45:16.609 TKD [kafka-streams-close-thread] WARN o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
> >>> 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
> >>>
> >>> The streams application stopped, and I have the following exception
> >>> handler registered ..
> >>>
> >>> // need to exit for any stream exception
> >>> // mesos will restart the application
> >>> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
> >>>   override def uncaughtException(t: Thread, e: Throwable): Unit =
> >>>     try {
> >>>       logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
> >>>       logger.error(s"Stopping http service ..")
> >>>       restService.stop()
> >>>       logger.error(s"Stopping streams service ..")
> >>>       streams.close()
> >>>     } catch {
> >>>       case _: Exception =>
> >>>     } finally {
> >>>       System.exit(-1)
> >>>     }
> >>> })
> >>>
> >>> Ideally the application should terminate and Mesos should restart it.
> >>> But I see that the application doesn't terminate even though I have a
> >>> System.exit(-1) in the finally clause. Any idea what's happening, or
> >>> how I can make the application terminate?
> >>>
> >>> Any help will be appreciated ..
> >>>
> >>> regards.
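One more data point for anyone who finds this thread later: the non-terminating shutdown in the first mail was, as Damian suspected, streams.close() blocking when it is called from the stream thread's own uncaught exception handler, so the finally block (and the System.exit(-1)) was never reached. Switching to the timed overload fixed it. Roughly, the handler above becomes the following (a minimal sketch; streams, restService and logger are the same values as in the snippet above):

import java.util.concurrent.TimeUnit

// same handler as above, but close() is given a timeout so that it cannot
// block the exception-handler thread forever
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  override def uncaughtException(t: Thread, e: Throwable): Unit =
    try {
      logger.error("Stream terminated because of uncaught exception .. Shutting down app", e)
      logger.error("Stopping http service ..")
      restService.stop()
      logger.error("Stopping streams service ..")
      // close(1, TimeUnit.MINUTES) returns after at most one minute,
      // so the finally block below (and hence System.exit) is reached
      streams.close(1, TimeUnit.MINUTES)
    } catch {
      case _: Exception =>
    } finally {
      System.exit(-1) // Mesos restarts the application
    }
})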
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg