Hi,

I'm embedding the kafka server (0.7.2) in an application container.   I've
noticed that if I try to start the server without zookeeper being
available, by default it gets a zk connection timeout after 6 seconds, and
then throws an Exception out of KafkaServer.startup()....E.g., I see this
stack trace:

Exception in thread "main"
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to
zookeeper server within timeout: 6000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
at kafka.log.LogManager.<init>(LogManager.scala:93)
at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
        ....
        ....

So that's ok, I can catch the exception, and then shut everything down
gracefully, in this case.  However, when I do this, it seems there is a
daemon thread still around, which doesn't quit, and so the server never
actually exits the jvm.  Specifically, this thread seems to hang around:

"kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on
condition [112c07000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <7f40d4be8> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:680)

Looking at the code in kafka.log.LogManager(), it does seem like it starts
up the scheduler to clean logs, before then trying to connect to zk (and in
this case fail):

  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("starting log cleaner every " + logCleanupIntervalMs + " ms")
    scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
  }

So this scheduler does not appear to be stopped if startup fails.  However,
if I catch the above RuntimeException, and then call
KafkaServer.shutdown(), then it will stop the scheduler, and all is good.

However, it seems odd that if I get an exception when calling
KafkaServer.startup(), that I should still have to do a
KafkaServer.shutdown().  Rather, wouldn't it be better to have it
internally cleanup after itself if startup() gets an exception?  I'm not
sure I can reliably call shutdown() after a failed startup()....

Thoughts?

Jason

Reply via email to