Thanks!

On Thu, Jul 27, 2017 at 4:12 PM, Damian Guy <damian....@gmail.com> wrote:
>
> On Wed, 26 Jul 2017 at 15:53 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> One of the brokers died. The good thing is that it's not a production
>> cluster, just a demo cluster. I have no replicas. But I can knock off the
>> current Kafka instance and start a new one.
>
> That explains it.
>
>> Just for my understanding: if I don't have a replica, how should such
>> situations be handled? And if I have replicas, is there any documentation
>> that discusses how the leader for the partition is chosen in such
>> situations, so that I can take care of things when I move to production?
>
> If you don't have any replicas and the broker with that partition goes
> offline, you won't be able to access that partition until the broker
> comes back online. There are some docs on replication here:
> https://kafka.apache.org/documentation/#replication
>
> Thanks,
> Damian
>
>> regards.
>>
>> On Wed, Jul 26, 2017 at 7:51 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> It looks to me like there is currently no leader for the partition,
>>> i.e., leader -1. Also, there are no replicas? Is something up with your
>>> brokers?
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Wed, 26 Jul 2017 at 12:34 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>>> Hi Damian -
>>>>
>>>> Yes, it exists. It's actually the changelog topic corresponding to the
>>>> state store log-counts:
>>>>
>>>> $ dcos confluent-kafka topic describe kstream-log-count-log-counts-changelog
>>>> {
>>>>   "partitions": [
>>>>     {
>>>>       "0": {
>>>>         "leader": -1,
>>>>         "controller_epoch": 3,
>>>>         "isr": [],
>>>>         "leader_epoch": 3,
>>>>         "version": 1
>>>>       }
>>>>     }
>>>>   ]
>>>> }
>>>>
>>>> One point to note is that when Mesos restarts the process, it starts on a
>>>> different node, so the local state store will not exist there. But I
>>>> expect Kafka Streams to restore it from the corresponding changelog topic.
>>>> Hence the exception looks a bit confusing to me.
>>>>
>>>> Thoughts?
>>>>
>>>> regards.
>>>>
>>>> On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>
>>>>> The exception indicates that Streams was unable to find that
>>>>> topic-partition on the Kafka brokers. Can you verify that it exists?
>>>>> Also, I'm assuming you are on 0.10.2.x?
>>>>>
>>>>> On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Damian, this worked. But now, after the application restarts, I
>>>>>> see the following exception:
>>>>>>
>>>>>>   09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
>>>>>>   org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
>>>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>>>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>>>   Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
>>>>>>       at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
>>>>>>       at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>>>>>>
>>>>>> I found this thread:
>>>>>> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>>>>>> but unlike that use case, I don't change the partitioning of any
>>>>>> topic between restarts.
>>>>>> BTW, my application uses stateful streaming, and hence Kafka Streams
>>>>>> creates some internal topics. Not sure if that's related to this
>>>>>> exception, though. But the store name mentioned in the exception
>>>>>> (log-counts) is the one used for stateful streaming.
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Debasish,
>>>>>>>
>>>>>>> It might be that it is blocked in `streams.close()`.
>>>>>>> You might want to try the overload that takes a long and a TimeUnit as
>>>>>>> params, i.e., `streams.close(1, TimeUnit.MINUTES)`.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Damian
>>>>>>>
>>>>>>> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi -
>>>>>>>>
>>>>>>>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster.
>>>>>>>> While the application was running, Kafka was suddenly reported as
>>>>>>>> unhealthy and the application got an exception:
>>>>>>>>
>>>>>>>>   07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
>>>>>>>>   org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>>>>>>>>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>>>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>>>>>   Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
>>>>>>>>   07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
>>>>>>>>   07:45:16.606 TKD [StreamThread-1] INFO c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
>>>>>>>>   07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
>>>>>>>>   07:45:16.608 TKD [StreamThread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
>>>>>>>>   07:45:16.608 TKD [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
>>>>>>>>   07:45:16.609 TKD [kafka-streams-close-thread] WARN o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
>>>>>>>>   07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>>>>>>>>
>>>>>>>> The streams application stopped, and I have the following exception
>>>>>>>> handler registered:
>>>>>>>>
>>>>>>>>   // need to exit for any stream exception
>>>>>>>>   // mesos will restart the application
>>>>>>>>   streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>>>>>>>>     override def uncaughtException(t: Thread, e: Throwable): Unit =
>>>>>>>>       try {
>>>>>>>>         logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>>>>>>>>         logger.error(s"Stopping http service ..")
>>>>>>>>         restService.stop()
>>>>>>>>         logger.error(s"Stopping streams service ..")
>>>>>>>>         streams.close()
>>>>>>>>       } catch {
>>>>>>>>         case _: Exception =>
>>>>>>>>       } finally {
>>>>>>>>         System.exit(-1)
>>>>>>>>       }
>>>>>>>>   })
>>>>>>>>
>>>>>>>> Ideally, the application should have terminated and Mesos should have
>>>>>>>> restarted it. But the application doesn't terminate, even though I have
>>>>>>>> a System.exit(-1) in the finally clause. Any idea what's happening, or
>>>>>>>> how I can make the application terminate?
>>>>>>>>
>>>>>>>> Any help will be appreciated.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Debasish Ghosh
>>>>>>>> http://manning.com/ghosh2
>>>>>>>> http://manning.com/ghosh
>>>>>>>>
>>>>>>>> Twttr: @debasishg
>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>> Code: http://github.com/debasishg
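Following Damian's suggestion to use the timed `streams.close(1, TimeUnit.MINUTES)` overload, here is a minimal sketch of an uncaught-exception handler whose shutdown is bounded, so that the exit call in `finally` is always reached. It assumes, as Damian suspects, that the handler hangs in the no-argument `streams.close()`, which in this Kafka version appears to wait without a time limit; since the handler runs on the dying StreamThread-1 itself, waiting for that thread to stop may never complete. To keep the sketch self-contained, the object `BoundedShutdown` and the parameters `stopHttp`, `closeWithTimeout` and `exit` are hypothetical stand-ins for `restService.stop()`, `KafkaStreams#close(long, TimeUnit)` and `System.exit`.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper (not part of Kafka): builds a handler that never waits
// on the dying stream thread indefinitely.
object BoundedShutdown {
  def handler(
      stopHttp: () => Unit,                           // stand-in for restService.stop()
      closeWithTimeout: (Long, TimeUnit) => Boolean,  // stand-in for streams.close(timeout, unit)
      exit: Int => Unit                               // stand-in for System.exit
  ): Thread.UncaughtExceptionHandler =
    new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        try {
          stopHttp()
          // Bounded close: gives up after one minute instead of blocking forever.
          val closed = closeWithTimeout(1L, TimeUnit.MINUTES)
          if (!closed) System.err.println("streams did not close within 1 minute; exiting anyway")
        } catch {
          case _: Exception => () // best effort only; we exit below regardless
        } finally {
          exit(-1) // always reached, so Mesos can restart the application
        }
    }
}
```

Wiring it up would look roughly like `streams.setUncaughtExceptionHandler(BoundedShutdown.handler(() => restService.stop(), (n, u) => streams.close(n, u), code => System.exit(code)))`. Passing the side effects in as functions keeps the handler testable without a running cluster; the one-minute bound simply mirrors Damian's example.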
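On the replication question: the changelog topic for the log-counts store is an internal topic created by Kafka Streams, and its replication factor comes from the Streams `replication.factor` setting (`StreamsConfig.REPLICATION_FACTOR_CONFIG`), which defaults to 1 in this version. With a single replica, losing the broker that hosts the partition leaves it without a leader, which matches the `"leader": -1` output above. A minimal sketch of raising it follows; the object name and broker addresses are hypothetical, and the application id `kstream-log-count` is only inferred from the changelog naming scheme `<application.id>-<store name>-changelog`. As far as I know, the setting only applies when Streams creates the internal topics, so an existing changelog topic keeps its current replication factor.

```scala
import java.util.Properties

// Hypothetical helper: Streams properties with replicated internal topics.
// The keys are the string values of StreamsConfig.APPLICATION_ID_CONFIG,
// StreamsConfig.BOOTSTRAP_SERVERS_CONFIG and StreamsConfig.REPLICATION_FACTOR_CONFIG,
// written as literals so this compiles without the Kafka jars.
object StreamsReplicationConfig {
  def props(appId: String, bootstrapServers: String, replicationFactor: Int): Properties = {
    require(replicationFactor >= 1, "replication factor must be at least 1")
    val p = new Properties()
    p.put("application.id", appId)
    p.put("bootstrap.servers", bootstrapServers)
    // Applies to the changelog and repartition topics that Streams creates.
    // With 1, losing the broker hosting a changelog partition leaves it
    // without a leader (leader -1) until that broker comes back.
    p.put("replication.factor", replicationFactor.toString)
    p
  }
}
```

For example, `StreamsReplicationConfig.props("kstream-log-count", "broker-1:9092,broker-2:9092,broker-3:9092", 3)` on a three-broker cluster should let leadership of the changelog partition move to another in-sync replica when one broker dies; the replication docs linked above describe how that leader is chosen. The value should not exceed the number of available brokers, otherwise creating the internal topics is expected to fail.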