One of the brokers died. Fortunately this is only a demo cluster, not production. I have no replicas, but I can tear down the current Kafka instance and bring up a new one.
Just for my understanding: if I don't have a replica, how should such situations be handled? And if I do have replicas, is there any documentation that discusses how the leader for a partition is decided in such situations, so that I can take care of things when I move to production?

regards.

On Wed, Jul 26, 2017 at 7:51 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> It looks to me that there is currently no leader for the partition, i.e.,
> leader -1. Also there are no replicas? Something up with your brokers?
>
> Thanks,
> Damian
>
> On Wed, 26 Jul 2017 at 12:34 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hi Damian -
>>
>> Yes, it exists .. It's actually a change log topic corresponding to the
>> state store log-count
>>
>> $ dcos confluent-kafka topic describe kstream-log-count-log-counts-changelog
>> {
>>   "partitions": [
>>     {
>>       "0": {
>>         "leader": -1,
>>         "controller_epoch": 3,
>>         "isr": [],
>>         "leader_epoch": 3,
>>         "version": 1
>>       }
>>     }
>>   ]
>> }
>>
>> Also 1 point to note is that when Mesos restarts the process it starts in a
>> different node. So the local state store will not exist there. But I expect
>> Kafka will create it from the corresponding backed up topic. Hence the
>> exception looks a bit confusing to me.
>>
>> Thoughts ?
>>
>> regards.
>>
>> On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > The exception indicates that streams was unable to find that
>> > topic-partition on the kafka brokers. Can you verify that it exists?
>> > Also, i'm assuming you are on 0.10.2.x?
>> >
>> > On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com>
>> > wrote:
>> >
>> > > Thanks Damian .. this worked. But now after the application restarts, I
>> > > see the following exception ..
>> > >
>> > > 09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
>> > >> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
>> > >>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>> > >>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> > >> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
>> > >>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
>> > >>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>> > >
>> > >
>> > > I found this thread ..
>> > > https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>> > > but unlike this use case I don't make any change in the partition of any
>> > > topic in between the restarts. BTW my application uses stateful streaming
>> > > and hence Kafka creates any internal topics. Not sure if it's related to
>> > > this exception though. But the store name mentioned in the exception
>> > > (log-count) is one for stateful streaming.
>> > >
>> > > regards.
>> > >
>> > > On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com> wrote:
>> > >
>> > >> Hi Debasish,
>> > >>
>> > >> It might be that it is blocked in `streams.close()`
>> > >> You might want to try the overload that has a long and TimeUnit as
>> > >> params, i.e., `streams.close(1, TimeUnit.MINUTES)`
>> > >>
>> > >> Thanks,
>> > >> Damian
>> > >>
>> > >> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com>
>> > >> wrote:
>> > >>
>> > >>> Hi -
>> > >>>
>> > >>> I have a Kafka streams application deployed on a Mesos DC/OS cluster. While
>> > >>> the application was running, Kafka suddenly reported to be unhealthy and
>> > >>> the application got an exception ..
>> > >>>
>> > >>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
>> > >>> > org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>> > >>> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>> > >>> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>> > >>> >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>> > >>> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> > >>> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>> > >>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>> > >>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>> > >>> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>> > >>> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>> > >>> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> > >>> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
>> > >>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
>> > >>> > 07:45:16.606 TKD [StreamThread-1] INFO c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
>> > >>> > 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
>> > >>> > 07:45:16.608 TKD [StreamThread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
>> > >>> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
>> > >>> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
>> > >>> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>> > >>>
>> > >>> The streams application stopped and I have the following exception handler
>> > >>> registered ..
>> > >>>
>> > >>> // need to exit for any stream exception
>> > >>> // mesos will restart the application
>> > >>> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>> > >>>   override def uncaughtException(t: Thread, e: Throwable): Unit = try {
>> > >>>     logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>> > >>>     logger.error(s"Stopping http service ..")
>> > >>>     restService.stop()
>> > >>>     logger.error(s"Stopping streams service ..")
>> > >>>     streams.close()
>> > >>>   } catch {
>> > >>>     case _: Exception =>
>> > >>>   } finally {
>> > >>>     System.exit(-1)
>> > >>>   }
>> > >>> })
>> > >>>
>> > >>> Ideally the application should terminate and Mesos should have restarted
>> > >>> it. But I see that the application doesn't terminate though I have a
>> > >>> System.exit(-1) in the finally clause. Any idea what's happening or how can
>> > >>> I make the application terminate ..
>> > >>>
>> > >>> Any help will be appreciated ..
>> > >>>
>> > >>> regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
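
For reference, the change suggested earlier in the thread (calling the timed `streams.close(1, TimeUnit.MINUTES)` overload instead of the unbounded `streams.close()`) could be wired into the handler roughly as below. This is only a sketch under assumptions: `BoundedShutdown`, `closeStreams`, `stopHttp`, `exit` and `timeoutSeconds` are hypothetical names introduced here so the block compiles without Kafka on the classpath. With Kafka Streams 0.10.2.x, `closeStreams` would be `(t, u) => streams.close(t, u)`, and the bounded wait itself comes from that Kafka overload, not from this code.

```scala
import java.util.concurrent.TimeUnit

object BoundedShutdown {
  /** Builds an uncaught-exception handler whose `finally` block is always reached,
    * provided `closeStreams` honours its timeout.
    *
    * All parameters are hypothetical hooks, not part of any Kafka API:
    *   closeStreams - timed close; with Kafka Streams: (t, u) => streams.close(t, u)
    *   stopHttp     - stands in for restService.stop()
    *   exit         - stands in for System.exit, injectable for testing
    */
  def handler(
      closeStreams: (Long, TimeUnit) => Boolean,
      stopHttp: () => Unit,
      exit: Int => Unit = code => sys.exit(code),
      timeoutSeconds: Long = 60L
  ): Thread.UncaughtExceptionHandler =
    new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        try {
          System.err.println(s"Stream terminated on ${t.getName}: $e")
          stopHttp()
          // The timed close reports false if shutdown did not finish in time.
          val closed = closeStreams(timeoutSeconds, TimeUnit.SECONDS)
          if (!closed) System.err.println("Streams did not close within the timeout")
        } catch {
          // Log instead of swallowing, so shutdown problems stay visible.
          case ex: Exception => System.err.println(s"Error during shutdown: $ex")
        } finally {
          exit(-1) // non-zero exit code so that Mesos restarts the task
        }
    }
}
```

Assuming the same `streams` and `restService` values as in the original handler, usage would look like `streams.setUncaughtExceptionHandler(BoundedShutdown.handler((t, u) => streams.close(t, u), () => restService.stop()))`.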