[
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sergio Troiano updated KAFKA-16015:
-----------------------------------
Affects Version/s: 3.5.1
> kafka-leader-election timeout values always overwritten by default values
> --------------------------------------------------------------------------
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
> Issue Type: Bug
> Components: admin, tools
> Affects Versions: 3.5.1, 3.6.1
> Reporter: Sergio Troiano
> Assignee: Sergio Troiano
> Priority: Minor
>
> Using *kafka-leader-election.sh*, I was getting random timeouts like these:
> {code:java}
> Error completing leader election (PREFERRED) for partition: sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlModelTrainingSamples-18: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlPartitionMetricSamples-8: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> {code}
> These timeouts were raised on the client side; the controller always
> completed all of the leader elections.
> One pattern I noticed was that the timeouts were always raised after about
> 15 seconds.
>
> So I checked and found that this command has an option to pass
> configuration properties:
> {code:java}
> Option                                  Description
> ------                                  -----------
> --admin.config <String: config file>    Configuration properties files to pass
>                                           to the admin client
> {code}
> I created the file to increase the values of *request.timeout.ms* and
> *default.api.timeout.ms*. Even after these changes, the timeouts were still
> raised after 15 seconds.
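A config file like the one described above can be generated with `java.util.Properties`. This is a minimal sketch: `AdminConfigFile` and `write` are hypothetical names, and the 120000/240000 ms values are illustrative, not the values used in this report. The resulting file is what would be passed via `--admin.config`.

```scala
import java.nio.file.{Files, Path}
import java.util.Properties

// Hypothetical helper: writes an --admin.config file that raises both admin
// client timeouts. The key names are the real admin client configs; the
// values used in main are illustrative only.
object AdminConfigFile {
  def write(path: Path, requestTimeoutMs: Long, defaultApiTimeoutMs: Long): Path = {
    val props = new Properties()
    props.setProperty("request.timeout.ms", requestTimeoutMs.toString)
    props.setProperty("default.api.timeout.ms", defaultApiTimeoutMs.toString)
    // Properties.store writes key=value lines plus a comment and a timestamp line
    val writer = Files.newBufferedWriter(path)
    try props.store(writer, "Overrides for kafka-leader-election.sh --admin.config")
    finally writer.close()
    path
  }

  def main(args: Array[String]): Unit = {
    val path = write(Files.createTempFile("admin", ".properties"), 120000L, 240000L)
    println(s"Run the tool with: --admin.config $path")
  }
}
```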
> So I checked the source code and came across a bug: no matter which values
> we pass for the timeouts, the default values ALWAYS overwrite them.
>
> This is the code from the [3.6 branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
> {code:java}
> object LeaderElectionCommand extends Logging {
>   def main(args: Array[String]): Unit = {
>     run(args, 30.second)
>   }
>
>   def run(args: Array[String], timeout: Duration): Unit = {
>     val commandOptions = new LeaderElectionCommandOptions(args)
>     CommandLineUtils.maybePrintHelpOrVersion(
>       commandOptions,
>       "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
>     )
>
>     validate(commandOptions)
>
>     val electionType = commandOptions.options.valueOf(commandOptions.electionType)
>
>     val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
>       parseReplicaElectionData(Utils.readFileAsString(path))
>     }
>
>     val singleTopicPartition = (
>       Option(commandOptions.options.valueOf(commandOptions.topic)),
>       Option(commandOptions.options.valueOf(commandOptions.partition))
>     ) match {
>       case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
>       case _ => None
>     }
>
>     /* Note: No need to look at --all-topic-partitions as we want this to be None if it is used.
>      * The validate function should be checking that this option is required if the --topic and --path-to-json-file
>      * are not specified.
>      */
>     val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)
>
>     val adminClient = {
>       val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
>         Utils.loadProps(config)
>       }.getOrElse(new Properties())
>
>       props.setProperty(
>         AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
>         commandOptions.options.valueOf(commandOptions.bootstrapServer)
>       )
>       // Unconditional: these two calls replace any timeouts loaded from --admin.config
>       props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
>       props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
>
>       Admin.create(props)
>     }
> {code}
> As we can see, the default timeout is 30 seconds and the request timeout is
> 30/2 = 15 seconds, which explains the 15-second timeouts.
> The code also shows how the custom values passed in the config file are
> overwritten by the defaults.
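The overwrite can be reproduced without a broker, using only `java.util.Properties`. In this minimal sketch, `OverwriteRepro` and its methods are hypothetical names; the two keys are the string values behind `AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG` and `AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG`, and the config-file values are illustrative. `applyTimeoutsUnconditionally` mirrors the two timeout `setProperty` calls in the excerpt.

```scala
import java.io.StringReader
import java.util.Properties
import scala.concurrent.duration._

// Hypothetical reproduction of the 3.6 behavior without Kafka on the classpath.
object OverwriteRepro {
  val DefaultApiTimeoutMs = "default.api.timeout.ms"
  val RequestTimeoutMs = "request.timeout.ms"

  // Parse the contents of an --admin.config file into Properties.
  def loadAdminConfig(contents: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(contents))
    props
  }

  // Same as the excerpt: the tool's timeouts are written unconditionally,
  // replacing whatever the config file contained.
  def applyTimeoutsUnconditionally(props: Properties, timeout: Duration): Properties = {
    props.setProperty(DefaultApiTimeoutMs, timeout.toMillis.toString)
    props.setProperty(RequestTimeoutMs, (timeout.toMillis / 2).toString)
    props
  }

  def main(args: Array[String]): Unit = {
    val props = loadAdminConfig("request.timeout.ms=120000\ndefault.api.timeout.ms=240000\n")
    applyTimeoutsUnconditionally(props, 30.seconds)
    println(props.getProperty(RequestTimeoutMs))    // 15000, not 120000
    println(props.getProperty(DefaultApiTimeoutMs)) // 30000, not 240000
  }
}
```

Running `main` prints 15000 and 30000: the user's values are gone, and the request timeout is exactly the 15 seconds seen in the errors.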
>
>
> The proposal is simple: use the default values only when the timeouts are
> not defined in the config file, for example:
> {code:java}
> if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
>   props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
> }
> if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
>   props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
> }
> {code}
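The proposed conditional defaults can be isolated into a small helper and checked the same way. This is a sketch under assumptions: `TimeoutDefaults.applyTimeoutDefaults` is a hypothetical name, not part of Kafka, and it uses the literal key strings instead of the `AdminClientConfig` constants so it runs without Kafka on the classpath.

```scala
import java.util.Properties
import scala.concurrent.duration._

// Hypothetical helper implementing the proposed fix outside of Kafka.
object TimeoutDefaults {
  val DefaultApiTimeoutMs = "default.api.timeout.ms"
  val RequestTimeoutMs = "request.timeout.ms"

  // Fill in the tool's defaults only for keys the config file did not set.
  def applyTimeoutDefaults(props: Properties, timeout: Duration): Properties = {
    if (!props.containsKey(DefaultApiTimeoutMs))
      props.setProperty(DefaultApiTimeoutMs, timeout.toMillis.toString)
    if (!props.containsKey(RequestTimeoutMs))
      props.setProperty(RequestTimeoutMs, (timeout.toMillis / 2).toString)
    props
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty(RequestTimeoutMs, "120000") // as if set by --admin.config
    applyTimeoutDefaults(props, 30.seconds)
    println(props.getProperty(RequestTimeoutMs))    // 120000 (user value kept)
    println(props.getProperty(DefaultApiTimeoutMs)) // 30000 (tool default)
  }
}
```

Each key is checked independently, so a user can override only `request.timeout.ms` and still get the tool's 30-second `default.api.timeout.ms`. `Properties.putIfAbsent` would behave the same way here; the explicit `containsKey` checks simply mirror the proposal.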
>
> I tested it, and now I am able to modify the timeouts, and my application
> catches the result of the command properly.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)