[ 
https://issues.apache.org/jira/browse/KAFKA-12270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17277520#comment-17277520
 ] 

Randall Hauch commented on KAFKA-12270:
---------------------------------------

I don't think it's because the exception is retriable. The 
`TopicAdmin.createTopics(...)` method explicitly catches a 
`TopicExistsException` and simply returns a response that omits the topic 
name, signaling that the call did not create the topic. That response is 
adequate for knowing whether my request created the topic (or topics), but it's 
less than ideal if the caller simply wants to know whether the topic "was 
created or already existed".
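
To make the ambiguity concrete, here is a minimal in-memory model of the response contract described above. It is a sketch, not the real `TopicAdmin`: `FakeTopicAdmin`, its topic set, and the `denyCreate` switch are hypothetical. `denyCreate` stands in for any other reason a topic might not get created (for example, the missing permissions mentioned in the task error quoted in the issue description). A call returns only the names it actually created.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical in-memory model of the createTopics(...) response contract.
// This is NOT the real TopicAdmin; it only mimics the shape of its response.
class FakeTopicAdmin {
    private final Set<String> clusterTopics = new HashSet<>();
    private final boolean denyCreate; // hypothetical: any other reason creation is skipped

    FakeTopicAdmin(boolean denyCreate) {
        this.denyCreate = denyCreate;
    }

    // Returns only the names that THIS call created.
    synchronized Set<String> createTopics(String... names) {
        Set<String> created = new LinkedHashSet<>();
        for (String name : names) {
            if (clusterTopics.contains(name)) {
                continue; // models the caught TopicExistsException: name omitted
            }
            if (denyCreate) {
                continue; // not created for some other reason: also omitted
            }
            clusterTopics.add(name);
            created.add(name);
        }
        return created;
    }

    synchronized boolean topicExists(String name) {
        return clusterTopics.contains(name);
    }
}
```

On a fresh `FakeTopicAdmin(false)`, the first `createTopics("TOPICX")` returns `[TOPICX]` and a second call returns an empty set. A `FakeTopicAdmin(true)` also returns an empty set, even though the topic does not exist, so only a separate existence check distinguishes the two cases.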

We can't easily change the response, but we could potentially add an overloaded 
method that takes a flag indicating whether existing topics should be included in 
the response. I'm just not sure that's worth it. WDYT?
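
The overload floated above might look something like the following sketch. `FlaggedTopicAdmin` and the `includeExisting` parameter are hypothetical names, and the class is an in-memory model rather than the real `TopicAdmin`. The existing one-argument form keeps its current response by delegating with the flag off.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of an overloaded createTopics with an "include existing" flag.
// Names and types are illustrative; this is not the real TopicAdmin API.
class FlaggedTopicAdmin {
    private final Set<String> clusterTopics = new HashSet<>();

    // Existing contract: only topics created by this call are returned.
    Set<String> createTopics(String... names) {
        return createTopics(false, names);
    }

    // Overload: when includeExisting is true, topics that already existed are
    // also returned, so "created or already existed" becomes a contains() check.
    synchronized Set<String> createTopics(boolean includeExisting, String... names) {
        Set<String> result = new LinkedHashSet<>();
        for (String name : names) {
            if (clusterTopics.add(name)) {
                result.add(name);      // newly created by this call
            } else if (includeExisting) {
                result.add(name);      // already existed; reported only on request
            }
        }
        return result;
    }
}
```

One trade-off of this shape: with the flag on, the returned set no longer tells the caller which of the topics its own request created.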

BTW, I've opened a PR that keeps the `TopicAdmin` methods the same and instead 
just re-describes the topic when the request did not create it. This should be an 
infrequent occurrence, but I'm happy to drop the re-describe in favor of the 
overloaded method if you think that's a better approach.
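
A minimal sketch of the re-describe fallback, assuming a hypothetical in-memory `RedescribeFallback` class in place of the real `WorkerSourceTask` and `TopicAdmin`. Its `maybeCreateTopic` is only loosely named after the method in the stack trace in the issue description and is not that method's code; `createAllowed` is a hypothetical switch, and `IllegalStateException` stands in for Connect's `ConnectException`. The task fails only if the topic was neither created by its own request nor visible when described again.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the "re-describe on a miss" approach.
// All names are illustrative stand-ins, not Kafka Connect code.
class RedescribeFallback {
    private final Set<String> clusterTopics = ConcurrentHashMap.newKeySet();
    private final boolean createAllowed; // hypothetical: false means the create call never succeeds

    RedescribeFallback(boolean createAllowed) {
        this.createAllowed = createAllowed;
    }

    // Stand-in for describing/listing a topic.
    boolean topicExists(String name) {
        return clusterTopics.contains(name);
    }

    // Stand-in for the create call: true only if THIS call created the topic.
    boolean createTopic(String name) {
        return createAllowed && clusterTopics.add(name);
    }

    void maybeCreateTopic(String name) {
        if (topicExists(name)) {
            return;                 // already known to exist
        }
        if (createTopic(name)) {
            return;                 // this task created it
        }
        if (topicExists(name)) {
            return;                 // re-describe: someone else created it in between
        }
        // Stands in for the ConnectException quoted in the issue.
        throw new IllegalStateException("Task failed to create new topic " + name);
    }
}
```

The extra describe runs only on the uncommon path where the create call reports nothing, so the common path is unchanged.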

> Kafka Connect may fail a task when racing to create topic
> ---------------------------------------------------------
>
>                 Key: KAFKA-12270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12270
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.6.0, 2.7.0, 2.8.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Critical
>
> When a source connector that is configured with many tasks and uses the new 
> topic creation feature runs, multiple tasks may attempt to write to the same 
> topic, see that the topic does not exist, and then race to create it. The 
> topic is only created once, but some tasks might fail with:
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Task failed to create new topic (name=TOPICX, numPartitions=8, replicationFactor=3, replicasAssignments=null, configs={cleanup.policy=delete}). Ensure that the task is authorized to create topics or that the topic exists and restart the task
>   at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeCreateTopic(WorkerSourceTask.java:436)
>   at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:364)
>   at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> ... {code}
> The cause appears to be that the WorkerSourceTask throws an exception if topic 
> creation fails, without accounting for the possibility that the topic was 
> created (for example, by another task) between the time the WorkerSourceTask 
> lists the existing topics and the time it tries to create the topic.
>  
> See in particular: 
> https://github.com/apache/kafka/blob/5c562efb2d76407011ea88c1ca1b2355079935bc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L415-L423
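
The race described in the issue is a check-then-act problem. The sketch below reproduces it deterministically with plain Java concurrency utilities rather than a real cluster: a shared `ConcurrentHashMap` key set stands in for the broker's topic list, and a `CyclicBarrier` (a test device, not something Connect does) forces every task to finish listing before any of them tries to create the topic. `TopicCreationRace` and `countNaiveFailures` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical reproduction of the list-then-create race; not Kafka Connect code.
class TopicCreationRace {
    // Runs `tasks` concurrent tasks that each list topics, wait until all have
    // listed, then try to create the same topic. Returns how many tasks saw the
    // topic missing but did not create it (the ones the naive logic would fail).
    static int countNaiveFailures(int tasks, String topic) {
        Set<String> cluster = ConcurrentHashMap.newKeySet();
        CyclicBarrier allListed = new CyclicBarrier(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(tasks); // one thread per task
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                results.add(pool.submit(() -> {
                    boolean missing = !cluster.contains(topic); // 1. list existing topics
                    allListed.await();                          // worst-case interleaving
                    if (!missing) {
                        return true;
                    }
                    return cluster.add(topic);                  // 2. create; true only for the winner
                }));
            }
            int failures = 0;
            for (Future<Boolean> result : results) {
                if (!result.get()) {
                    failures++;
                }
            }
            return failures;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With 8 tasks, exactly one `add` wins and the other 7 report that they did not create the topic; under the logic described above, each of those 7 would throw the `ConnectException` shown in the stack trace.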



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
