Rajini Sivaram created KAFKA-6710:
-------------------------------------

             Summary: Streams integration tests hang during shutdown
                 Key: KAFKA-6710
                 URL: https://issues.apache.org/jira/browse/KAFKA-6710
             Project: Kafka
          Issue Type: Bug
          Components: core, streams
    Affects Versions: 1.1.0
            Reporter: Rajini Sivaram
            Assignee: Rajini Sivaram


Builds have been timing out a lot recently and many of the logs show streams 
integration tests being run, but not completed. While running tests locally, I 
saw a failure during shutdown of {{TableTableJoinIntegrationTest}}. The test 
was stuck waiting for a broker to shut down while a {{KafkaScheduler}} was 
attempting to delete logs. KAFKA-6624 (Commit 
#1ea07b993d75ed68f4c04282eb177bf84156e0b2) added a _Thread.sleep_ to wait for 
the time to delete each log segment inside the scheduled delete task. The 
failing streams test had 62 logs to delete, and since MockTime doesn't get 
updated during the test, each log waits for the full one-minute delay, so the 
task would have taken 62 minutes to complete. This blocks shutdown of the 
broker for 62 minutes. This is an issue for any streams integration test that 
runs for more than 30 seconds, which is when the first delayed delete task is 
scheduled to run.
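
The arithmetic above can be illustrated with a small Java sketch. It is not the code from {{LogManager}}: {{MockClock}}, {{wallClockBlockedMs}} and the {{useClockSleep}} flag are hypothetical names introduced only for illustration, and the real _Thread.sleep_ call is replaced by a counter so that the example runs instantly. It assumes all 62 logs were queued at the same mock instant with a one-minute delay.

```java
public class MockClockSleepSketch {

    /** Hypothetical stand-in for a mock clock: time only moves when sleep() is called. */
    static final class MockClock {
        private long nowMs = 0L;

        long milliseconds() {
            return nowMs;
        }

        void sleep(long ms) {
            nowMs += ms; // advances virtual time, never blocks the thread
        }
    }

    /**
     * Walks a delete queue of logCount entries and returns how many wall-clock
     * milliseconds the loop would block for.
     */
    static long wallClockBlockedMs(int logCount, long delayMs, boolean useClockSleep) {
        MockClock clock = new MockClock();
        long scheduledAtMs = clock.milliseconds(); // assumption: all logs queued at the same instant
        long blockedMs = 0L;
        for (int i = 0; i < logCount; i++) {
            long waitMs = scheduledAtMs + delayMs - clock.milliseconds();
            if (waitMs > 0) {
                if (useClockSleep) {
                    // Sleeping on the mock clock moves mock time forward at once.
                    clock.sleep(waitMs);
                } else {
                    // Stands in for Thread.sleep(waitMs): real time passes,
                    // but the mock clock never moves, so every log waits in full.
                    blockedMs += waitMs;
                }
            }
        }
        return blockedMs;
    }

    public static void main(String[] args) {
        System.out.println(wallClockBlockedMs(62, 60_000L, false) / 60_000L + " min"); // 62 min
        System.out.println(wallClockBlockedMs(62, 60_000L, true) / 60_000L + " min");  // 0 min
    }
}
```

With the wall-clock sleep, the wait is recomputed against a clock that never advances, so the delay is paid once per log. Sleeping on the mock clock pays it only in virtual time.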

Changing _Thread.sleep_ to _time.sleep_ fixes this test issue. But it would be 
good to know why we have a _sleep_ on a _Scheduler_ at all. With the default 
_log.segment.delete.delay.ms_ of one minute, this potentially blocks a 
scheduler thread for up to a minute when there are logs to be deleted. Couldn't 
we just break out of the loop if it is not yet time to delete the first log 
segment in the list? The log would then get deleted the next time the broker 
checks. [~junrao] [~lindong] ?
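
A minimal Java sketch of the break-out-of-the-loop idea follows. It is an illustration only, not the actual {{LogManager}} code or a proposed patch: {{PendingDelete}} and {{deleteDueLogs}} are hypothetical names, the "deletion" just records the log name, and the queue is assumed to be ordered by schedule time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class NonBlockingDeleteSketch {

    /** Hypothetical queue entry: a log name plus the time it was scheduled for deletion. */
    static final class PendingDelete {
        final String logName;
        final long scheduledAtMs;

        PendingDelete(String logName, long scheduledAtMs) {
            this.logName = logName;
            this.scheduledAtMs = scheduledAtMs;
        }
    }

    /**
     * Deletes entries from the head of the queue while their delay has elapsed,
     * and returns as soon as the head is not yet due instead of sleeping.
     */
    static List<String> deleteDueLogs(Deque<PendingDelete> queue, long nowMs, long delayMs) {
        List<String> deleted = new ArrayList<>();
        while (!queue.isEmpty()) {
            PendingDelete head = queue.peekFirst();
            if (head.scheduledAtMs + delayMs > nowMs) {
                break; // not yet due: leave it for the next scheduled run
            }
            queue.pollFirst();
            deleted.add(head.logName); // stands in for the real deletion
        }
        return deleted;
    }

    public static void main(String[] args) {
        Deque<PendingDelete> queue = new ArrayDeque<>();
        queue.add(new PendingDelete("log-0", 0L));
        queue.add(new PendingDelete("log-1", 50_000L));
        System.out.println(deleteDueLogs(queue, 60_000L, 60_000L)); // [log-0]
        System.out.println(queue.size());                           // 1
    }
}
```

Because the task returns immediately when the head of the queue is not due, the scheduler thread is never parked in a sleep and a broker shutdown does not have to wait for it.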

 

*Stack trace from failing test*:

{{"kafka-scheduler-8" daemon prio=5 tid=0x00007fe58dc16800 nid=0x9603 waiting 
on condition [0x0000700003f25000]}}
{{   java.lang.Thread.State: TIMED_WAITING (sleeping)}}
{{        at java.lang.Thread.sleep(Native Method)}}
{{        at 
kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:717)}}
{{        at 
kafka.log.LogManager$$anonfun$3.apply$mcV$sp(LogManager.scala:406)}}
{{        at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)}}
{{        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)}}
{{        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)}}
{{        at java.util.concurrent.FutureTask.run(FutureTask.java:262)}}
{{        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)}}
{{        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)}}
{{        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)}}
{{        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)}}
{{        at java.lang.Thread.run(Thread.java:745)}}

{{"Test worker" prio=5 tid=0x00007fe58db72000 nid=0x5203 waiting on 
condition [0x0000700001cbd000]}}
{{   java.lang.Thread.State: TIMED_WAITING (parking)}}
{{        at sun.misc.Unsafe.park(Native Method)}}
{{        - parking to wait for  <0x0000000780fb8918> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
{{        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)}}
{{        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)}}
{{        at 
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)}}
{{        at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)}}
{{        at 
kafka.server.KafkaServer$$anonfun$shutdown$5.apply$mcV$sp(KafkaServer.scala:569)}}
{{        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)}}
{{        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:569)}}
{{        at 
org.apache.kafka.streams.integration.utils.KafkaEmbedded.stop(KafkaEmbedded.java:129)}}
{{        at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.stop(EmbeddedKafkaCluster.java:126)}}
{{        at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.after(EmbeddedKafkaCluster.java:158)}}
{{        at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)}}
{{        at org.junit.rules.RunRules.evaluate(RunRules.java:20)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
