[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522031#comment-16522031
 ] 

Ted Yu commented on KAFKA-7088:
-------------------------------

Could you set the log level to DEBUG when you have a chance?

Thanks

> Kafka streams thread waits infinitely on transaction init
> ---------------------------------------------------------
>
>                 Key: KAFKA-7088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7088
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.1
>         Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 
> 20:07:43 UTC 2018 
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 10000
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>            Reporter: Lukasz Gluchowski
>            Priority: Major
>              Labels: eos
>
> A Kafka Streams application thread stops processing without any feedback. 
> The topic has 24 partitions, and I noticed that processing stopped only for 
> some partitions. I will describe what happened to partition 10. The 
> application is still running (now for about 8 hours), that thread is still 
> hanging, and no rebalancing has taken place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler`, which 
> was not called). I noticed that after a couple of minutes the stream stopped 
> processing (at offset 32606948, where the log-end-offset is 33472402). 
> The broker itself is not reporting any active consumer in that consumer 
> group, and the only info I was able to gather was from a thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" #113 prio=5 os_prio=0 tid=0x00007fe07c6349b0 nid=0xf7a waiting on condition [0x00007fe0215d4000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000000fec6a2f8> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
>     at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
>     at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
>     at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
>  
> I tried restarting the application once, but the situation repeated: the 
> thread read some data, committed the offset, and stopped processing, leaving 
> that thread in a waiting state.
> FYI: we have EOS enabled
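
For readers following the thread dump: the stream thread is parked in an untimed `CountDownLatch.await()` reached from `KafkaProducer.initTransactions`, so it can only wake up when the latch is counted down or the thread is interrupted. The sketch below is not Kafka code; it only reproduces that waiting pattern with the JDK, assuming a latch that is never released. The class `LatchWaitSketch` and its helper methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; it does not use or mirror any Kafka API.
final class LatchWaitSketch {

    // Timed wait: returns false if the latch was not released within timeoutMs.
    static boolean awaitWithTimeout(CountDownLatch latch, long timeoutMs)
            throws InterruptedException {
        return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Starts a daemon thread that calls the untimed await() and reports the
    // state that thread settles into (polled for up to two seconds).
    static Thread.State stateOfUntimedWaiter(CountDownLatch latch)
            throws InterruptedException {
        Thread waiter = new Thread(() -> {
            try {
                latch.await(); // no timeout: parks until countDown() or interrupt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "untimed-waiter");
        waiter.setDaemon(true);
        waiter.start();

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (waiter.getState() != Thread.State.WAITING
                && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State observed = waiter.getState();
        waiter.interrupt(); // release the demo thread so it does not linger
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stands in for a response that never arrives.
        CountDownLatch neverReleased = new CountDownLatch(1);

        System.out.println("untimed waiter state: "
                + stateOfUntimedWaiter(neverReleased));
        System.out.println("timed wait completed: "
                + awaitWithTimeout(neverReleased, 200));
    }
}
```

The untimed waiter settles into the same `WAITING` state shown in the dump, while the timed variant returns control to the caller, which can then log or fail. Whether the client version in use offers a bounded wait for transaction initialization is not something this sketch establishes.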



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
