Thanks!

On Thu, Jul 27, 2017 at 4:12 PM, Damian Guy <damian....@gmail.com> wrote:

>
> On Wed, 26 Jul 2017 at 15:53 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> One of the brokers died. The good thing is that it's not a production
>> cluster; it's just a demo cluster. I have no replicas, but I can tear down
>> the current Kafka instance and start a new one.
>>
>>
> That explains it.
>
>
>> Just for my understanding: if I don't have a replica, how should such
>> situations be handled? And if I have replicas, is there any documentation
>> that discusses how the leader for the partition is decided in such
>> situations, so that I can take care of things when I move to production?
>>
>>
> If you don't have any replicas, and the broker with that partition goes
> offline, then you won't be able to access that partition until the broker
> comes back online. There are some docs on replication here:
> https://kafka.apache.org/documentation/#replication
>
> Thanks,
> Damian
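A minimal Scala sketch of the setting that matters for the internal topics in this thread. Kafka Streams creates its changelog and repartition topics using `replication.factor` (`StreamsConfig.REPLICATION_FACTOR_CONFIG`), which defaults to 1 in these releases, so each internal partition lives on a single broker. The broker address and the replication factor of 3 are assumptions for illustration, and the application id is only inferred from the changelog topic name that appears later in the thread. The setting applies when Streams creates a topic; an existing changelog topic keeps the replication factor it was created with, and the cluster needs at least that many live brokers at creation time.

```scala
import java.util.Properties

object StreamsReplicationConfig {
  // Sketch only: builds Streams properties that request replicated internal topics.
  // The key strings are the values of the corresponding StreamsConfig constants.
  def props(bootstrapServers: String, replicationFactor: Int): Properties = {
    val p = new Properties()
    // StreamsConfig.APPLICATION_ID_CONFIG; inferred from "kstream-log-count-log-counts-changelog"
    p.setProperty("application.id", "kstream-log-count")
    // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; the address passed in is hypothetical
    p.setProperty("bootstrap.servers", bootstrapServers)
    // StreamsConfig.REPLICATION_FACTOR_CONFIG; applies to changelog and repartition topics
    p.setProperty("replication.factor", replicationFactor.toString)
    p
  }
}
```

On 0.10.2.x this would be passed as `new KafkaStreams(builder, StreamsReplicationConfig.props("broker-1:9092", 3))`, where `builder` is the application's topology builder and `broker-1:9092` is a placeholder.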
>
>
>> regards.
>>
>> On Wed, Jul 26, 2017 at 7:51 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> It looks to me like there is currently no leader for the partition,
>>> i.e., leader -1. Also, there are no replicas? Is something up with your brokers?
>>>
>>> Thanks,
>>> Damian
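To make the `leader -1` reading concrete, here is a small hypothetical Scala model of the per-partition fields that `topic describe` prints (`leader`, `isr`). It is not a Kafka API; it only encodes the rule from the reply above: a partition whose leader is -1 has no broker serving it, so clients can neither produce to it nor consume (or restore) from it until a leader is elected again. With no other replicas, no other broker can take over, so the partition stays offline until the broker holding its only replica returns.

```scala
object PartitionStatus {
  // Hypothetical model of one entry in the describe output; not a Kafka class.
  final case class PartitionState(partition: Int, leader: Int, isr: Seq[Int])

  // leader == -1 means no broker currently leads (serves) the partition.
  def isOffline(p: PartitionState): Boolean = p.leader < 0

  // Ids of partitions that producers and consumers cannot reach right now.
  def offlinePartitions(ps: Seq[PartitionState]): Seq[Int] =
    ps.filter(isOffline).map(_.partition)
}
```

For the changelog described below (`leader: -1`, `isr: []`), `offlinePartitions(Seq(PartitionState(0, -1, Seq.empty)))` yields `Seq(0)`.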
>>>
>>> On Wed, 26 Jul 2017 at 12:34 Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Hi Damian -
>>>>
>>>> Yes, it exists. It's actually a changelog topic corresponding to the
>>>> state store log-counts.
>>>>
>>>> $ dcos confluent-kafka topic describe kstream-log-count-log-counts-changelog
>>>> {
>>>>   "partitions": [
>>>>     {
>>>>       "0": {
>>>>         "leader": -1,
>>>>         "controller_epoch": 3,
>>>>         "isr": [],
>>>>         "leader_epoch": 3,
>>>>         "version": 1
>>>>       }
>>>>     }
>>>>   ]
>>>> }
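Kafka Streams names a store's changelog topic `<application.id>-<store name>-changelog`, which is how `kstream-log-count-log-counts-changelog` relates to the store `log-counts`. When a task moves to a new node, as happens here when Mesos restarts the process elsewhere, Streams rebuilds the local store by reading the changelog partition that matches the task's partition (task `0_0` reads partition 0), so that partition needs a live leader. A minimal Scala sketch of the naming rule; the application id `kstream-log-count` is inferred from the topic name rather than stated in the thread.

```scala
object ChangelogNaming {
  // Mirrors the Streams convention: <application.id>-<store name>-changelog
  def changelogTopic(applicationId: String, storeName: String): String =
    s"$applicationId-$storeName-changelog"
}
```

`ChangelogNaming.changelogTopic("kstream-log-count", "log-counts")` gives `kstream-log-count-log-counts-changelog`, the topic whose partition 0 has no leader above.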
>>>>
>>>> Also, one point to note is that when Mesos restarts the process, it starts
>>>> on a different node, so the local state store will not exist there. But I
>>>> expect Kafka Streams to restore it from the corresponding changelog topic.
>>>> Hence the exception looks a bit confusing to me.
>>>>
>>>> Thoughts?
>>>>
>>>> regards.
>>>>
>>>> On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com>
>>>> wrote:
>>>>
>>>> > The exception indicates that Streams was unable to find that
>>>> > topic-partition on the Kafka brokers. Can you verify that it exists?
>>>> > Also, I'm assuming you are on 0.10.2.x?
>>>> >
>>>> > On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > Thanks Damian .. this worked. But now, after the application restarts, I
>>>> > > see the following exception ..
>>>> > >
>>>> > >> 09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
>>>> > >> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
>>>> > >>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>>>> > >>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> > >> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
>>>> > >>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
>>>> > >>         at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>>>> > >
>>>> > >
>>>> > > I found this thread ..
>>>> > > https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>>>> > > but unlike that use case, I don't change the number of partitions of any
>>>> > > topic between restarts. BTW, my application uses stateful streaming,
>>>> > > so Kafka Streams creates some internal topics. Not sure if that's related
>>>> > > to this exception, but the store name mentioned in the exception
>>>> > > (log-counts) is the one used for stateful streaming.
>>>> > >
>>>> > > regards.
>>>> > >
>>>> > > On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com>
>>>> > > wrote:
>>>> > >
>>>> > >> Hi Debasish,
>>>> > >>
>>>> > >> It might be that it is blocked in `streams.close()`.
>>>> > >> You might want to try the overload that takes a long and a TimeUnit as
>>>> > >> params, i.e., `streams.close(1, TimeUnit.MINUTES)`.
>>>> > >>
>>>> > >> Thanks,
>>>> > >> Damian
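Why the unbounded `streams.close()` can hang here can be shown with plain JDK threads. Assumption: as the `kafka-streams-close-thread` entries in the log below suggest, in 0.10.2.x `close()` hands shutdown to a separate thread that stops and joins the stream threads, while the caller waits on that thread, and the no-argument `close()` waits without a bound. The uncaught-exception handler runs on `StreamThread-1` itself (see `[StreamThread-1]` in the log), so the two threads end up waiting on each other and the `finally` block with `System.exit(-1)` is never reached. The sketch below is a model of that pattern, not Kafka code; `closeFromWorker` and the thread names are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object BoundedCloseSketch {
  // Models calling a close() that delegates to a "close" thread which joins the
  // caller, from inside the caller's own thread (like a handler on StreamThread-1).
  def closeFromWorker(timeoutMs: Long): Boolean = {
    val reachedExit = new AtomicBoolean(false)
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        val self = Thread.currentThread()
        val closer = new Thread(new Runnable {
          // Cannot finish until the worker thread itself has terminated.
          override def run(): Unit = self.join()
        }, "close-thread")
        closer.setDaemon(true)
        closer.start()
        // An unbounded closer.join() here would never return: each thread waits on the other.
        closer.join(timeoutMs)
        reachedExit.set(true) // the point where System.exit(-1) would run
      }
    }, "worker")
    worker.start()
    worker.join()
    reachedExit.get()
  }
}
```

In the handler from this thread, the corresponding change is to replace `streams.close()` with `streams.close(1, TimeUnit.MINUTES)` (importing `java.util.concurrent.TimeUnit`), so the `finally` clause runs once the timeout expires.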
>>>> > >>
>>>> > >> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com>
>>>> > >> wrote:
>>>> > >>
>>>> > >>> Hi -
>>>> > >>>
>>>> > >>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster.
>>>> > >>> While the application was running, Kafka was suddenly reported as
>>>> > >>> unhealthy and the application got an exception ..
>>>> > >>>
>>>> > >>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
>>>> > >>> > org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> > >>> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
>>>> > >>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
>>>> > >>> > 07:45:16.606 TKD [StreamThread-1] INFO  c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
>>>> > >>> > 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
>>>> > >>> > 07:45:16.608 TKD [StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
>>>> > >>> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
>>>> > >>> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
>>>> > >>> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>>>> > >>>
>>>> > >>>
>>>> > >>> The streams application stopped, and I have the following exception
>>>> > >>> handler registered ..
>>>> > >>>
>>>> > >>>     // need to exit for any stream exception
>>>> > >>>     // mesos will restart the application
>>>> > >>>     streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>>>> > >>>       override def uncaughtException(t: Thread, e: Throwable): Unit = try {
>>>> > >>>         logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>>>> > >>>         logger.error(s"Stopping http service ..")
>>>> > >>>         restService.stop()
>>>> > >>>         logger.error(s"Stopping streams service ..")
>>>> > >>>         streams.close()
>>>> > >>>       } catch {
>>>> > >>>         case _: Exception =>
>>>> > >>>       } finally {
>>>> > >>>         System.exit(-1)
>>>> > >>>       }
>>>> > >>>     })
>>>> > >>>
>>>> > >>> Ideally the application should terminate and Mesos should restart
>>>> > >>> it. But I see that the application doesn't terminate, even though I have
>>>> > >>> System.exit(-1) in the finally clause. Any idea what's happening, or how
>>>> > >>> I can make the application terminate?
>>>> > >>>
>>>> > >>> Any help will be appreciated ..
>>>> > >>>
>>>> > >>> regards.
>>>> > >>>
>>>> > >>>
>>>> > >>> --
>>>> > >>> Debasish Ghosh
>>>> > >>> http://manning.com/ghosh2
>>>> > >>> http://manning.com/ghosh
>>>> > >>>
>>>> > >>> Twttr: @debasishg
>>>> > >>> Blog: http://debasishg.blogspot.com
>>>> > >>> Code: http://github.com/debasishg
>>>> > >>>
>>>> > >>
>>>> > >
>>>> > >
>>>> > >
>>>> >
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>


