Sagar Rao created KAFKA-14401:
---------------------------------

             Summary: Connector/Tasks reading offsets can get stuck if 
underneath WorkThread dies
                 Key: KAFKA-14401
                 URL: https://issues.apache.org/jira/browse/KAFKA-14401
             Project: Kafka
          Issue Type: Bug
            Reporter: Sagar Rao


When a connector or task reads offsets from the offsets topic, it calls 
`OffsetStorageReaderImpl#offsets`. This method obtains a Future from the 
underlying KafkaOffsetBackingStore, which in turn invokes 
`KafkaBasedLog#readToEnd` and passes it a Callback. `readToEnd` essentially 
adds the Callback to a queue of pending callbacks.

Within KafkaBasedLog, a WorkThread polls the callback queue and executes the 
callbacks in an infinite loop. However, the while loop is enclosed in a 
try/catch block. If an exception is thrown that none of the inner catch blocks 
handles, control passes to the outermost catch block and the WorkThread 
terminates. The connectors/tasks are not aware of this and keep submitting 
callbacks to KafkaBasedLog with nothing left to process them, so they block 
indefinitely waiting on the Future. This can be seen in thread dumps:

 

```
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x00007f8d9c037000 nid=0x5d00 waiting on condition  [0x00007f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x000000070345c9a8> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
```
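
The failure mode can be reproduced in miniature. The following is only a 
simplified sketch of the pattern described above, not the actual KafkaBasedLog 
code: the class `StuckReaderDemo` and its methods `readToEnd`, `poison` and 
`workerAlive` are hypothetical names chosen for illustration. A timeout is used 
on the latch so the demo returns instead of hanging the way the real caller 
does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the callback-queue pattern; not Kafka code.
class StuckReaderDemo {
    private final BlockingQueue<Runnable> callbacks = new LinkedBlockingQueue<>();
    private final Thread worker;

    StuckReaderDemo() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    // Any exception thrown by a callback escapes the loop.
                    callbacks.take().run();
                }
            } catch (Throwable t) {
                // Outermost catch: the loop has ended, so the thread exits here.
            }
        }, "work-thread");
        worker.setDaemon(true);
        worker.start();
    }

    // Enqueues a callback and waits for it; true if it ran within the timeout.
    boolean readToEnd(long timeoutMs) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        callbacks.add(done::countDown);
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Enqueues a callback that throws, then waits for the worker to exit.
    void poison() throws InterruptedException {
        callbacks.add(() -> { throw new IllegalStateException("unexpected"); });
        worker.join(1000);
    }

    boolean workerAlive() {
        return worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        StuckReaderDemo demo = new StuckReaderDemo();
        System.out.println("before failure: " + demo.readToEnd(2000));
        demo.poison();
        System.out.println("worker alive:   " + demo.workerAlive());
        System.out.println("after failure:  " + demo.readToEnd(200));
    }
}
```

Once the worker has exited, callers can still enqueue callbacks without any 
error, but nothing ever counts the latch down. Without the timeout, the caller 
would park in `CountDownLatch.await` exactly as in the dump above.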

 

We need a mechanism to restart the WorkThread if it dies. This could be done, 
for example, in the outermost catch block.
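
One possible shape for such a restart is sketched below. It is an assumption 
about how the idea could look, not a proposed patch: `RestartingWorker`, 
`submitAndWait`, `restarts` and `stop` are hypothetical names, and the sketch 
uses a plain queue of Runnables rather than the real KafkaBasedLog internals.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a worker that starts a replacement thread from its
// outermost catch block instead of exiting silently.
class RestartingWorker {
    private final BlockingQueue<Runnable> callbacks = new LinkedBlockingQueue<>();
    private final AtomicInteger restarts = new AtomicInteger();
    private volatile boolean stopRequested = false;

    RestartingWorker() {
        startWorker();
    }

    private void startWorker() {
        Thread t = new Thread(this::runLoop, "work-thread-" + restarts.get());
        t.setDaemon(true);
        t.start();
    }

    private void runLoop() {
        try {
            while (!stopRequested) {
                Runnable cb = callbacks.poll(100, TimeUnit.MILLISECONDS);
                if (cb != null) {
                    cb.run();
                }
            }
        } catch (InterruptedException e) {
            // Treated as a shutdown signal in this sketch.
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            // Outermost catch: hand over to a fresh thread unless stopping.
            if (!stopRequested) {
                restarts.incrementAndGet();
                startWorker();
            }
        }
    }

    // Runs the work on the worker thread; true if it completed within the timeout.
    boolean submitAndWait(Runnable work, long timeoutMs) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        callbacks.add(() -> { work.run(); done.countDown(); });
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    int restarts() {
        return restarts.get();
    }

    void stop() {
        stopRequested = true;
    }

    public static void main(String[] args) throws InterruptedException {
        RestartingWorker w = new RestartingWorker();
        w.submitAndWait(() -> { throw new IllegalStateException("boom"); }, 500);
        System.out.println("served after failure: " + w.submitAndWait(() -> { }, 2000));
        System.out.println("restarts: " + w.restarts());
        w.stop();
    }
}
```

Note that in this sketch the callback that triggered the failure is still never 
completed; only later callbacks are served. A real fix would likely also need 
to fail the pending callback with the exception, and to guard against a tight 
restart loop when the failure repeats.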

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
