[
https://issues.apache.org/jira/browse/KAFKA-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16952395#comment-16952395
]
Chris Egerton commented on KAFKA-8370:
--------------------------------------
Looks like https://issues.apache.org/jira/browse/KAFKA-8125 was duplicated
later on by https://issues.apache.org/jira/browse/KAFKA-8875, which has
actually already been resolved. So at least on versions that have the fix for
8875, the InvalidReplicationFactorException shouldn't be an issue for Connect.
> Kafka Connect should check for existence of internal topics before attempting
> to create them
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-8370
> URL: https://issues.apache.org/jira/browse/KAFKA-8370
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.11.0.0
> Reporter: Randall Hauch
> Assignee: Randall Hauch
> Priority: Major
>
> The Connect worker doesn't currently check for the existence of the internal
> topics; instead it issues a CreateTopic request and handles a
> TopicExistsException. However, this can cause problems when the number of
> brokers is fewer than the replication factor, *even if the topic already
> exists* and the partitions of those topics all remain available on the
> remaining brokers.
> One problem with the current approach is that the broker checks the requested
> replication factor before checking for the existence of the topic, resulting
> in unexpected exceptions when the topic does exist:
> {noformat}
> connect | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder
> work thread, exiting:
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> connect | org.apache.kafka.connect.errors.ConnectException: Error while
> attempting to create/find topic(s) 'connect-offsets'
> connect | at
> org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
> connect | at
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
> connect | at
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
> connect | at
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
> connect | at
> org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
> connect | at
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
> connect | at
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
> connect | at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> connect | at
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> connect | at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> connect | at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> connect | at java.lang.Thread.run(Thread.java:748)
> connect | Caused by: java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication
> factor: 3 larger than available brokers: 2.
> connect | at
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> connect | at
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> connect | at
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> connect | at
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> connect | at
> org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
> connect | ... 11 more
> connect | Caused by:
> org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication
> factor: 3 larger than available brokers: 2.
> connect | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping
> (org.apache.kafka.connect.runtime.Connect)
> {noformat}
> Instead of always issuing a CreateTopic request, the worker's admin client
> should first check whether the topic exists, and if not *then* attempt to
> create the topic.
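
The check-then-create flow proposed in the description could be sketched roughly as follows. This is only an illustration and does not use the Kafka client library: `TopicAdminApi`, `FakeBroker`, and `InternalTopicSetup` are hypothetical names, and the fake broker merely imitates the ordering described above (replication factor validated before topic existence). The plain Java exceptions stand in for InvalidReplicationFactorException and TopicExistsException.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the admin operations the worker needs; not Kafka's AdminClient.
interface TopicAdminApi {
    Set<String> listTopics();
    void createTopic(String name, int replicationFactor);
}

// In-memory fake that imitates the ordering described in the issue:
// the replication factor is validated before topic existence is checked.
class FakeBroker implements TopicAdminApi {
    private final Set<String> topics = new HashSet<>();
    private final int availableBrokers;
    int createRequests = 0; // counts create requests received, for illustration

    FakeBroker(int availableBrokers, String... existingTopics) {
        this.availableBrokers = availableBrokers;
        for (String topic : existingTopics) {
            topics.add(topic);
        }
    }

    @Override
    public Set<String> listTopics() {
        return new HashSet<>(topics);
    }

    @Override
    public void createTopic(String name, int replicationFactor) {
        createRequests++;
        // Stand-in for InvalidReplicationFactorException.
        if (replicationFactor > availableBrokers) {
            throw new IllegalStateException("Replication factor: " + replicationFactor
                    + " larger than available brokers: " + availableBrokers + ".");
        }
        // Stand-in for TopicExistsException.
        if (!topics.add(name)) {
            throw new IllegalArgumentException("Topic '" + name + "' already exists.");
        }
    }
}

class InternalTopicSetup {
    // Returns true if the topic was created, false if it already existed.
    static boolean ensureTopic(TopicAdminApi admin, String name, int replicationFactor) {
        if (admin.listTopics().contains(name)) {
            return false; // topic exists, so no create request is sent at all
        }
        try {
            admin.createTopic(name, replicationFactor);
            return true;
        } catch (IllegalArgumentException alreadyExists) {
            // Another worker may have created the topic between the check and the create.
            return false;
        }
    }
}
```

With `new FakeBroker(2, "connect-offsets")`, calling `InternalTopicSetup.ensureTopic(broker, "connect-offsets", 3)` returns false without sending a create request, so the replication factor error from the stack trace is never reached for a topic that already exists. Creating a topic that is genuinely missing with a replication factor of 3 still fails on the two-broker fake, which seems like the desired behavior. The "already exists" case is still handled after the check because two workers can race to create the same topic.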