[ https://issues.apache.org/jira/browse/KAFKA-12270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17277633#comment-17277633 ]

Randall Hauch commented on KAFKA-12270:
---------------------------------------

Ok, I didn't really like that the first PR added another admin client call to 
(re)describe the topic in question when the `createTopic(...)` method returned 
false, meaning the topic was not created (because it already existed by the 
time the create topic request was made/received by Kafka).

So, I created an alternative PR that changes how the `TopicAdmin` creates a 
topic and returns precisely which topic names were created AND which were found 
to already exist. This allows the `WorkerSourceTask` to know exactly what 
happened and to log it accordingly.

This new method in `TopicAdmin` is called `createOrFindTopics(...)`, and it is 
the old implementation of `createTopics(...)` with only slight modification; 
the previously-existing `createTopics(...)` and `createTopic(...)` methods were 
changed to delegate to the new method. Thus the behavior of the existing 
methods remains unchanged (for the multiple places where they are called), but 
we get the more precise results in `WorkerSourceTask`.
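A minimal sketch of the idea, under loud assumptions: `TopicAdminSketch`, `TopicCreationResult` and its methods are hypothetical names, topics are plain `String` names rather than `NewTopic` objects, and an in-memory concurrent set stands in for the broker. This is not the actual `TopicAdmin` code from the PR. It shows `createOrFindTopics(...)` splitting the requested names into "created by this call" and "already existed", with the older `createTopics(...)` and `createTopic(...)` delegating to it and still reporting only the topics they actually created.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for Connect's TopicAdmin: the real class wraps a Kafka
// Admin client and takes NewTopic objects; here topics are plain names and the "broker"
// is an in-memory set.
class TopicAdminSketch {

    // Hypothetical result type: which requested topics this call created, and which it
    // found to already exist (e.g. because another task created them first).
    static final class TopicCreationResult {
        private final Set<String> created;
        private final Set<String> existing;

        TopicCreationResult(Set<String> created, Set<String> existing) {
            this.created = Collections.unmodifiableSet(created);
            this.existing = Collections.unmodifiableSet(existing);
        }

        boolean isCreated(String topic) { return created.contains(topic); }
        boolean isExisting(String topic) { return existing.contains(topic); }
        boolean isCreatedOrExisting(String topic) { return isCreated(topic) || isExisting(topic); }
        Set<String> createdTopics() { return created; }
    }

    // Simulated cluster state; add() is atomic, so exactly one caller "wins" each name.
    private final Set<String> brokerTopics = ConcurrentHashMap.newKeySet();

    TopicAdminSketch(Set<String> preExistingTopics) {
        brokerTopics.addAll(preExistingTopics);
    }

    // Reports precisely which topics were created and which already existed.
    TopicCreationResult createOrFindTopics(List<String> topics) {
        Set<String> created = new LinkedHashSet<>();
        Set<String> existing = new LinkedHashSet<>();
        for (String topic : topics) {
            if (brokerTopics.add(topic)) {
                created.add(topic);
            } else {
                // Plays the role of the broker rejecting the request because the topic exists.
                existing.add(topic);
            }
        }
        return new TopicCreationResult(created, existing);
    }

    // Older methods delegate to the new one and keep their original meaning:
    // only topics actually created by this call are reported.
    Set<String> createTopics(List<String> topics) {
        return createOrFindTopics(topics).createdTopics();
    }

    boolean createTopic(String topic) {
        return createOrFindTopics(List.of(topic)).isCreated(topic);
    }
}
```

With a result like this, a caller such as `WorkerSourceTask` can branch on `isCreated(...)` versus `isExisting(...)` and log each case, without a second describe call to find out what happened.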

> Kafka Connect may fail a task when racing to create topic
> ---------------------------------------------------------
>
>                 Key: KAFKA-12270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12270
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.6.0, 2.7.0, 2.8.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Critical
>
> When a source connector that is configured with many tasks and uses the new 
> topic creation feature is run, multiple tasks may attempt to write to the same 
> topic, see that the topic does not exist, and then race to create it. The 
> topic is only created once, but some tasks might fail with:
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Task failed to create new topic (name=TOPICX, numPartitions=8, replicationFactor=3, replicasAssignments=null, configs={cleanup.policy=delete}). Ensure that the task is authorized to create topics or that the topic exists and restart the task
>   at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeCreateTopic(WorkerSourceTask.java:436)
>   at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:364)
>   at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> ... {code}
> The reason appears to be that the WorkerSourceTask throws an exception if 
> topic creation fails, and does not account for the fact that the topic may 
> have been created between the time the WorkerSourceTask lists existing topics 
> and the time it tries to create the topic.
>  
> See in particular: 
> https://github.com/apache/kafka/blob/5c562efb2d76407011ea88c1ca1b2355079935bc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L415-L423
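The race described in the issue can be modeled without a Kafka cluster. In this hypothetical sketch, `TopicRaceSketch`, `FakeCluster`, and both `maybeCreateTopic*` methods are illustrative names, `IllegalStateException` stands in for Connect's `ConnectException`, and an atomic set insert plays the role of the broker accepting only one creation request per topic name. The old logic treats "not created by me" as a failure; the fixed logic treats "already exists" as success.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the check-then-create race; not Kafka's real API.
class TopicRaceSketch {

    enum Outcome { CREATED, ALREADY_EXISTS }

    // Simulated cluster: add() is atomic, so only one creation request per name succeeds.
    static final class FakeCluster {
        private final Set<String> topics = ConcurrentHashMap.newKeySet();

        boolean exists(String topic) {
            return topics.contains(topic);
        }

        Outcome create(String topic) {
            return topics.add(topic) ? Outcome.CREATED : Outcome.ALREADY_EXISTS;
        }
    }

    // Old behavior: anything other than "created by this task" fails the task.
    static void maybeCreateTopicOld(FakeCluster cluster, String topic, boolean taskSawTopic) {
        if (taskSawTopic) {
            return;
        }
        if (cluster.create(topic) != Outcome.CREATED) {
            throw new IllegalStateException("Task failed to create new topic " + topic);
        }
    }

    // Fixed behavior: a topic created by another task in the meantime is fine to use.
    static String maybeCreateTopicNew(FakeCluster cluster, String topic, boolean taskSawTopic) {
        if (taskSawTopic) {
            return "already known";
        }
        if (cluster.create(topic) == Outcome.CREATED) {
            return "created";
        }
        // Lost the race, but the topic exists, so the task can keep going.
        return "found existing";
    }
}
```

If task A and task B both call `exists("TOPICX")` before either creates it, both see `false`. A's create then returns `CREATED` and B's returns `ALREADY_EXISTS`, so the old method throws for B while the fixed one lets B carry on.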



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
