To test Kafka Streams on 0.10.2.0, we set up a new Kafka cluster running the
latest version and used MirrorMaker to replicate the data from the 0.10.0.0
Kafka cluster. We then pointed our streaming app at the newly created Kafka
cluster.

We have 5 nodes, each running the streaming app with 10 threads. In less
than 10 minutes, the process on all 5 nodes died, though not all with the
same exception. Below are the stack traces we got, after a rough sketch of
how each instance is configured and started.
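
For reference, each instance is configured and started roughly like this. It
is only a sketch: the class name, broker host names, and the output topic are
placeholders, our real topology (a windowed aggregation) is elided and stood
in for by a simple pass-through, and the application id is inferred from the
state directory path in stack trace #3.

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StreamsTestApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // application id; Streams also uses it as the sub-directory under state.dir
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams_test_2");
        // point the app at the new 0.10.2.0 cluster (placeholder host names)
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "new-kafka-1:9092,new-kafka-2:9092");
        // 10 stream threads per instance, 5 instances in total
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);

        KStreamBuilder builder = new KStreamBuilder();
        // stand-in for our real windowed aggregation, which is elided here
        builder.stream("topicname").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}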

Any help would be really appreciated.

*Stack trace #1 (seen on 3 of 5 nodes):*

18:58:00.349 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Stream thread shutdown complete
18:58:00.349 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_396, processor=KSTREAM-SOURCE-0000000004, topic=topicname, partition=396, offset=66839
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store %s has closed
        at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
        at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
        at org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
        at org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
        at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
        at org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.hasNext(AbstractMergedSortedCacheStoreIterator.java:74)
        at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:97)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
        ... 2 more


*Stack trace #2 (seen on 1 node):*

18:57:44.692 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Stream thread shutdown complete
18:57:44.692 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected state transition from ASSIGNING_PARTITIONS to NOT_RUNNING
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: java.lang.IllegalArgumentException: A metric named 'MetricName [name=1_234-sometopic-hitRatio-avg, group=stream-record-cache-metrics, description=The current count of 1_234-sometopic hitRatio operation., tags={record-cache-id=1_234-sometopic}]' already exists, can't register another one.
        at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
        at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
        at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
        at org.apache.kafka.streams.state.internals.NamedCache$NamedCacheMetrics.<init>(NamedCache.java:388)
        at org.apache.kafka.streams.state.internals.NamedCache.<init>(NamedCache.java:62)
        at org.apache.kafka.streams.state.internals.ThreadCache.getOrCreateCache(ThreadCache.java:226)
        at org.apache.kafka.streams.state.internals.ThreadCache.addDirtyEntryFlushListener(ThreadCache.java:87)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.initInternal(CachingWindowStore.java:74)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:62)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
        ... 1 more

*Stack trace #3 (seen on 1 node):*

19:07:34.827 [StreamThread-1] WARN o.a.k.s.p.i.StreamThread - Could not create task 0_192. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_192] Failed to lock the state directory: /tmp/kafka-streams/streams_test_2/0_192
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) [app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) ~[app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [app-name-1.2.1.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [app-name-1.2.1.jar:na]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [app-name-1.2.1.jar:na]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [app-name-1.2.1.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [app-name-1.2.1.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [app-name-1.2.1.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [app-name-1.2.1.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [app-name-1.2.1.jar:na]



On Sat, Mar 18, 2017 at 5:58 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:

> Thanks for the heads up Guozhang!
>
> The problem is our brokers are on 0.10.0.x. So we will have to upgrade
> them.
>
> On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
>> Hi Mahendra,
>>
>> Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
>> require you to upgrade brokers to 0.10.2 as well. In 0.10.2 we added a new
>> feature that allows newer versioned clients (producer, consumer, streams)
>> to talk to older versioned brokers, and for Streams specifically it only
>> requires brokers to be no older than 0.10.1.
>>
>>
>> Guozhang
>>
>>
>> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>>
>> > We are planning to migrate to the newer version of Kafka. But that's a
>> > few weeks away.
>> >
>> > We will try setting the socket config and see how it turns out.
>> >
>> > Thanks a lot for your response!
>> >
>> >
>> >
>> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <eno.there...@gmail.com>
>> > wrote:
>> >
>> > > Thanks,
>> > >
>> > > A couple of things:
>> > > - I’d recommend moving to 0.10.2 (latest release) if you can since
>> > > several improvements were made in the last two releases that make
>> > > rebalancing and performance better.
>> > >
>> > > - When running on environments with large latency on AWS at least
>> > > (haven’t tried Google cloud), one parameter we have found useful to
>> > > increase performance is the receive and send socket size for the
>> > > consumer and producer in streams. We’d recommend setting them to 1MB
>> > > like this (where “props” is your own properties object when you start
>> > > streams):
>> > >
>> > > // the socket buffer needs to be large, especially when running in AWS
>> > > // with high latency. if running locally the default is fine.
>> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
>> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
>> > >
>> > > Make sure the OS allows the larger socket size too.
>> > >
>> > > Thanks
>> > > Eno
>> > >
>> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>> > > >
>> > > > Hi Eno,
>> > > >
>> > > > Please find my answers inline.
>> > > >
>> > > >
>> > > > We are in the process of documenting capacity planning for streams,
>> > > > stay tuned.
>> > > >
>> > > > This would be great! Looking forward to it.
>> > > >
>> > > > Could you send some more info on your problem? What Kafka version are
>> > > > you using?
>> > > >
>> > > > We are using Kafka 0.10.0.0.
>> > > >
>> > > > Are the VMs on the same or different hosts?
>> > > >
>> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and one
>> > > > is in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
>> > > >
>> > > > Also what exactly do you mean by “the lag keeps fluctuating”, what
>> > > > metric are you looking at?
>> > > >
>> > > > We are looking at Kafka Manager for the time being. By fluctuating, I
>> > > > mean the lag is a few thousand at one moment; we refresh it the next
>> > > > second and it is in the few lakhs, and on the next refresh it is back
>> > > > to a few thousand. I understand this may not be very accurate. We
>> > > > will soon have more accurate data once we start pushing the consumer
>> > > > lag metric to Datadog.
>> > > >
>> > > > But on a separate note, the difference between lags on different
>> > > > partitions is way too high. I have attached a tab-separated file
>> > > > herewith which shows the consumer lag (from Kafka Manager) for the
>> > > > first 50 partitions. As is clear, the lag on partition 2 is 530 while
>> > > > the lag on partition 18 is 23K. Note that the same VM is pulling data
>> > > > from both the partitions.
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > <KafkaLags.tsv>
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
