Thanks, Eno, for the info! I will try your suggestion.

2017-06-27 14:04 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:

> Thanks. I believe we’ve addressed this issue in 0.10.2.1; any chance you
> could try that?
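>
> If it helps, and assuming you build with Maven (adjust for Gradle or
> whatever you actually use), the upgrade should just be a dependency bump:
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-streams</artifactId>
>     <!-- 0.10.2.1 is the bug-fix release mentioned above -->
>     <version>0.10.2.1</version>
> </dependency>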
>
> Thanks
> Eno
> > On Jun 27, 2017, at 11:14 AM, D Stephan <kafkastre...@gmail.com> wrote:
> >
> > Hello,
> >
> > Thanks for your reply.
> >
> > I use Kafka and Kafka Streams version 0.10.2.0.
> > Between runs, the number of partitions is not intentionally changed, either
> > programmatically or manually.
> >
> > This topic, "external-batch-request-store-repartition", is an internal
> > topic generated by the Kafka Streams DSL "aggregate" operation:
> > https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Windows,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
> >
> >
> >
> > I use this API as follows:
> >
> > ...
> > .groupByKey()
> > .aggregate(...)
> > .toStream(...);
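> >
> > For completeness, a slightly fuller sketch of how this pipeline is wired up
> > (the key/value types, the window size, the source topic name and the
> > aggregation logic are illustrative placeholders, not my real ones):
> >
> > import org.apache.kafka.common.serialization.Serdes;
> > import org.apache.kafka.streams.kstream.KStream;
> > import org.apache.kafka.streams.kstream.KStreamBuilder;
> > import org.apache.kafka.streams.kstream.KTable;
> > import org.apache.kafka.streams.kstream.TimeWindows;
> > import org.apache.kafka.streams.kstream.Windowed;
> >
> > KStreamBuilder builder = new KStreamBuilder();
> > KStream<String, Long> input =
> >     builder.stream(Serdes.String(), Serdes.Long(), "batch-requests"); // placeholder topic
> >
> > KTable<Windowed<String>, Long> aggregated = input
> >     .groupByKey()
> >     .aggregate(
> >         () -> 0L,                          // Initializer: starting value per key
> >         (key, value, agg) -> agg + value,  // Aggregator: fold each record into the aggregate
> >         TimeWindows.of(60_000L),           // Windows: 1-minute tumbling windows (illustrative)
> >         Serdes.Long(),                     // Serde for the aggregate value
> >         "external-batch-request-store");   // state store name; Streams derives the internal
> >                                            // "-repartition"/"-changelog" topic names from it
> >
> > // Map the windowed key back to a plain key for downstream processing.
> > KStream<String, Long> result =
> >     aggregated.toStream((windowedKey, value) -> windowedKey.key());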
> >
> >
> > Please let me know if you need additional information.
> >
> > Thanks,
> >
> >
> > 2017-06-27 11:39 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:
> >
> >> Hi there,
> >>
> >> Thanks for the report. What version of Kafka are you using? Also, between
> >> runs do you change the number of partitions for your topics? I’m trying to
> >> figure out how this problem happens; any information on what is changing
> >> in between runs is appreciated.
> >>
> >> Thanks,
> >> Eno
> >>
> >>> On Jun 27, 2017, at 8:52 AM, D Stephan <kafkastre...@gmail.com> wrote:
> >>>
> >>> Hello,
> >>>
> >>> When I use the Kafka Streams DSL groupByKey and aggregate APIs, I randomly
> >>> and frequently get the exceptions below.
> >>> In my opinion, it is not practical to clean up the invalid partitions
> >>> every day. For your information, this is an internal topic that is
> >>> automatically created by the Kafka Streams aggregate API.
> >>> Do you have any idea or workarounds to mitigate this exception?
> >>>
> >>>
> >>>
> >>>
> >>> 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [ StreamThread-4] o.a.k.s.p.i.InternalTopicManager : Could not create internal topics: Existing internal topic external-batch-request-store-repartition has invalid partitions. Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. Retry #4
> >>>
> >>> 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
> >>> 2017-06-21T06:48:31.491087557Z at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:70)
> >>> 2017-06-21T06:48:31.491091661Z at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:618)
> >>> 2017-06-21T06:48:31.491096794Z at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:372)
> >>> 2017-06-21T06:48:31.491368662Z at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
> >>> 2017-06-21T06:48:31.491390576Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
> >>> 2017-06-21T06:48:31.491397476Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
> >>> 2017-06-21T06:48:31.491403757Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
> >>> 2017-06-21T06:48:31.491408328Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
> >>> 2017-06-21T06:48:31.491413053Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
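> >>>
> >>> For reference, the daily cleanup I am referring to above is roughly the
> >>> following (assuming the reset script shipped with Kafka that wraps
> >>> 'kafka.tools.StreamsResetter'; the application id and broker address are
> >>> placeholders, and the exact flags may vary by version):
> >>>
> >>> bin/kafka-streams-application-reset.sh \
> >>>     --application-id my-streams-app \
> >>>     --bootstrap-servers localhost:9092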
> >>
> >>
>
>
