[ 
https://issues.apache.org/jira/browse/KAFKA-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034006#comment-17034006
 ] 

Chris Egerton commented on KAFKA-9374:
--------------------------------------

Hi [~tombentley], sorry for the delayed reply.

 

After some thought, it seems that setting a timeout on interactions with 
connectors but keeping those actions synchronous within the herder's tick 
method isn't really a viable approach. There doesn't seem to be a good value to 
use for that timeout; if it's too conservative it may be impossible to start 
some connectors that have to do heavy-duty initialization on startup, and if 
it's too liberal there will still be the original problem (for however long the 
timeout is) of the worker being effectively disabled during that period, and 
potentially even dropping out of the group.

Instead, in [https://github.com/apache/kafka/pull/8069], I've made changes to 
make most connector interactions (specifically, calls to the {{start}}, {{stop}}, 
{{config}}, {{validate}}, and {{initialize}} methods) completely asynchronous 
and handle any follow-up logic via callback. In the {{DistributedHerder}} 
class, this callback adds a new herder request to the queue, which helps keep 
the class thread-safe and preserves some of the guarantees provided by the 
current {{tick}} model.
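
To illustrate the callback-to-queue pattern described above, here is a minimal sketch. It is not the code from the pull request: the class {{AsyncHerderSketch}}, its methods, and the {{events}} list are hypothetical names chosen for the example. The connector call runs on a separate executor, and its completion callback only enqueues a follow-up request, so all state changes still happen on the single thread that calls {{tick}}.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names and structure are assumptions, not Kafka Connect code.
public class AsyncHerderSketch {
    public interface Callback { void onCompletion(Throwable error); }

    // Requests that only the tick thread is allowed to run.
    private final BlockingQueue<Runnable> herderRequests = new LinkedBlockingQueue<>();

    // Separate daemon threads, so a blocking connector cannot stall the tick thread.
    private final ExecutorService connectorExecutor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "connector-interaction");
        t.setDaemon(true);
        return t;
    });

    // Modified only from the tick thread, so no locking is needed.
    public final List<String> events = new ArrayList<>();

    // Runs the (possibly slow) connector call off the tick thread.
    private void runAsync(Runnable connectorCall, Callback callback) {
        connectorExecutor.submit(() -> {
            Throwable error = null;
            try {
                connectorCall.run();
            } catch (Throwable t) {
                error = t;
            }
            callback.onCompletion(error);
        });
    }

    // Called from the tick thread. Returns immediately; the follow-up logic is
    // queued as a new herder request once the connector call finishes.
    public void requestStart(String name, Runnable connectorStart) {
        events.add("scheduled " + name);
        runAsync(connectorStart, error -> herderRequests.add(
            () -> events.add((error == null ? "started " : "failed ") + name)));
    }

    // One iteration of the tick loop: run at most one queued request.
    public boolean tick(long timeoutMs) {
        try {
            Runnable request = herderRequests.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (request == null) {
                return false;
            }
            request.run();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public void shutdown() {
        connectorExecutor.shutdownNow();
    }
}
```

In this sketch a connector whose {{start}} never returns simply never produces a follow-up request: {{tick}} keeps serving other requests, but the executor thread stays occupied.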

Unfortunately, this means that status tracking for connectors becomes... 
difficult. If we don't establish a timeout for any of our connector 
interactions, we also don't have a good metric for knowing if/when to update 
the status of a connector to {{FAILED}}. At this point, the best we may be able 
to do is include log messages detailing when certain connector interactions are 
scheduled, and when those interactions are complete. That should at least 
provide a decent method for diagnosing via log files whether a connector is 
blocking and effectively a zombie. In the future, a KIP may be warranted for 
adding a new metric to track the number and types of zombie connectors/tasks.
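
As a rough illustration of the scheduled/completed logging idea, the sketch below pairs each log message with an in-memory record of interactions that have not yet completed. Everything here is an assumption made for the example: {{InteractionLogSketch}} and its methods are hypothetical, it uses {{java.util.logging}} rather than the framework's own logging setup, and nothing like it is claimed to exist in Kafka Connect.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

// Hypothetical helper; not part of Kafka Connect.
public class InteractionLogSketch {
    private static final Logger LOG = Logger.getLogger(InteractionLogSketch.class.getName());

    // Key is "connector/action"; value is the time the interaction was scheduled.
    private final Map<String, Long> inFlight = new ConcurrentHashMap<>();

    // Log and record that an interaction has been handed to the connector.
    public void scheduled(String connector, String action) {
        inFlight.put(connector + "/" + action, System.currentTimeMillis());
        LOG.info("Scheduled " + action + " for connector " + connector);
    }

    // Log and clear the record once the connector returns control.
    public void completed(String connector, String action) {
        Long scheduledAt = inFlight.remove(connector + "/" + action);
        if (scheduledAt != null) {
            long elapsedMs = System.currentTimeMillis() - scheduledAt;
            LOG.info("Completed " + action + " for connector " + connector
                + " after " + elapsedMs + " ms");
        }
    }

    // Interactions that were scheduled but have not completed: candidate zombies.
    public Set<String> pending() {
        return new TreeSet<>(inFlight.keySet());
    }
}
```

A "Scheduled" line with no matching "Completed" line in the log (or a non-empty {{pending()}} set here) would point to a connector that is still blocking.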

This also still leaves the door open for zombie thread creation; any connector 
that blocks in any of the aforementioned methods will still be taking up a 
thread until/unless it returns control to the framework.

> Worker can be disabled by blocked connectors
> --------------------------------------------
>
>                 Key: KAFKA-9374
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9374
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> If a connector hangs during any of its {{initialize}}, {{start}}, {{stop}}, 
> {{taskConfigs}}, {{taskClass}}, {{version}}, {{config}}, or {{validate}} 
> methods, the worker will be disabled for some types of requests thereafter, 
> including connector creation, connector reconfiguration, and connector 
> deletion.
>  -This only occurs in distributed mode and is due to the threading model used 
> by the 
> [DistributedHerder|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java]
>  class.- This affects both distributed and standalone mode. Distributed 
> herders perform some connector work synchronously in their {{tick}} thread, 
> which also handles group membership and some REST requests. The majority of 
> the herder methods for the standalone herder are {{synchronized}}, including 
> those for creating, updating, and deleting connectors; as long as one of 
> those methods blocks, all subsequent calls to any of these methods will also 
> be blocked.
>  
> One potential solution could be to treat connectors that fail to start, stop, 
> etc. in time similarly to tasks that fail to stop within the [task graceful 
> shutdown timeout 
> period|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L121-L126]
>  by handling all connector interactions on a separate thread, waiting for 
> them to complete within a timeout, and abandoning the thread (and 
> transitioning the connector to the {{FAILED}} state, if it has been created 
> at all) if that timeout expires.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
