[ 
https://issues.apache.org/jira/browse/KAFKA-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qinghui Xu updated KAFKA-8790:
------------------------------
    Description: 
We have a Kafka (source) connector that copies data from a source Kafka cluster 
to a target cluster. The connector is deployed to a set of workers running on 
Mesos, so the lifecycle of the workers is managed by Mesos: workers should be 
recovered by Mesos in case of failure, and source tasks then rely on Kafka 
Connect's KafkaOffsetBackingStore to read back their offsets and resume.

Recently, though, we ran into an unrecoverable situation: a worker stops doing 
any work after a network reset on the host where it runs. More specifically, the 
Kafka Connect tasks on that worker stop polling the source Kafka cluster because 
their consumers are stuck in a rebalance.

After some digging, we found that the thread handling source task offset 
recovery is dead, which leaves all rebalancing tasks stuck reading back their 
offsets. The log we saw in our Connect task:
{code:java}
2019-08-12 14:29:28,089 ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - kc_replicator_offsets,5,main] (org.apache.kafka.connect.util.KafkaBasedLog)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 30001ms{code}
As far as I can see 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L339]),
 the thread dies on any error while the worker stays alive. That leaves the 
worker with no thread to recover offsets, so all tasks on that worker cannot 
recover and remain stuck after a failure.
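
For illustration, here is a simplified paraphrase of that behavior (this is not 
the actual Kafka source; the class below and the readToLogEnd stand-in are 
purely illustrative): any Throwable escaping the polling loop is merely logged, 
the thread exits, and nothing stops or restarts the worker.
{code:java}
// Simplified paraphrase of the failure mode described above (illustrative only,
// not the real KafkaBasedLog code): the polling loop sits inside a catch-all,
// so a TimeoutException kills the thread while the worker process keeps running.
class WorkThreadSketch extends Thread {
    private final Runnable readToLogEnd; // stands in for the real offset read-back work

    WorkThreadSketch(Runnable readToLogEnd) {
        super("KafkaBasedLog Work Thread - sketch");
        this.readToLogEnd = readToLogEnd;
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                readToLogEnd.run(); // may throw TimeoutException after a network reset
            }
        } catch (Throwable t) {
            // The error is only logged; the thread dies silently and pending
            // offset reads on this worker never complete.
            System.err.println("Unexpected exception in " + getName() + ": " + t);
        }
    }
}
{code}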

 

Ideally, a fix would do the following (a rough sketch follows the list):
 * Make the KafkaBasedLog work thread recover from recoverable errors such as 
`TimeoutException` or, more generally, `RetriableException` (`KafkaException` is 
probably too broad to handle, since some of its subclasses may be unrecoverable).
 * If several consecutive retries of the first kind fail (which we might then 
consider unrecoverable), shut the worker down so that its tasks can be 
rebalanced elsewhere.
 * In case of a truly unrecoverable error (a generic `Throwable`), the 
"KafkaBasedLog Work Thread" would still die, but it should trigger the worker's 
shutdown (e.g. a finally clause calling System.exit) so that the worker's tasks 
do not idle after the failure. The worker lifecycle manager (in our case, Mesos) 
will then restart the worker elsewhere.
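
Below is a minimal sketch of what such a policy could look like, assuming 
retriable errors are retried a bounded number of times and anything else (or an 
exhausted retry budget) forces the worker process to exit so the scheduler can 
restart it. Names such as MAX_RETRIES and the readToLogEnd stand-in are 
hypothetical, not actual Kafka Connect APIs.
{code:java}
import org.apache.kafka.common.errors.RetriableException;

// Hypothetical sketch of the proposed retry-then-exit policy (not a patch):
// MAX_RETRIES and the readToLogEnd stand-in are illustrative names only.
class ResilientWorkThreadSketch extends Thread {
    private static final int MAX_RETRIES = 5;
    private final Runnable readToLogEnd;

    ResilientWorkThreadSketch(Runnable readToLogEnd) {
        super("KafkaBasedLog Work Thread - resilient sketch");
        this.readToLogEnd = readToLogEnd;
    }

    @Override
    public void run() {
        int consecutiveFailures = 0;
        try {
            while (!isInterrupted()) {
                try {
                    readToLogEnd.run();
                    consecutiveFailures = 0; // reset the budget after a successful pass
                } catch (RetriableException e) {
                    // Recoverable error (TimeoutException is a RetriableException):
                    // retry, but give up once too many consecutive attempts fail.
                    if (++consecutiveFailures >= MAX_RETRIES) {
                        throw e; // treat as unrecoverable
                    }
                    System.err.println("Retriable error (attempt " + consecutiveFailures + "): " + e);
                }
            }
        } catch (Throwable t) {
            // Unrecoverable: log and take the whole worker process down so the
            // lifecycle manager (Mesos in our case) restarts it elsewhere and
            // its tasks get rebalanced instead of idling.
            System.err.println("Unrecoverable error in " + getName() + ", exiting worker: " + t);
            Runtime.getRuntime().exit(1);
        }
    }
}
{code}
Whether the worker exits via System.exit or a cleaner shutdown hook is an open 
question; the point is that the work thread's death must not leave the worker 
alive but unable to recover offsets.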

 

> [kafka-connect] KafkaBaseLog.WorkThread not recoverable
> -------------------------------------------------------
>
>                 Key: KAFKA-8790
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8790
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Qinghui Xu
>            Priority: Major
>


