Thanks, Eno, for the info! I will try your suggestion.

2017-06-27 14:04 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:
> Thanks. I believe we’ve addressed this issue in 0.10.2.1; any chance you
> could try that?
>
> Thanks,
> Eno
>
> On Jun 27, 2017, at 11:14 AM, D Stephan <kafkastre...@gmail.com> wrote:
> >
> > Hello,
> >
> > Thanks for your reply.
> >
> > I use Kafka & KafkaStream version 0.10.2.0.
> > Between the runs, the number of partitions is not intentionally changed,
> > either programmatically or manually.
> >
> > This topic, "external-batch-request-store-repartition", is an internally
> > generated topic from this KafkaStream DSL "aggregate":
> > https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Windows,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
> >
> > I use this API as follows:
> >
> > ...
> > .groupByKey()
> > .aggregate(...)
> > .toStream(...);
> >
> > Please let me know if you need additional information.
> >
> > Thanks,
> >
> > 2017-06-27 11:39 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:
> >
> >> Hi there,
> >>
> >> Thanks for the report. What version of Kafka are you using? Also, between
> >> runs do you change the number of partitions for your topics? I’m trying to
> >> figure out how this problem happens; any information on what is changing in
> >> between runs is appreciated.
> >>
> >> Thanks,
> >> Eno
> >>
> >>> On Jun 27, 2017, at 8:52 AM, D Stephan <kafkastre...@gmail.com> wrote:
> >>>
> >>> Hello,
> >>>
> >>> When I use the KafkaStreams DSL GroupByKey and Aggregate APIs, I randomly
> >>> and frequently get the exceptions below.
> >>> In my opinion, it is not practical to clean up the invalid partitions
> >>> every day. For your information, this partition is an internal partition
> >>> that is automatically created by the KafkaStream Aggregate API.
> >>> Do you have any idea or workarounds to mitigate this exception?
> >>>
> >>> 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [ StreamThread-4] o.a.k.s.p.i.InternalTopicManager : Could not create internal topics: Existing internal topic external-batch-request-store-repartition has invalid partitions. Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. Retry #4
> >>>
> >>> 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
> >>> 2017-06-21T06:48:31.491087557Z at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:70)
> >>> 2017-06-21T06:48:31.491091661Z at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:618)
> >>> 2017-06-21T06:48:31.491096794Z at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:372)
> >>> 2017-06-21T06:48:31.491368662Z at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
> >>> 2017-06-21T06:48:31.491390576Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
> >>> 2017-06-21T06:48:31.491397476Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
> >>> 2017-06-21T06:48:31.491403757Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
> >>> 2017-06-21T06:48:31.491408328Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
> >>> 2017-06-21T06:48:31.491413053Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)