One of the brokers died. The good thing is that it's not a production
cluster, it's just a demo cluster. I have no replicas. But I can knock off
the current Kafka instance and have a new one.

Just for my understanding, if I don't have a replica, how should such
situations be handled ? And if I have replicas, is there any documentation
that discusses how the leader for the partition will be decided in such
situations, so that I can take care of things when I move to production.

regards.

On Wed, Jul 26, 2017 at 7:51 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> It looks to me that there is currently no leader for the partition, i.e.,
> leader -1. Also there are no replicas? Something up with your brokers?
>
> Thanks,
> Damian
>
> On Wed, 26 Jul 2017 at 12:34 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hi Damian -
>>
>> Yes, it exists .. It's actually a change log topic corresponding to the
>> state store log-count
>>
>> $ dcos confluent-kafka topic describe kstream-log-count-log-counts-
>> changelog
>> {
>>   "partitions": [
>>     {
>>       "0": {
>>         "leader": -1,
>>         "controller_epoch": 3,
>>         "isr": [],
>>         "leader_epoch": 3,
>>         "version": 1
>>       }
>>     }
>>   ]
>> }
>>
>> Also 1 point to note is that when Mesos restarts the process it starts in
>> a
>> different node. So the local state store will not exist there. But I
>> expect
>> Kafka will create it from the corresponding backed up topic. Hence the
>> exception looks a bit confusing to me.
>>
>> Thoughts ?
>>
>> regards.
>>
>> On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > The exception indicates that streams was unable to find that
>> > topic-partition on the kafka brokers. Can you verify that it exists?
>> > Also, i'm assuming you are on 0.10.2.x?
>> >
>> > On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com>
>> > wrote:
>> >
>> > > Thanks Damien .. this worked. But now after the application restarts,
>> I
>> > > see the following exception ..
>> > >
>> > > 09:41:26.516 TKD [StreamThread-1] ERROR
>> > >> c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of
>> > >> uncaught exception .. Shutting down app
>> > >> org.apache.kafka.streams.errors.StreamsException: stream-thread
>> > >> [StreamThread-1] Failed to rebalance
>> > >>         at
>> > >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> > StreamThread.java:598)
>> > >>         at
>> > >> org.apache.kafka.streams.processor.internals.
>> > StreamThread.run(StreamThread.java:361)
>> > >> Caused by: org.apache.kafka.streams.errors.StreamsException: task
>> [0_0]
>> > >> Store log-counts's change log (kstream-log-count-log-counts-
>> changelog)
>> > does
>> > >> not contain partition 0
>> > >>         at
>> > >> org.apache.kafka.streams.processor.internals.ProcessorStateManager.
>> > register(ProcessorStateManager.java:188)
>> > >>         at
>> > >> org.apache.kafka.streams.processor.internals.
>> AbstractProcessorContext.
>> > register(AbstractProcessorContext.java:99)
>> > >
>> > >
>> > > I found this thread ..
>> > > https://stackoverflow.com/questions/42329387/failed-to-
>> > rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>> > > but unlike this use case I don't make any change in the partition of
>> any
>> > > topic in between the restarts. BTW my application uses stateful
>> streaming
>> > > and hence Kafka creates any internal topics. Not sure if it's related
>> to
>> > > this exception though. But the store name mentioned in the exception
>> > > (log-count) is one for stateful streaming.
>> > >
>> > > regards.
>> > >
>> > > On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com>
>> > wrote:
>> > >
>> > >> Hi Debasish,
>> > >>
>> > >> It might be that it is blocked in `streams.close()`
>> > >> You might want to to try the overload that has a long and TimeUnit as
>> > >> params, i.e., `streams.close(1, TimeUnit.MINUTES)`
>> > >>
>> > >> Thanks,
>> > >> Damian
>> > >>
>> > >> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <
>> ghosh.debas...@gmail.com>
>> > >> wrote:
>> > >>
>> > >>> Hi -
>> > >>>
>> > >>> I have a Kafka streams application deployed on a Mesos DC/OS
>> cluster.
>> > >>> While
>> > >>> the application was running, Kafka suddenly reported to be unhealthy
>> > and
>> > >>> the application got an exception ..
>> > >>>
>> > >>> 07:45:16.606 TKD [StreamThread-1] ERROR
>> > >>> c.l.f.s.kstream.WeblogProcessing$ -
>> > >>> > Stream terminated because of uncaught exception .. Shutting down
>> app
>> > >>> > org.apache.kafka.streams.errors.StreamsException: task [1_0]
>> > exception
>> > >>> > caught when producing
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
>> > checkForException(RecordCollectorImpl.java:121)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.
>> > RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.
>> > StreamTask$1.run(StreamTask.java:76)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>> > measureLatencyNs(StreamsMetricsImpl.java:188)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.
>> > StreamTask.commit(StreamTask.java:280)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.
>> StreamThread.commitOne(
>> > StreamThread.java:807)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.
>> StreamThread.commitAll(
>> > StreamThread.java:794)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.
>> StreamThread.maybeCommit(
>> > StreamThread.java:769)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> > StreamThread.java:647)
>> > >>> >         at
>> > >>> >
>> > >>> org.apache.kafka.streams.processor.internals.
>> > StreamThread.run(StreamThread.java:361)
>> > >>> > Caused by: org.apache.kafka.common.errors.TimeoutException:
>> Expiring
>> > >>> 205
>> > >>> > record(s) for
>> > >>> > kstream-log-processing-windowed-access-count-per-
>> host-repartition-0:
>> > >>> 30020
>> > >>> > ms has passed since last attempt plus backoff time
>> > >>> > 07:45:16.606 TKD [StreamThread-1] ERROR
>> > >>> c.l.f.s.kstream.WeblogProcessing$
>> > >>> > - Stopping http service ..
>> > >>> > 07:45:16.606 TKD [StreamThread-1] INFO
>> > >>> >  c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
>> > >>> > 07:45:16.607 TKD [StreamThread-1] ERROR
>> > >>> c.l.f.s.kstream.WeblogProcessing$
>> > >>> > - Stopping streams service ..
>> > >>> > 07:45:16.608 TKD [StreamThread-1] INFO
>> > >>> >  o.apache.kafka.streams.KafkaStreams - stream-client
>> > >>> > [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999]
>> State
>> > >>> > transition from RUNNING to PENDING_SHUTDOWN.
>> > >>> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO
>> > >>> >  o.a.k.s.p.internals.StreamThread - stream-thread
>> [StreamThread-1]
>> > >>> Informed
>> > >>> > thread to shut down
>> > >>> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN
>> > >>> >  o.a.k.s.p.internals.StreamThread - stream-thread
>> [StreamThread-1]
>> > >>> > Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
>> > >>> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO
>> > >>> >  o.a.k.clients.producer.KafkaProducer - Closing the Kafka
>> producer
>> > with
>> > >>> > timeoutMillis = 60000 ms.
>> > >>>
>> > >>>
>> > >>> The streams application stopped and I have the following exception
>> > >>> handler
>> > >>> registered ..
>> > >>>
>> > >>>     // need to exit for any stream exception
>> > >>>     // mesos will restart the application
>> > >>>     streams.setUncaughtExceptionHandler(new
>> > >>> Thread.UncaughtExceptionHandler() {
>> > >>>       override def uncaughtException(t: Thread, e: Throwable): Unit
>> =
>> > >>> try {
>> > >>>         logger.error(s"Stream terminated because of uncaught
>> exception
>> > ..
>> > >>> Shutting down app", e)
>> > >>>         logger.error(s"Stopping http service ..")
>> > >>>         restService.stop()
>> > >>>         logger.error(s"Stopping streams service ..")
>> > >>>         streams.close()
>> > >>>       } catch {
>> > >>>         case _: Exception =>
>> > >>>       } finally {
>> > >>>         System.exit(-1)
>> > >>>       }
>> > >>>     })
>> > >>>
>> > >>> Ideally the application should terminate and Mesos should have
>> > restarted
>> > >>> it. But I see that the application doesn't terminate though I have a
>> > >>> System.exit(-1) in the finally clause. Any idea what's happening or
>> how
>> > >>> can
>> > >>> I make the application terminate ..
>> > >>>
>> > >>> Any help will be appreciated ..
>> > >>>
>> > >>> regards.
>> > >>>
>> > >>>
>> > >>> --
>> > >>> Debasish Ghosh
>> > >>> http://manning.com/ghosh2
>> > >>> http://manning.com/ghosh
>> > >>>
>> > >>> Twttr: @debasishg
>> > >>> Blog: http://debasishg.blogspot.com
>> > >>> Code: http://github.com/debasishg
>> > >>>
>> > >>
>> > >
>> > >
>> > > --
>> > > Debasish Ghosh
>> > > http://manning.com/ghosh2
>> > > http://manning.com/ghosh
>> > >
>> > > Twttr: @debasishg
>> > > Blog: http://debasishg.blogspot.com
>> > > Code: http://github.com/debasishg
>> > >
>> >
>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to