Hi Damian -

Yes, it exists .. It's actually the changelog topic corresponding to the
state store log-counts:

$ dcos confluent-kafka topic describe kstream-log-count-log-counts-changelog
{
  "partitions": [
    {
      "0": {
        "leader": -1,
        "controller_epoch": 3,
        "isr": [],
        "leader_epoch": 3,
        "version": 1
      }
    }
  ]
}

One more point to note: when Mesos restarts the process, it starts on a
different node, so the local state store will not exist there. But I expect
Kafka Streams to recreate it from the corresponding changelog topic. Hence
the exception looks a bit confusing to me.
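
One thing that stands out in the describe output above is that partition 0
reports leader -1 and an empty isr. In Kafka a leader of -1 means no broker
currently leads the partition, so this may be a broker-side problem rather
than a missing topic. Below is a minimal sketch of that check, with the values
hard-coded from the output above. PartitionState and isOffline are
hypothetical names for illustration, not part of any Kafka API.

```scala
// Hypothetical model of one partition entry from the describe output.
// These names are illustrative only and do not come from Kafka.
final case class PartitionState(id: Int, leader: Int, isr: Seq[Int])

object PartitionCheck {
  // A leader of -1 means no broker currently leads the partition;
  // an empty ISR means no replica is in sync with it.
  def isOffline(p: PartitionState): Boolean =
    p.leader == -1 || p.isr.isEmpty

  def main(args: Array[String]): Unit = {
    // Values copied from the describe output for partition 0.
    val p0 = PartitionState(id = 0, leader = -1, isr = Seq.empty)
    println(s"partition ${p0.id} offline: ${isOffline(p0)}")
  }
}
```

If the partition really is offline, it might be worth checking broker health
first, before looking at the Streams side.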

Thoughts ?

regards.

On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com> wrote:

> The exception indicates that streams was unable to find that
> topic-partition on the kafka brokers. Can you verify that it exists?
> Also, I'm assuming you are on 0.10.2.x?
>
> On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Thanks Damian .. this worked. But now after the application restarts, I
> > see the following exception ..
> >
> > 09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
> > org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
> >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
> >         at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> >
> >
> > I found this thread ..
> > https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
> > but unlike that use case I don't change the partitions of any topic
> > between restarts. BTW my application uses stateful streaming, and hence
> > Kafka creates some internal topics. Not sure if that's related to this
> > exception though. But the store named in the exception (log-counts) is
> > one used for stateful streaming.
> >
> > regards.
> >
> > On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> >> Hi Debasish,
> >>
> >> It might be that it is blocked in `streams.close()`.
> >> You might want to try the overload that takes a long and a TimeUnit as
> >> params, i.e., `streams.close(1, TimeUnit.MINUTES)`
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com>
> >> wrote:
> >>
> >>> Hi -
> >>>
> >>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster.
> >>> While the application was running, Kafka was suddenly reported to be
> >>> unhealthy and the application got an exception ..
> >>>
> >>> 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
> >>> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> >>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
> >>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
> >>>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
> >>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >>>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >>>         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >>>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
> >>> 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
> >>> 07:45:16.606 TKD [StreamThread-1] INFO  c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
> >>> 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
> >>> 07:45:16.608 TKD [StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
> >>> 07:45:16.608 TKD [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
> >>> 07:45:16.609 TKD [kafka-streams-close-thread] WARN  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
> >>> 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
> >>>
> >>>
> >>> The streams application stopped and I have the following exception
> >>> handler registered ..
> >>>
> >>>     // need to exit for any stream exception
> >>>     // mesos will restart the application
> >>>     streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
> >>>       override def uncaughtException(t: Thread, e: Throwable): Unit = try {
> >>>         logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
> >>>         logger.error(s"Stopping http service ..")
> >>>         restService.stop()
> >>>         logger.error(s"Stopping streams service ..")
> >>>         streams.close()
> >>>       } catch {
> >>>         case _: Exception =>
> >>>       } finally {
> >>>         System.exit(-1)
> >>>       }
> >>>     })
> >>>
> >>> Ideally the application should terminate and Mesos should restart it.
> >>> But I see that the application doesn't terminate even though I have a
> >>> System.exit(-1) in the finally clause. Any idea what's happening, or how
> >>> I can make the application terminate?
> >>>
> >>> Any help will be appreciated ..
> >>>
> >>> regards.
> >>>
> >>>
> >>> --
> >>> Debasish Ghosh
> >>> http://manning.com/ghosh2
> >>> http://manning.com/ghosh
> >>>
> >>> Twttr: @debasishg
> >>> Blog: http://debasishg.blogspot.com
> >>> Code: http://github.com/debasishg
> >>>
> >>
> >
> >
> >
>
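
On the earlier shutdown issue quoted above: the handler hung because
`streams.close()` blocked, so the `System.exit(-1)` in the finally clause was
never reached. The timed overload Damian suggested fixed it. The same idea can
be sketched generically with only the JDK, as below. This is a sketch under
assumptions, not how Kafka Streams implements its timed close; closeWithin is
a hypothetical helper, and the `close` argument stands in for a call such as
`streams.close()`.

```scala
import java.util.concurrent.{ExecutionException, Executors, ThreadFactory, TimeUnit, TimeoutException}

object BoundedShutdown {
  // Runs `close` on a separate daemon thread and waits at most `timeout`.
  // Returns true only if `close` completed normally within the timeout;
  // returns false if it timed out or threw an exception.
  def closeWithin(timeout: Long, unit: TimeUnit)(close: () => Unit): Boolean = {
    val executor = Executors.newSingleThreadExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "bounded-close")
        t.setDaemon(true) // a stuck close must not keep the JVM alive
        t
      }
    })
    val future = executor.submit(new Runnable { def run(): Unit = close() })
    try {
      future.get(timeout, unit)
      true
    } catch {
      case _: TimeoutException   => false
      case _: ExecutionException => false
    } finally {
      executor.shutdownNow() // interrupt the worker if it is still running
    }
  }

  def main(args: Array[String]): Unit = {
    // Simulate a close that blocks far longer than we are willing to wait.
    val finished = closeWithin(200, TimeUnit.MILLISECONDS)(() => Thread.sleep(60000))
    println(s"close finished in time: $finished")
    // In the real handler, System.exit(-1) would follow here either way.
  }
}
```

The point of the design is that the exit path no longer depends on the close
call returning at all.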



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
