[ https://issues.apache.org/jira/browse/KAFKA-14401 ]


    Sagar Rao deleted comment on KAFKA-14401:
    -----------------------------------

was (Author: sagarrao):
There are a couple of routes that can be taken here:

 

1) As discussed, we try to re-create the WorkThread if it dies due to an 
unhandled exception. This keeps offset reads making progress without the 
connectors ever noticing the failure. The problem is that we would still 
re-create the thread even if the exception is fatal. This could be both a 
good thing and a bad thing tbh. Of course, we would still log the exception 
(this already happens).

2) Extending the idea from point 1, we can create a single-threaded thread 
pool which would take care of re-creating the thread if it dies. It has the 
same pros and cons as option 1.

3) The other extreme could be to stop accepting offset requests and fail the 
worker somehow.
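
To make option 2 concrete, here is a minimal sketch, not taken from the Kafka 
code base. It relies on the documented behaviour of 
Executors.newSingleThreadExecutor: if its single thread terminates because of 
a failure during execution, a new thread takes its place for later tasks. The 
class name, thread name and simulated exception are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; this is not how KafkaBasedLog is implemented.
class RestartingWorkerSketch {

    // Returns true if a task submitted after the worker thread died still runs.
    static boolean survivesWorkerDeath() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-work-thread");
            t.setDaemon(true);
            // Log the failure ourselves, mirroring "we would still log the exception".
            t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println(thread.getName() + " died: " + e));
            return t;
        });
        try {
            // execute() lets the exception escape the task and kill the thread;
            // submit() would instead capture it in the returned Future.
            executor.execute(() -> {
                throw new IllegalStateException("simulated unhandled error");
            });

            // The pool replaces the dead thread, so this later task is still served.
            CompletableFuture<String> offsets = new CompletableFuture<>();
            executor.execute(() -> offsets.complete("offsets-read"));
            return "offsets-read".equals(offsets.get(5, TimeUnit.SECONDS));
        } catch (Exception e) {
            return false; // timeout, interrupt, or unexpected failure
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("later task ran: " + survivesWorkerDeath());
    }
}
```

The trade-off is the one noted for option 1: the pool revives the thread 
regardless of whether the exception was transient or fatal.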

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-14401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14401
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Sagar Rao
>            Assignee: Sagar Rao
>            Priority: Major
>
> When a connector or task tries to read offsets from the offsets topic, it 
> calls the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
> from the underlying KafkaOffsetBackingStore, which invokes the 
> `KafkaBasedLog#readToEnd` method and passes it a Callback. `readToEnd` 
> essentially adds the Callback to a queue of pending callbacks.
> Within KafkaBasedLog, a WorkThread polls this callback queue and executes 
> the callbacks in an infinite loop. However, the while loop is enclosed in 
> a try/catch block. If an exception is thrown that is not caught by any of 
> the inner catch blocks, control passes to the outermost catch block and the 
> WorkThread terminates. The connectors/tasks are not aware of this and 
> keep submitting callbacks to KafkaBasedLog with nobody processing them. 
> This can be seen in the thread dumps as well:
>  
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x00007f8d9c037000 nid=0x5d00 waiting on condition  [0x00007f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
>     - parking to wait for  <0x000000070345c9a8> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
>     at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
>     at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>     at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>     at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>  {code}
>  
> We need a mechanism to fail all such offset read requests. Even if we 
> restart the thread, chances are it will fail again with the same error, so 
> the offset fetch would remain stuck indefinitely.
>  
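
One possible shape for such a mechanism is sketched below. It is an 
illustration only and does not reflect the actual KafkaBasedLog code: the 
class, its methods and the simulated error are hypothetical, and 
CompletableFuture stands in for the Connect Callback type. When the work loop 
dies, it records the error, fails every queued callback, and makes later 
requests fail immediately instead of waiting forever.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names do not correspond to the real KafkaBasedLog code.
class FailFastLogSketch {
    private final Queue<CompletableFuture<Void>> pending = new ArrayDeque<>();
    private Throwable fatalError; // non-null once the work loop has died

    // Stand-in for readToEnd: queue a callback, or fail it at once if the loop is dead.
    synchronized CompletableFuture<Void> readToEnd() {
        CompletableFuture<Void> callback = new CompletableFuture<>();
        if (fatalError != null) {
            callback.completeExceptionally(fatalError);
        } else {
            pending.add(callback);
            notifyAll();
        }
        return callback;
    }

    // Stand-in for the work thread body; readStep represents one read of the log.
    void runWorkLoop(Runnable readStep) {
        try {
            while (true) {
                synchronized (this) {
                    while (pending.isEmpty()) {
                        wait();
                    }
                }
                readStep.run();
                CompletableFuture<Void> done;
                synchronized (this) {
                    done = pending.poll();
                }
                done.complete(null);
            }
        } catch (Throwable t) {
            // Outermost catch: instead of dying silently, fail everything queued.
            failAll(t);
        }
    }

    private synchronized void failAll(Throwable cause) {
        fatalError = cause;
        CompletableFuture<Void> callback;
        while ((callback = pending.poll()) != null) {
            callback.completeExceptionally(cause);
        }
    }

    // True if the callback completes exceptionally within the given time.
    static boolean failsWithin(CompletableFuture<Void> callback, long seconds) {
        try {
            callback.get(seconds, TimeUnit.SECONDS);
            return false;
        } catch (ExecutionException e) {
            return true;
        } catch (Exception e) {
            return false; // timed out or interrupted: the caller would be stuck
        }
    }

    // Checks a request queued before the failure and one issued after it.
    static boolean demo() {
        FailFastLogSketch log = new FailFastLogSketch();
        CompletableFuture<Void> queuedBefore = log.readToEnd();
        Thread worker = new Thread(() -> log.runWorkLoop(() -> {
            throw new IllegalStateException("simulated unhandled error");
        }), "sketch-work-thread");
        worker.setDaemon(true);
        worker.start();
        boolean first = failsWithin(queuedBefore, 5);
        boolean second = failsWithin(log.readToEnd(), 5);
        return first && second;
    }
}
```

With this shape, a caller blocked the way the thread dump shows would get an 
exception back rather than parking on the latch indefinitely.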



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
