[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
fvaleri commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1073197282 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String[] args) throws ExecutionException, InterruptedException, IOException { +ArgumentParser parser = ArgumentParsers Review Comment: Yes, I'll open a dedicated PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
satishd commented on PR #13040: URL: https://github.com/apache/kafka/pull/13040#issuecomment-1386596236 @ijuma It looks like the changes that were done in the conversions were accidentally dropped in my local repo while doing a few rebases in my trunk. I should have checked that before pushing to PR and pinging for review. Sorry about that.
[GitHub] [kafka] vcrfxia opened a new pull request, #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
vcrfxia opened a new pull request, #13126: URL: https://github.com/apache/kafka/pull/13126 [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores) proposed the introduction of versioned key-value stores, as well as a RocksDB-based implementation. The RocksDB implementation will consist of a "latest value store" for storing the latest record version associated with each key, in addition to multiple "segment stores" to store older record versions. Within a segment store, multiple record versions for the same key will be combined into a single byte array "value" associated with the key and stored to RocksDB. This PR introduces the utility class that will be used to manage the value format of these segment stores, i.e., how multiple record versions for the same key will be combined into a single byte array "value." Follow-up PRs will introduce the versioned store implementation itself (which relies heavily on this utility class). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
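To make the idea of combining several record versions of one key into a single byte array more concrete, here is a minimal Java sketch. The layout (a count, then a timestamp, a length, and the value bytes per version) and the names `SegmentValueSketch`, `Version`, `pack`, and `unpack` are assumptions made purely for illustration; the message above does not describe the format the PR actually uses.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SegmentValueSketch {
    // One record version: a timestamp plus the raw value bytes (hypothetical type).
    static final class Version {
        final long timestamp;
        final byte[] value;

        Version(long timestamp, byte[] value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Packs all versions of one key into a single byte array.
    // Assumed layout: [count:int], then per version [timestamp:long][length:int][value bytes].
    static byte[] pack(List<Version> versions) {
        int size = Integer.BYTES;
        for (Version v : versions) {
            size += Long.BYTES + Integer.BYTES + v.value.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(versions.size());
        for (Version v : versions) {
            buf.putLong(v.timestamp).putInt(v.value.length).put(v.value);
        }
        return buf.array();
    }

    // Reverses pack(): reads the count, then each (timestamp, length, bytes) triple.
    static List<Version> unpack(byte[] packed) {
        ByteBuffer buf = ByteBuffer.wrap(packed);
        int count = buf.getInt();
        List<Version> versions = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            long timestamp = buf.getLong();
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            versions.add(new Version(timestamp, value));
        }
        return versions;
    }

    public static void main(String[] args) {
        byte[] packed = pack(Arrays.asList(
            new Version(10L, new byte[] {1, 2}),
            new Version(20L, new byte[] {3})));
        System.out.println("packed bytes: " + packed.length);
        System.out.println("versions read back: " + unpack(packed).size());
    }
}
```

A real format would also have to handle tombstones and efficient lookups by timestamp, which this sketch deliberately ignores.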
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1071365497 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2104,7 +2105,7 @@ object UnifiedLog extends Logging { // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state // from the first segment. if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 || - (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) { + (producerStateManager.latestSnapshotOffset.asScala.isEmpty && reloadFromCleanShutdown)) { // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the Review Comment: Can we call `!isPresent` instead of `asScala.isEmpty`? ## core/src/test/scala/unit/kafka/log/LogLoaderTest.scala: ## @@ -557,7 +558,7 @@ class LogLoaderTest { _topicId = None, keepPartitionMetadataFile = true) -verify(stateManager).removeStraySnapshots(any[Seq[Long]]) + verify(stateManager).removeStraySnapshots((any[java.util.List[java.lang.Long]])) Review Comment: Are the extra parentheses around `any` needed? We have a few similar examples in this file. ## core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala: ## @@ -340,8 +342,7 @@ class ProducerStateManagerTest { // After reloading from the snapshot, the transaction should still be considered late val reloadedStateManager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, producerStateManagerConfig, time) -reloadedStateManager.truncateAndReload(logStartOffset = 0L, - logEndOffset = stateManager.mapEndOffset, currentTimeMs = time.milliseconds()) +reloadedStateManager.truncateAndReload(0L,stateManager.mapEndOffset, time.milliseconds()) Review Comment: Nit: space missing after `,`. 
## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManagerConfig.java: ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.Collections; +import java.util.Set; + +public class ProducerStateManagerConfig { +public static final Set RECONFIGURABLE_CONFIGS = Collections.singleton("producer.id.expiration.ms"); +private volatile int producerIdExpirationMs; + +public ProducerStateManagerConfig(int producerIdExpirationMs) { +this.producerIdExpirationMs = producerIdExpirationMs; +} + +public void updateProducerIdExpirationMs(int producerIdExpirationMs) { Review Comment: Nit: use `set` instead of `update`? 
## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -680,20 +680,20 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized { -producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => - (producerId, producerIdEntry.lastSeq) +producerStateManager.activeProducers.asScala.map { case (producerId, producerIdEntry) => + (producerId.toLong, producerIdEntry.lastSeq) } - } + }.toMap Review Comment: Can we avoid this `toMap` copy? Same for the case a few lines below. ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -652,7 +652,7 @@ class TransactionsTest extends IntegrationTestHarness { producer.commitTransaction() var producerStateEntry = -brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.head._2 +brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.asScala.head._2 Review Comment: Can we use `.get(0)` instead of `asScala.head`? There's one other similar example in this file. ## core/src/test/scala/unit/kafka/log/LogTestUtils.scala: ## @@ -247,7 +246,7 @@ object LogTestUtils { } def listProducerSnapshotOffsets(logDir: File): Seq[Long] = -
[GitHub] [kafka] ijuma commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
ijuma commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1386277046 @jolshan From what you're saying, there are some thread safety bugs, but their impact is likely minor, which is why we haven't noticed them. From my perspective, we really need to document the concurrency model for this class more clearly, and the class would be more robust if we made it thread safe.
[jira] [Updated] (KAFKA-14302) Infinite probing rebalance if a changelog topic got emptied
[ https://issues.apache.org/jira/browse/KAFKA-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14302: Priority: Critical (was: Major) > Infinite probing rebalance if a changelog topic got emptied > --- > > Key: KAFKA-14302 > URL: https://issues.apache.org/jira/browse/KAFKA-14302 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 >Reporter: Damien Gasparina >Priority: Critical > Fix For: 3.5.0 > > Attachments: image-2022-10-14-12-04-01-190.png, logs.tar.gz2 > > > If a store with a changelog topic has been fully emptied, it can trigger > infinite probing rebalances. > > The scenario is the following: > * A Kafka Streams application, deployed on many instances, has a store with > a changelog > * Many entries are pushed into the changelog, thus the log end offset is > high, let's say 20,000 > * Then, the store gets emptied, either due to data retention (windowing) or > tombstones > * Then an instance of the application is restarted, and its local disk is > deleted (e.g. Kubernetes without Persistent Volume) > * After restart, the application restores the store from the changelog, but > does not write a checkpoint file as there is no data > * As there are no checkpoint entries, this instance specifies a taskOffsetSums > with offset set to 0 in the subscriptionUserData > * The group leader, during the assignment, then computes a lag of 20,000 (end > offsets - task offset), which is greater than the default acceptable lag, > and thus decides to schedule a probing rebalance > * In the next probing rebalance, nothing has changed, so... 
new probing > rebalance > > I was able to reproduce locally with a simple topology: > > {code:java} > var table = streamsBuilder.stream("table"); > streamsBuilder > .stream("stream") > .join(table, (eSt, eTb) -> eSt.toString() + eTb.toString(), > JoinWindows.of(Duration.ofSeconds(5))) > .to("output");{code} > > > > Due to this issue, application having an empty changelog are experiencing > frequent rebalance: > !image-2022-10-14-12-04-01-190.png! > > With assignments similar to: > {code:java} > [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3] INFO > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - > stream-thread > [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3-consumer] > Assigned tasks [0_5, 0_4, 0_3, 0_2, 0_1, 0_0] including stateful [0_5, 0_4, > 0_3, 0_2, 0_1, 0_0] to clients as: > d0e2d556-2587-48e8-b9ab-43a4e8207be6=[activeTasks: ([]) standbyTasks: ([0_0, > 0_1, 0_2, 0_3, 0_4, 0_5])] > 8323d214-4c56-470f-bace-e4291cdf10eb=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, > 0_5]) standbyTasks: ([])].{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
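The lag arithmetic behind the reported loop can be sketched in a few lines of Java. This is only an illustration of the comparison described in the issue, not the assignor's actual code: the class and method names are hypothetical, and the 10,000 threshold is an assumption meant to stand in for the default acceptable lag (`acceptable.recovery.lag`), which the report refers to without quoting a number.

```java
final class ProbingRebalanceSketch {
    // Assumed threshold standing in for the default acceptable recovery lag.
    static final long ACCEPTABLE_RECOVERY_LAG = 10_000L;

    // Lag as described in the report: changelog end offset minus the offset sum the task reports.
    static long lag(long changelogEndOffset, long taskOffsetSum) {
        return changelogEndOffset - taskOffsetSum;
    }

    // A client counts as caught up only if its lag does not exceed the threshold;
    // otherwise the leader would schedule another probing rebalance.
    static boolean caughtUp(long changelogEndOffset, long taskOffsetSum) {
        return lag(changelogEndOffset, taskOffsetSum) <= ACCEPTABLE_RECOVERY_LAG;
    }

    public static void main(String[] args) {
        // Emptied changelog: end offset is still 20,000, but with no checkpoint the task reports 0.
        System.out.println("lag = " + lag(20_000L, 0L));
        System.out.println("caught up = " + caughtUp(20_000L, 0L));
    }
}
```

Because the restored instance never writes a checkpoint, the reported offset sum stays at 0 and the same comparison fails on every probing rebalance.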
[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation
jolshan commented on code in PR #13107: URL: https://github.com/apache/kafka/pull/13107#discussion_r1072903070 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig, // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation. if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete || requestLeaderEpoch == LeaderAndIsr.NoEpoch || - requestLeaderEpoch > currentLeaderEpoch) { + requestLeaderEpoch >= currentLeaderEpoch) { Review Comment: Seems like the concern there was reassignment. I think equal to the leader epoch is ok though because if we are a replica for the current leader epoch then we can't send a stop replica unless the reassignment was cancelled?
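The effect of changing `>` to `>=` in the diff above can be illustrated with a small Java sketch of the epoch check. The method name `shouldStopReplica` and the numeric sentinel values are assumptions for illustration only; the diff shows just the Scala condition in `ReplicaManager`, not how the sentinels are defined.

```java
final class StopReplicaEpochCheckSketch {
    // Sentinel epochs; the concrete numbers are assumptions made for this sketch.
    static final int NO_EPOCH = -1;
    static final int EPOCH_DURING_DELETE = -2;

    // Mirrors the condition in the diff after the change: sentinels bypass validation,
    // and a request epoch equal to the current leader epoch is now accepted as well.
    static boolean shouldStopReplica(int requestLeaderEpoch, int currentLeaderEpoch) {
        return requestLeaderEpoch == EPOCH_DURING_DELETE
            || requestLeaderEpoch == NO_EPOCH
            || requestLeaderEpoch >= currentLeaderEpoch;
    }

    public static void main(String[] args) {
        System.out.println("equal epoch accepted: " + shouldStopReplica(5, 5));
        System.out.println("stale epoch accepted: " + shouldStopReplica(4, 5));
    }
}
```

With the stricter `>` comparison, the equal-epoch case (for example a StopReplica sent after a cancelled reassignment, as discussed in the comment) would be rejected as stale.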
[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation
jolshan commented on code in PR #13107: URL: https://github.com/apache/kafka/pull/13107#discussion_r1072900020 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig, // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation. if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete || requestLeaderEpoch == LeaderAndIsr.NoEpoch || - requestLeaderEpoch > currentLeaderEpoch) { + requestLeaderEpoch >= currentLeaderEpoch) { Review Comment: Looks like this is part of KIP-570. I will take a look.
[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation
jolshan commented on code in PR #13107: URL: https://github.com/apache/kafka/pull/13107#discussion_r1072895900 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig, // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation. if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete || requestLeaderEpoch == LeaderAndIsr.NoEpoch || - requestLeaderEpoch > currentLeaderEpoch) { + requestLeaderEpoch >= currentLeaderEpoch) { Review Comment: Just curious -- was the epoch check put in place because we were concerned about stale stop replicas? Just trying to figure out why we need it and the implications for adding the current epoch.
[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation
jolshan commented on code in PR #13107: URL: https://github.com/apache/kafka/pull/13107#discussion_r1072893785 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -1971,16 +1971,22 @@ object TestUtils extends Logging { ) } + def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = { +val description = admin.describeTopics(Set(partition.topic).asJava) + .allTopicNames + .get + .asScala Review Comment: nit: can we put a new line here to distinguish the two a bit more?
[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
jolshan commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1386151839 @divijvaidya I'm also a bit confused by this comment: > In cases when we have just one (or two) producers, this metric would be highly unreliable (not just stale) as it provides an "approximation" of size(). It is not uncommon to produce data from limited set of producers (with a large number of consumers) and hence, I would incline towards sticking to current approach of keeping this metric accurate. We will keep the IDs of any producer used within the last day or so. If there are two producers running in a steady state, the metric will stay steady.
[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
jolshan commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1386149959 > Has anyone checked that we always acquire a lock when we call methods like activeProducers and isEmpty? I wonder if this class has thread safety bugs. @ijuma I actually did take a peek when working on KIP-890 and other producer ID fun. There is a lock on every usage of activeProducers (except its usage in a log message). As for isEmpty, there are two usages (one in UnifiedLog, to rebuild the log, and one in LogLoader) and neither is protected by a lock.
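The kind of gap discussed here, where some accessors are called under a lock and others are not, can be avoided by having the class guard its own state. The Java sketch below illustrates that option in general terms; `ProducerStateSketch` and its methods are hypothetical stand-ins and do not reflect how `ProducerStateManager` is actually implemented, where per the comment above the locking is done by the callers.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerStateSketch {
    private final Object lock = new Object();
    // Producer id -> last sequence number; only touched while holding the lock.
    private final Map<Long, Integer> lastSeqByProducerId = new HashMap<>();

    void update(long producerId, int lastSeq) {
        synchronized (lock) {
            lastSeqByProducerId.put(producerId, lastSeq);
        }
    }

    // Returns a snapshot copy so callers never iterate over the guarded map itself.
    Map<Long, Integer> activeProducers() {
        synchronized (lock) {
            return new HashMap<>(lastSeqByProducerId);
        }
    }

    // Guarded like every other accessor, so no call site can forget the lock.
    boolean isEmpty() {
        synchronized (lock) {
            return lastSeqByProducerId.isEmpty();
        }
    }

    public static void main(String[] args) {
        ProducerStateSketch state = new ProducerStateSketch();
        state.update(1L, 5);
        System.out.println("empty: " + state.isEmpty());
        System.out.println("active producers: " + state.activeProducers().size());
    }
}
```

The trade-off is an extra copy on each `activeProducers()` call; whether that cost is acceptable on a hot path is a separate question that this sketch does not answer.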
[GitHub] [kafka] jolshan commented on pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on PR #12972: URL: https://github.com/apache/kafka/pull/12972#issuecomment-1386129325 I noticed that we don't consider the stability of responses. (I assume though that if the request is unstable, the response is too.) Just curious if there are any potential gaps with this approach and the reasoning behind only the request stability.
[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1072868862 ## core/src/main/scala/kafka/server/ApiVersionManager.scala: ## @@ -86,14 +100,7 @@ class DefaultApiVersionManager( finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, finalizedFeatures.epoch, controllerApiVersions.orNull, -listenerType) - } - - override def enabledApis: collection.Set[ApiKeys] = { Review Comment: Do we no longer need these methods? Or are they just handled elsewhere (i.e., in ApiKeys)?
[GitHub] [kafka] cmccabe commented on pull request #13117: KAFKA-14621 Disallow authorizers during ZK migration
cmccabe commented on PR #13117: URL: https://github.com/apache/kafka/pull/13117#issuecomment-1386112450 merged to 3.4
[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1072862469 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - UNKNOWN_MEMBER_ID + // - FENCED_MEMBER_EPOCH + // - UNSUPPORTED_ASSIGNOR + // - UNRELEASED_INSTANCE_ID + // - GROUP_MAX_SIZE_REACHED Review Comment: You are referring to GROUP_MAX_SIZE_REACHED? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1072861634 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - UNKNOWN_MEMBER_ID + // - FENCED_MEMBER_EPOCH + // - UNSUPPORTED_ASSIGNOR + // - UNRELEASED_INSTANCE_ID + // - GROUP_MAX_SIZE_REACHED + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, +{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, +{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." }, +{ "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." }, +{ "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+", + "about": "True if the member should compute the assignment for the group." }, +{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+", + "about": "The heartbeat interval in milliseconds." }, +{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if not provided; the assignment otherwise.", "fields": [ + { "name": "Error", "type": "int8", "versions": "0+", +"about": "The assigned error." }, + { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+", +"about": "The partitions assigned to the member that can be used immediately." }, + { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+", +"about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." }, + { "name": "MetadataVersion", "type": "int16", "versions": "0+", +"about": "The version of the metadata." }, + { "name": "MetadataBytes", "type": "bytes", "versions": "0+", +"about": "The assigned metadata." } +]} + ], + "commonStructs": [ Review Comment: See TopicPartitions used in line 52, 54. If we didn't have the common struct, we'd have to define the name and the nested fields each time. 
[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1072860670 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - UNKNOWN_MEMBER_ID + // - FENCED_MEMBER_EPOCH + // - UNSUPPORTED_ASSIGNOR + // - UNRELEASED_INSTANCE_ID + // - GROUP_MAX_SIZE_REACHED + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, +{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, +{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." }, +{ "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." }, +{ "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+", + "about": "True if the member should compute the assignment for the group." }, +{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+", + "about": "The heartbeat interval in milliseconds." }, +{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if not provided; the assignment otherwise.", "fields": [ + { "name": "Error", "type": "int8", "versions": "0+", +"about": "The assigned error." }, + { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+", +"about": "The partitions assigned to the member that can be used immediately." }, + { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+", +"about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." }, + { "name": "MetadataVersion", "type": "int16", "versions": "0+", +"about": "The version of the metadata." }, + { "name": "MetadataBytes", "type": "bytes", "versions": "0+", +"about": "The assigned metadata." } +]} + ], + "commonStructs": [ Review Comment: Common structs can be used multiple times when writing the json. It prevents repeated text in the fields. -- This is an automated message from the Apache Git Service. 
[GitHub] [kafka] jolshan commented on pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on PR #13112: URL: https://github.com/apache/kafka/pull/13112#issuecomment-1386095725 Took a first pass. I think the one thing that is tricky is the conversions between Java and scala. It may not be avoidable though. I'll take another pass soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13084: KAFKA-14598: Fix flaky ConnectRestApiTest
gharris1727 commented on PR #13084: URL: https://github.com/apache/kafka/pull/13084#issuecomment-1386092163 Thanks @ashwinpankaj for following up, I think that this is good after one last nit comment. > To test this theory, in my latest revision I have set retry_on_exc to True in start_and_wait_to_start_listening(). I also added a 40-second sleep in [RestServer.initializeResources()](https://github.com/ashwinpankaj/kafka/blob/568c443a3ec31fd682133620cb38fdefcfe0b82f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L294). ConnectRestApiTests passed in spite of the delay. I was going to suggest something like this, thanks for verifying that the fix actually stabilizes the test! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13084: KAFKA-14598: Fix flaky ConnectRestApiTest
gharris1727 commented on code in PR #13084: URL: https://github.com/apache/kafka/pull/13084#discussion_r1072856520 ## tests/kafkatest/tests/connect/connect_rest_test.py: ## @@ -90,7 +90,8 @@ def test_rest_api(self, connect_protocol, metadata_quorum): self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) self.cc.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node)) -self.cc.start() +self.logger.info("Waiting till Connect REST server is listening") +self.cc.start(mode=ConnectServiceBase.STARTUP_MODE_LISTEN) Review Comment: I think this is unnecessary for the stabilization fix, and actually weakens the test. Because this is actually creating the connectors in distributed mode, I think it would be smart to wait for the cluster to actually join the cluster. So we can revert these two lines and leave it as it was. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1072855094 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel, if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionState.deletePartition) { val leaderEpoch = if (partitionState.leaderEpoch >= 0) - Some(partitionState.leaderEpoch) Review Comment: Ah I see the interface is java. Are we planning to move everything to Java though? It's a bit tricky since the GroupCoordinatorAdaptor was done in scala. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1072842347 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -357,8 +354,9 @@ class KafkaApis(val requestChannel: RequestChannel, new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))) } else { val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest) - if (deletedPartitions.nonEmpty) -groupCoordinator.handleDeletedPartitions(deletedPartitions, requestLocal) + if (deletedPartitions.nonEmpty) { +groupCoordinator.onPartitionsDeleted(deletedPartitions.asJava, requestLocal.bufferSupplier) Review Comment: I see in this method we convert back to scala. Is there a way to avoid? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1072840736 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel, if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionState.deletePartition) { val leaderEpoch = if (partitionState.leaderEpoch >= 0) - Some(partitionState.leaderEpoch) Review Comment: What was the reasoning for this change? Are we trying to move over to java? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1072839205 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -511,4 +532,57 @@ class GroupCoordinatorAdapter( future } + + override def partitionFor(groupId: String): Int = { Review Comment: Curious about these overrides -- did the defaults not work and it was ok before because we were only using the adapter and not using it as the group coordinator? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
jolshan commented on code in PR #13112: URL: https://github.com/apache/kafka/pull/13112#discussion_r1072836630 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -282,8 +283,12 @@ class BrokerServer( tokenManager.startup() // does nothing, we just need a token manager in order to compile right now... // Create group coordinator, but don't start it until we've started replica manager. - // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue Review Comment: Did we decide to remove the Streams tests comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah merged pull request #13117: KAFKA-14621 Disallow authorizers during ZK migration
mumrah merged PR #13117: URL: https://github.com/apache/kafka/pull/13117 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jeffkbkim commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1072528536 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -619,6 +619,9 @@ object KafkaConfig { val PasswordEncoderKeyLengthProp = "password.encoder.key.length" val PasswordEncoderIterationsProp = "password.encoder.iterations" + /** Internal Configurations **/ + val UnreleasedApisEnableProd = "unreleased.apis.enable" Review Comment: should this be Prop? ## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ## @@ -372,7 +375,10 @@ public enum Errors { FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new), INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new), NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new), -OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new); +OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new), +FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoins.", FencedMemberEpochException::new), +UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another member in the consumer group. That member must leave first.", UnreleasedInstanceIdException::new), +UNSUPPORTED_ASSIGNOR(112, "The assignor used by the member or its version range are not supported by the consumer group.", UnsupportedAssignorException::new); Review Comment: nit: is not how's "The selected assignor or its version range is not supported by the consumer group."? 
## core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala: ## @@ -68,14 +68,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { } finally socket.close() } - def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = cluster.clientListener()): Unit = { + def validateApiVersionsResponse( +apiVersionsResponse: ApiVersionsResponse, +listenerName: ListenerName = cluster.clientListener(), +shouldIncludeUnreleasedApi: Boolean = false Review Comment: nit: Apis ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - UNKNOWN_MEMBER_ID + // - FENCED_MEMBER_EPOCH + // - UNSUPPORTED_ASSIGNOR + // - UNRELEASED_INSTANCE_ID + // - GROUP_MAX_SIZE_REACHED Review Comment: i noticed this is missing in the KIP. should we include it? 
## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ +
[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676546#comment-17676546 ] Alexandre Dupriez edited comment on KAFKA-14139 at 1/17/23 8:25 PM: Hi, [~hachikuji] , thank you for reporting this scenario and the very clear description of the issue. Is this something which is still prioritized and are you welcoming additional contributors on it? was (Author: hangleton): Hi, Jason, thank you for reporting this scenario and the very clear description of the issue. Is this something which is still prioritized and are you welcoming additional contributors on it? > Replaced disk can lead to loss of committed data even with non-empty ISR > > > Key: KAFKA-14139 > URL: https://issues.apache.org/jira/browse/KAFKA-14139 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > Fix For: 3.5.0 > > > We have been thinking about disk failure cases recently. Suppose that a disk > has failed and the user needs to restart the disk from an empty state. The > concern is whether this can lead to the unnecessary loss of committed data. > For normal topic partitions, removal from the ISR during controlled shutdown > buys us some protection. After the replica is restarted, it must prove its > state to the leader before it can be added back to the ISR. And it cannot > become a leader until it does so. > An obvious exception to this is when the replica is the last member in the > ISR. In this case, the disk failure itself has compromised the committed > data, so some amount of loss must be expected. > We have been considering other scenarios in which the loss of one disk can > lead to data loss even when there are replicas remaining which have all of > the committed entries. One such scenario is this: > Suppose we have a partition with two replicas: A and B. Initially A is the > leader and it is the only member of the ISR. 
> # Broker B catches up to A, so A attempts to send an AlterPartition request > to the controller to add B into the ISR. > # Before the AlterPartition request is received, replica B has a hard > failure. > # The current controller successfully fences broker B. It takes no action on > this partition since B is already out of the ISR. > # Before the controller receives the AlterPartition request to add B, it > also fails. > # While the new controller is initializing, suppose that replica B finishes > startup, but the disk has been replaced (all of the previous state has been > lost). > # The new controller sees the registration from broker B first. > # Finally, the AlterPartition from A arrives which adds B back into the ISR > even though it has an empty log. > (Credit for coming up with this scenario goes to [~junrao] .) > I tested this in KRaft and confirmed that this sequence is possible (even if > perhaps unlikely). There are a few ways we could have potentially detected > the issue. First, perhaps the leader should have bumped the leader epoch on > all partitions when B was fenced. Then the inflight AlterPartition would be > doomed no matter when it arrived. > Alternatively, we could have relied on the broker epoch to distinguish the > dead broker's state from that of the restarted broker. This could be done by > including the broker epoch in both the `Fetch` request and in > `AlterPartition`. > Finally, perhaps even normal kafka replication should be using a unique > identifier for each disk so that we can reliably detect when it has changed. > For example, something like what was proposed for the metadata quorum here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes.] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
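One of the options raised in the description above, using the broker epoch to tell the dead broker's state apart from the restarted broker's, could be sketched roughly as follows. This is only an illustration under stated assumptions: `BrokerEpochCheck`, `registerBroker`, and `canAddToIsr` are hypothetical names and do not correspond to the controller's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these names exist in Kafka itself.
final class BrokerEpochCheck {
    // Latest epoch handed to each broker when it registered.
    private final Map<Integer, Long> registeredEpochs = new HashMap<>();
    private long nextEpoch = 0;

    // Every (re)registration of a broker gets a fresh, larger epoch.
    long registerBroker(int brokerId) {
        long epoch = nextEpoch++;
        registeredEpochs.put(brokerId, epoch);
        return epoch;
    }

    // The leader reports the broker epoch it saw in the follower's Fetch requests.
    // The ISR addition is accepted only if that epoch is still the current one,
    // so a request built against a since-restarted broker is rejected.
    boolean canAddToIsr(int brokerId, long epochSeenByLeader) {
        Long current = registeredEpochs.get(brokerId);
        return current != null && current == epochSeenByLeader;
    }
}
```

In the scenario above, broker B would re-register with a new epoch after its disk was replaced, so the in-flight AlterPartition carrying B's old epoch would fail this check regardless of when it arrived.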
[jira] [Created] (KAFKA-14627) Modernize Connect plugin discovery
Greg Harris created KAFKA-14627: --- Summary: Modernize Connect plugin discovery Key: KAFKA-14627 URL: https://issues.apache.org/jira/browse/KAFKA-14627 Project: Kafka Issue Type: New Feature Components: KafkaConnect Reporter: Greg Harris Assignee: Greg Harris https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac merged pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface
dajac merged PR #12902: URL: https://github.com/apache/kafka/pull/12902 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation
hachikuji commented on code in PR #13107: URL: https://github.com/apache/kafka/pull/13107#discussion_r1072693482 ## core/src/main/scala/kafka/controller/ControllerChannelManager.scala: ## @@ -436,17 +436,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition, deletePartition: Boolean): Unit = { -// A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides -// any existing epoch. -val leaderEpoch = metadataInstance.leaderEpoch(topicPartition) - brokerIds.filter(_ >= 0).foreach { brokerId => val result = stopReplicaRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty) - val alreadyDelete = result.get(topicPartition).exists(_.deletePartition) + val updatedDeletePartition = deletePartition || result.get(topicPartition).exists(_.deletePartition) + + // A sentinel (-2) is used as an epoch if the replica is to be deleted. + // It overrides any existing epoch. + val leaderEpoch = if (updatedDeletePartition) { Review Comment: Yeah, `DeletePartition` is set whenever the replica should be deleted, which would be the case after cancellation for all adding replicas. It does not necessarily imply topic deletion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
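The sentinel logic discussed in this comment can be illustrated with a small Java sketch. The diff is truncated right after `if (updatedDeletePartition) {`, so the else-branch below (falling back to the current leader epoch) is an assumption, and `StopReplicaEpoch`, `mergeDeleteFlag`, and `epochFor` are hypothetical names chosen for the example.

```java
// Hypothetical sketch; names are illustrative, not the controller's actual code.
final class StopReplicaEpoch {
    // Sentinel from the diff comment: used as the epoch when the replica is to be deleted.
    static final int EPOCH_DURING_DELETE = -2;

    // The delete flag is sticky: once a queued request for this partition asked
    // for deletion, later requests in the same batch keep it set.
    static boolean mergeDeleteFlag(boolean requested, boolean alreadyQueuedDelete) {
        return requested || alreadyQueuedDelete;
    }

    // Assumption: when the replica is not being deleted, the current leader epoch is sent.
    static int epochFor(boolean deletePartition, int currentLeaderEpoch) {
        return deletePartition ? EPOCH_DURING_DELETE : currentLeaderEpoch;
    }
}
```

As the comment notes, the flag means "delete this replica" (for example, an adding replica after a cancelled reassignment), not necessarily "the topic is being deleted".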
[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1072654169 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -300,9 +301,13 @@ void runOnce() { try { transactionManager.maybeResolveSequences(); +RuntimeException lastError = transactionManager.lastError(); +if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) { +return; Review Comment: I'm thinking not, because we aren't adding a new producer. @jolshan thoughts? ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -155,7 +155,7 @@ private enum State { private boolean isTransitionValid(State source, State target) { switch (target) { case UNINITIALIZED: -return source == READY; +return source == READY || source == ABORTABLE_ERROR; Review Comment: hmm good point, I guess upon re-initializing (transition from UNINITIALIZED to INITIALIZING state), should we check the previous error to ensure a valid transition? Maybe in `initializeTransactions` we examine the previous error and make the next transition? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
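To make the transition under discussion concrete, here is a deliberately reduced sketch of the state check. Only the `UNINITIALIZED` case is taken from the diff; the `INITIALIZING` rule and the reduced set of states are simplifying assumptions, and `TransitionSketch` is a hypothetical class, not the real `TransactionManager`.

```java
// Hypothetical, heavily simplified model of the state check in the diff above.
final class TransitionSketch {
    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR }

    static boolean isTransitionValid(State source, State target) {
        switch (target) {
            case UNINITIALIZED:
                // From the diff: an abortable error may now lead back to UNINITIALIZED.
                return source == State.READY || source == State.ABORTABLE_ERROR;
            case INITIALIZING:
                // Assumption for this sketch: re-initialization starts from UNINITIALIZED.
                return source == State.UNINITIALIZED;
            default:
                // Other targets are not modeled in this sketch.
                return false;
        }
    }
}
```

With this shape, the question raised in the review is whether the `ABORTABLE_ERROR -> UNINITIALIZED` edge should be taken for any abortable error or only after inspecting the previous error.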
[GitHub] [kafka] hachikuji commented on pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation
hachikuji commented on PR #13107: URL: https://github.com/apache/kafka/pull/13107#issuecomment-1385840547 @jolshan @dajac This patch has been updated to loosen the epoch check on the broker side. The original approach seemed a little risky in the case where a reassignment is cancelled and resubmitted. It might be possible for a `StopReplica` request corresponding to the cancellation to get ordered after the `LeaderAndIsr` for the resubmitted reassignment. With the loosened check, that would not be possible since a new reassignment would have a leader epoch bump: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L747. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
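The ordering argument in this comment can be walked through with a tiny Java sketch. It assumes that "loosened" means the broker accepts a `StopReplica` whose leader epoch is equal to (not only greater than) its current epoch; the message does not spell out the exact comparison, and `StopReplicaCheck` and `accepts` are hypothetical names.

```java
// Hypothetical sketch of a broker-side epoch check; not the actual ReplicaManager code.
final class StopReplicaCheck {
    // Assumed loosened rule: the request is accepted if its epoch is not older
    // than the epoch the broker currently knows for the partition.
    static boolean accepts(int requestLeaderEpoch, int currentLeaderEpoch) {
        return requestLeaderEpoch >= currentLeaderEpoch;
    }

    public static void main(String[] args) {
        int epochAtCancellation = 5;
        // The StopReplica for the cancellation arrives in order and is accepted.
        System.out.println(accepts(epochAtCancellation, 5));
        // A resubmitted reassignment bumps the leader epoch via LeaderAndIsr.
        int epochAfterResubmit = 6;
        // If the cancellation's StopReplica is delivered late, it is now stale and rejected.
        System.out.println(accepts(epochAtCancellation, epochAfterResubmit));
    }
}
```

Under that assumption, a late `StopReplica` from the cancelled reassignment cannot delete a replica that the resubmitted reassignment has just re-added, because its epoch is strictly older.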
[jira] [Commented] (KAFKA-14625) CheckpointFile read and write API consistency
[ https://issues.apache.org/jira/browse/KAFKA-14625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677880#comment-17677880 ] Ismael Juma commented on KAFKA-14625: - Personally I'm not convinced this needs to be changed. > CheckpointFile read and write API consistency > -- > > Key: KAFKA-14625 > URL: https://issues.apache.org/jira/browse/KAFKA-14625 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Satish Duggana >Priority: Major > > ` CheckpointFile` has the below read and write APIs, write expects a > Collection of items, but read returns a List of elements. It is better to > look into these APIs and its usages and see whether consistency can be > brought without introducing any extra collection conversions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14190) Corruption of Topic IDs with pre-2.8.0 ZK admin clients
[ https://issues.apache.org/jira/browse/KAFKA-14190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677877#comment-17677877 ] Justine Olshan commented on KAFKA-14190: ^ +1 to the points above. I think that this was covered in point 1 for recovery, but the ZNode can be written to directly to give it the correct (original) topic ID. This is of course risky, as a poorly formatted ZNode will block any topic operations. You can get the old ZNode via: {code:java} get /brokers/topics/{topic}{code} and copy the entire output as `output`. In the output, the incorrect topic ID should be replaced with the old one (from the partition.metadata file) to get `modified_output`. Then the ZNode can be set via: {code:java} set /brokers/topics/{topic} modified_output{code} Then the controller needs to be bounced via deleteall /controller, and the affected brokers (the ones receiving the log messages) also need a restart. Alternatively, shutting down the broker and deleting the partition.metadata file will recreate the file with the new ID and allow processing to continue. > Corruption of Topic IDs with pre-2.8.0 ZK admin clients > --- > > Key: KAFKA-14190 > URL: https://issues.apache.org/jira/browse/KAFKA-14190 > Project: Kafka > Issue Type: Bug > Components: admin, core, zkclient >Affects Versions: 2.8.0, 3.1.0, 2.8.1, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.2.1 >Reporter: Alexandre Dupriez >Assignee: Divij Vaidya >Priority: Major > > h3. Scope > The problem reported below has been verified to occur in Zookeeper mode. It > has not been attempted with Kraft controllers, although it is unlikely to be > reproduced in Kraft mode given the nature of the issue and clients involved. > h3. Problem Description > The ID of a topic is lost when an AdminClient of version < 2.8.0 is used to > increase the number of partitions of that topic for a cluster with version >= > 2.8.0.
This results in the controller re-creating the topic IDs upon restart, > eventually conflicting with the topic ID in the brokers' {{partition.metadata}} > files in the partition directories of the impacted topic, leading to an > availability loss of the partitions which do not accept leadership / > follower-ship when the topic ID indicated by a {{LeaderAndIsr}} request > differs from their own locally cached ID. > One mitigation post-corruption is to substitute the stale topic ID in the > {{partition.metadata}} files with the new topic ID referenced by the > controller, or alternatively, delete the {{partition.metadata}} file > altogether. This requires a restart of the brokers which are assigned the > partitions of the impacted topic. > h3. Steps to reproduce > 1. Set up and launch a two-node Kafka cluster in Zookeeper mode. > 2. Create a topic e.g. via {{kafka-topics.sh}} > {noformat} > ./bin/kafka-topics.sh --bootstrap-server :9092 --create --topic myTopic > --partitions 2 --replication-factor 2{noformat} > 3. Capture the topic ID using a 2.8.0+ client. > {noformat} > ./kafka/bin/kafka-topics.sh --bootstrap-server :9092 --topic myTopic > --describe > Topic: myTopic TopicId: jKTRaM_TSNqocJeQI2aYOQ PartitionCount: 2 > ReplicationFactor: 2 Configs: segment.bytes=1073741824 > Topic: myTopic Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1 > Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1{noformat} > 4. Restart one of the brokers. This will make each broker create the > {{partition.metadata}} files in the partition directories since it will > already have loaded the {{Log}} instance in memory. > 5. Using a *pre-2.8.0* client library, run the following command. > {noformat} > ./kafka/bin/kafka-topics.sh --zookeeper :2181 --alter --topic myTopic > --partitions 3{noformat} > 6. Using a 2.8.0+ client library, describe the topic via Zookeeper and notice > the absence of the topic ID from the output, where it is otherwise expected.
> {noformat} > ./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic > Topic: myTopic PartitionCount: 3 ReplicationFactor: 2 Configs: > Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1 > Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat} > 7. Using a 2.8.0+ client library, describe the topic via a broker endpoint > and notice the topic ID changed. > {noformat} > ./kafka/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic myTopic > Topic: myTopic TopicId: nI-JQtPwQwGiylMfm8k13w PartitionCount: 3 > ReplicationFactor: 2 Configs: segment.bytes=1073741824 > Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 > Topic: myTopic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0 > Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat} > 8. Restart the controller. > 9.
[jira] [Commented] (KAFKA-14625) CheckpointFile read and write API consistency
[ https://issues.apache.org/jira/browse/KAFKA-14625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677875#comment-17677875 ] Sagar Rao commented on KAFKA-14625: --- Hey [~satish.duggana], I am assuming you plan to work on this? Nonetheless, I took a look at the 2 APIs and the places they are called from. The main difference that I see is that `CommittedOffsetsFile` works with a map, while the other 2 Scala classes, `OffsetCheckpointFile` and `LeaderEpochCheckpointFile`, interoperate with Java lists and Scala sequences. So, the main difference is in `CommittedOffsetsFile`, which operates on the entrySet of `partitionToConsumedOffsets`. I am wondering whether we can maintain another list of TopicPartitionOffsets which stores the topic/partitions, while the map `partitionToConsumedOffsets` stores the TopicPartitionOffsets object keyed by partition (same key as today). We can keep updating the list as and when entries are added to or removed from the map, and when we want to sync, we can pass the List as is.
A very crude idea in this java program: {code:java}
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class TestEquality {
    static class TopicPartitionOffsets {
        Integer tp;
        Long offset;

        public TopicPartitionOffsets(Integer tp, Long offset) {
            this.tp = tp;
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            TopicPartitionOffsets that = (TopicPartitionOffsets) o;
            return Objects.equals(tp, that.tp) && Objects.equals(offset, that.offset);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tp, offset);
        }

        @Override
        public String toString() {
            return "TopicPartitionOffsets{" + "tp=" + tp + ", offset=" + offset + '}';
        }
    }

    public static void main(String[] args) {
        // The map and the list hold references to the same objects.
        Map<Integer, TopicPartitionOffsets> partitionToConsumedOffsets = new ConcurrentHashMap<>();
        LinkedList<TopicPartitionOffsets> topicPartitionOffsets = new LinkedList<>();
        TopicPartitionOffsets tp1 = new TopicPartitionOffsets(1, 100L);
        partitionToConsumedOffsets.put(1, tp1);
        topicPartitionOffsets.add(tp1);
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);
        // Updating the shared object is visible through both structures.
        tp1.offset = 200L;
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);
        TopicPartitionOffsets tp2 = partitionToConsumedOffsets.get(1);
        tp2.offset = 300L;
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);
        // Removal has to be applied to both structures.
        topicPartitionOffsets.remove(tp2);
        partitionToConsumedOffsets.remove(1);
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);
    }
}
{code} I am using a LinkedList here so that removing from the List becomes easier (it adds extra time complexity though). Also, the 2 updates on the data structures should happen in an atomic fashion, i.e. if one fails, the other must not take effect.
> CheckpointFile read and write API consistency
> ---------------------------------------------
>
> Key: KAFKA-14625
> URL: https://issues.apache.org/jira/browse/KAFKA-14625
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Satish Duggana
> Priority: Major
>
> `CheckpointFile` has the below read and write APIs: write expects a Collection of items, but read returns a List of elements. It is better to look into these APIs and their usages and see whether consistency can be brought without introducing any extra collection conversions.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on pull request #13111: KAFKA-14190: Update Zk TopicId from locally stored cache in controller
jolshan commented on PR #13111: URL: https://github.com/apache/kafka/pull/13111#issuecomment-1385825635 What you say makes sense, Colin. I do think it's a bit tricky to make such a big code change to support folks using older and deprecated tools. I also understand the pain this causes, though. (It's caused me quite a bit of pain!) I am interested to see if there are any other options here.
[GitHub] [kafka] guozhangwang closed pull request #11367: MINOR: Do not copy on range for in-memory shared store in stream stream left/out joins
guozhangwang closed pull request #11367: MINOR: Do not copy on range for in-memory shared store in stream stream left/out joins URL: https://github.com/apache/kafka/pull/11367
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on code in PR #13095: URL: https://github.com/apache/kafka/pull/13095#discussion_r1072496029 ## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.List; +import java.util.Random; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * This class records the average end to end latency for a single message to travel through Kafka + * + * broker_list = location of the bootstrap broker for both the producer and the consumer + * num_messages = # messages to send + * producer_acks = See ProducerConfig.ACKS_DOC + * message_size_bytes = size of each message in bytes + * + * e.g. [localhost:9092 test 1 1 20] + */ +public class EndToEndLatency { + +private final static long POLL_TIMEOUT_MS = 6; + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println(e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +// Visible for testing +static void execute(String... 
args) throws Exception { +if (args.length != 5 && args.length != 6) { +throw new TerseException("USAGE: java " + EndToEndLatency.class.getName() ++ " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file"); +} + +String brokers = args[0]; +String topic = args[1]; +int numMessages = Integer.parseInt(args[2]); +String acks = args[3]; +int messageSizeBytes = Integer.parseInt(args[4]); +Optional propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty(); + +if (!Arrays.asList("1", "all").contains(acks)) { +throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all"); +} + +try (KafkaConsumer consumer = createKafkaConsumer(propertiesFile, brokers); + KafkaProducer producer = createKafkaProducer(propertiesFile, brokers, acks)) { + +if (!consumer.listTopics().containsKey(topic)) { +createTopic(propertiesFile, brokers, topic); +} +setupConsumer(topic, consumer); +double totalTime = 0.0; +long[] latencies = new long[numMessages]; +Random random = new Random(0); + +for (int i = 0; i < numMessages; i++) { +byte[] message = randomBytesOfLen(random, messageSizeBytes); +long begin = System.nanoTime(); +//Send message (of random bytes) synchronously then immediately poll for it +producer.send(new ProducerRecord<>(topic, message)).get(); +ConsumerRecords records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); +long elapsed = System.nanoTime() - begin; + +
[jira] [Commented] (KAFKA-13709) Document exactly-once support for source connectors
[ https://issues.apache.org/jira/browse/KAFKA-13709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677833#comment-17677833 ] ASF GitHub Bot commented on KAFKA-13709: C0urante merged PR #478: URL: https://github.com/apache/kafka-site/pull/478

> Document exactly-once support for source connectors
> ---------------------------------------------------
>
> Key: KAFKA-13709
> URL: https://issues.apache.org/jira/browse/KAFKA-13709
> Project: Kafka
> Issue Type: Task
> Components: KafkaConnect
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Major
>
> Add documentation for the support for exactly-once source connectors introduced in [KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors].
> This includes but is not limited to:
> * How to safely perform a rolling upgrade to enable exactly-once source support for an existing cluster
> * Any new APIs that connector authors can/should leverage for their source connectors that need clarification beyond what can be included in a Javadoc (for example, how to know what to return from {{SourceConnector::exactlyOnceSupport}}, and an example on how to define custom transaction boundaries for a connector)
[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
fvaleri commented on code in PR #13095: URL: https://github.com/apache/kafka/pull/13095#discussion_r1072418814 ## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.List; +import java.util.Random; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * This class records the average end to end latency for a single message to travel through Kafka + * + * broker_list = location of the bootstrap broker for both the producer and the consumer + * num_messages = # messages to send + * producer_acks = See ProducerConfig.ACKS_DOC + * message_size_bytes = size of each message in bytes + * + * e.g. [localhost:9092 test 1 1 20] + */ +public class EndToEndLatency { + +private final static long POLL_TIMEOUT_MS = 6; + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println(e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +// Visible for testing +static void execute(String... 
args) throws Exception { +if (args.length != 5 && args.length != 6) { +throw new TerseException("USAGE: java " + EndToEndLatency.class.getName() ++ " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file"); +} + +String brokers = args[0]; +String topic = args[1]; +int numMessages = Integer.parseInt(args[2]); +String acks = args[3]; +int messageSizeBytes = Integer.parseInt(args[4]); +Optional propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty(); + +if (!Arrays.asList("1", "all").contains(acks)) { +throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all"); +} + +try (KafkaConsumer consumer = createKafkaConsumer(propertiesFile, brokers); + KafkaProducer producer = createKafkaProducer(propertiesFile, brokers, acks)) { + +if (!consumer.listTopics().containsKey(topic)) { +createTopic(propertiesFile, brokers, topic); +} +setupConsumer(topic, consumer); +double totalTime = 0.0; +long[] latencies = new long[numMessages]; +Random random = new Random(0); + +for (int i = 0; i < numMessages; i++) { +byte[] message = randomBytesOfLen(random, messageSizeBytes); +long begin = System.nanoTime(); +//Send message (of random bytes) synchronously then immediately poll for it +producer.send(new ProducerRecord<>(topic, message)).get(); +ConsumerRecords records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); +long elapsed = System.nanoTime() - begin; + +
[GitHub] [kafka] vamossagar12 commented on pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks
vamossagar12 commented on PR #12802: URL: https://github.com/apache/kafka/pull/12802#issuecomment-1385618036 Hey @C0urante, bumping this one. Please review whenever you get the chance.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on code in PR #13095: URL: https://github.com/apache/kafka/pull/13095#discussion_r1072366317 ## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * This class records the average end to end latency for a single message to travel through Kafka + * + * broker_list = location of the bootstrap broker for both the producer and the consumer + * num_messages = # messages to send + * producer_acks = See ProducerConfig.ACKS_DOC + * message_size_bytes = size of each message in bytes + * + * e.g. [localhost:9092 test 1 1 20] + */ +public class EndToEndLatency { + +private final static long POLL_TIMEOUT_MS = 6; + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... 
args) { +try { +if (args.length != 5 && args.length != 6) { +System.err.println("USAGE: java " + EndToEndLatency.class.getName() ++ " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file"); +return 1; +} +execute(args); +return 0; +} catch (TerseException e) { +System.err.println(e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +// Visible for testing +static void execute(String... args) throws Exception { +String brokers = args[0]; +String topic = args[1]; +int numMessages = Integer.parseInt(args[2]); +String acks = args[3]; +int messageSizeBytes = Integer.parseInt(args[4]); +String propertiesFile = args.length > 5 ? args[5] : null; Review Comment: Thanks @ijuma , @fvaleri I made the changes. Let me know how's it looking now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
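The line under review reads the optional sixth argument into a nullable `String`, while the later revision of this diff quoted elsewhere in the thread wraps it in an `Optional`. A minimal standalone sketch of that idea follows; `PropertiesFileArg` and `parse` are hypothetical names, the blank check is a local stand-in rather than Kafka's `Utils.isBlank`, and `client.properties` is an assumed file name used only for illustration.

```java
import java.util.Optional;

// Hypothetical helper, not part of the EndToEndLatency tool.
class PropertiesFileArg {

    // Returns the sixth argument if it is present and not blank, otherwise an empty Optional.
    static Optional<String> parse(String... args) {
        if (args.length <= 5) {
            return Optional.empty();
        }
        String candidate = args[5];
        // Local stand-in for a blank check: null, empty, or whitespace only.
        boolean blank = candidate == null || candidate.trim().isEmpty();
        return blank ? Optional.empty() : Optional.of(candidate);
    }

    public static void main(String[] args) {
        // Five arguments: no properties file was given.
        System.out.println(parse("localhost:9092", "test", "1", "1", "20").isPresent());
        // Six arguments: the properties file path is carried in the Optional.
        System.out.println(parse("localhost:9092", "test", "1", "1", "20", "client.properties").isPresent());
    }
}
```

With this shape, callers can no longer dereference a missing path by accident; they have to decide explicitly what to do when the `Optional` is empty.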
[GitHub] [kafka] ijuma commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
ijuma commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1385597771 Has anyone checked that we always acquire a lock when we call methods like `activeProducers` and `isEmpty`? I wonder if this class has thread safety bugs.
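To illustrate the kind of invariant being asked about, here is a small sketch in which every read of the shared state takes the same lock as the writes. `GuardedProducers` and all of its members are hypothetical and only stand in for the class under review; nothing here reflects how the Kafka class is actually implemented or locked.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in, not Kafka's producer state class.
class GuardedProducers {
    private final ReentrantLock lock = new ReentrantLock();
    // Guarded by `lock`: a plain HashMap is not safe to read during a concurrent put.
    private final Map<Long, Integer> lastSequenceByProducerId = new HashMap<>();

    void update(long producerId, int lastSequence) {
        lock.lock();
        try {
            lastSequenceByProducerId.put(producerId, lastSequence);
        } finally {
            lock.unlock();
        }
    }

    // Read paths acquire the same lock as the write path.
    boolean isEmpty() {
        lock.lock();
        try {
            return lastSequenceByProducerId.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    int producerCount() {
        lock.lock();
        try {
            return lastSequenceByProducerId.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        GuardedProducers producers = new GuardedProducers();
        System.out.println("empty before update: " + producers.isEmpty());
        producers.update(7L, 0);
        System.out.println("count after update: " + producers.producerCount());
    }
}
```

If a method such as `isEmpty` skipped the lock, a caller on another thread could read the map while it is being modified, which is the sort of bug the comment is probing for.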
[GitHub] [kafka] ijuma commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
ijuma commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1385589558 @divijvaidya Producer ids don't change often, so I am not sure what you mean by:
> In cases when we have just one (or two) producers, this metric would be highly unreliable (not just stale) as it provides an "approximation" of size(). It is not un-common to produce data from limited set of producers (with a large number of consumers) and hence, I would incline towards sticking to current approach of keeping this metric accurate.
[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
clolov commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1072326336 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String[] args) throws ExecutionException, InterruptedException, IOException { +ArgumentParser parser = ArgumentParsers Review Comment: If it isn't too big of a problem may I have the 2 classes and see how far I can go with them at least in this pull request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
fvaleri commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1072305087 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String[] args) throws ExecutionException, InterruptedException, IOException { +ArgumentParser parser = ArgumentParsers Review Comment: > If we are completely moving to Java does this mean that CommandLineUtils needs to be rewritten first? Yes, I already created `CommandLineUtils` as part of the `DumpLogSegments` migration, which also includes `CommandDefaultOptions`, but I still have some work to do before opening the PR. Let me know if you want a separate PR with just these 2 shared classes. That said, I think we will need to have tools->core temporary dependency to avoid duplicating critical code like `GroupMetadataManager.formatRecordKeyAndValue(record)` and `TransactionLog.formatRecordKeyAndValue(record)`. 
[GitHub] [kafka] 51n15t9r commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1
51n15t9r commented on PR #12620: URL: https://github.com/apache/kafka/pull/12620#issuecomment-1385546015 @ijuma, @showuon - Sorry to bring this up in a closed thread. Other than the vulnerabilities, ZooKeeper 3.6 has also been EOL since December 2022. Would it be worth spending the effort on the ZooKeeper 3.7.1 upgrade in the meantime?
[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
fvaleri commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1072305087 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String[] args) throws ExecutionException, InterruptedException, IOException { +ArgumentParser parser = ArgumentParsers Review Comment: > If we are completely moving to Java does this mean that CommandLineUtils needs to be rewritten first? Yes, I already created `CommandLineUtils` as part of the `DumpLogSegments` migration, which also includes `CommandDefaultOptions`, but I still have some work to do before opening the PR. Let me know if you want a separate PR with just these 3 shared classes. That said, I think we will need to have tools->core temporary dependency to avoid duplicating critical code like `GroupMetadataManager.formatRecordKeyAndValue(record)` and `TransactionLog.formatRecordKeyAndValue(record)`. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
clolov commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1072271288 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
Sorry, may I get a bit of clarification on how you envision this working? Do you envision writing a Java-compatible copy of `CommandDefaultOptions`? I am asking because I was battling with moving ConsoleConsumer and ran into the following problem:

```
abstract class CommandDefaultOptions(val args: Array[String], allowCommandOptionAbbreviation: Boolean = false) {
  val parser = new OptionParser(allowCommandOptionAbbreviation)
  ...
```

```
private static class ConsumerConfig extends CommandDefaultOptions {
    ConsumerConfig(String... args) {
        super(args, false);
        ... = this.parser(); <--- Cannot access joptsimple.OptionParser
```

In general, are we only trying to move the commands from Scala to Java, or do we want a complete break from Scala classes? If we are completely moving to Java, does this mean that `CommandLineUtils` needs to be rewritten first?
[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
mimaison commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1072229304 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String[] args) throws ExecutionException, InterruptedException, IOException { +ArgumentParser parser = ArgumentParsers Review Comment: We should not change the argument parsing library in this PR. This is likely to slightly change the usage and could potentially break existing commands/scripts used by users. Yes ideally we should use a single library but we can do that after (and it's likely to require a KIP). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java
ivanyu commented on code in PR #13067: URL: https://github.com/apache/kafka/pull/13067#discussion_r1072224151 ## server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Meter; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.utils.Sanitizer; + +public class KafkaMetricsGroup { +private final Class klass; + +public KafkaMetricsGroup(final Class klass) { +this.klass = klass; +} + +/** + * Creates a new MetricName object for gauges, meters, etc. created for this + * metrics group. + * @param name Descriptive name of the metric. + * @param tags Additional attributes which mBean will have. + * @return Sanitized metric name object. 
+ */ +public MetricName metricName(final String name, final Map tags) { +final String pkg; +if (klass.getPackage() == null) { +pkg = ""; +} else { +pkg = klass.getPackage().getName(); +} +final String simpleName = klass.getSimpleName().replaceAll("\\$$", ""); +return explicitMetricName(pkg, simpleName, name, tags); +} + +public final MetricName explicitMetricName(final String group, final String typeName, Review Comment: Can be static, yes. Made it so -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
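The observation that `explicitMetricName` can be static follows from the method body quoted elsewhere in this thread: it only reads its arguments. A minimal stdlib-only sketch of that idea is below. It returns the MBean name as a plain `String` instead of a Yammer `MetricName`, and the class name `MetricNameSketch`, the method name `explicitMBeanName`, and the tag rendering (comma-joined `key=value` pairs, skipping empty values) are assumptions for illustration, since `toMBeanName` is not shown in the quoted diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch, not the class from the PR.
final class MetricNameSketch {
    // Static is possible because the result depends only on the arguments,
    // never on instance state such as the `klass` field.
    static String explicitMBeanName(String group, String typeName, String name, Map<String, String> tags) {
        StringBuilder nameBuilder = new StringBuilder();
        nameBuilder.append(group).append(":type=").append(typeName);
        if (!name.isEmpty()) {
            nameBuilder.append(",name=").append(name);
        }
        // Assumption: tags render as key=value pairs joined by commas, empty values skipped.
        String tagsName = tags.entrySet().stream()
            .filter(e -> !e.getValue().isEmpty())
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(","));
        if (!tagsName.isEmpty()) {
            nameBuilder.append(",").append(tagsName);
        }
        return nameBuilder.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order so the printed name is deterministic.
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("topic", "orders");
        tags.put("partition", "0");
        // prints kafka.cluster:type=Partition,name=UnderReplicated,topic=orders,partition=0
        System.out.println(explicitMBeanName("kafka.cluster", "Partition", "UnderReplicated", tags));
    }
}
```

Because no instance is needed, callers can build a name without first constructing a metrics group.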
[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java
ivanyu commented on code in PR #13067: URL: https://github.com/apache/kafka/pull/13067#discussion_r1072223766 ## server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Meter; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.utils.Sanitizer; + +public class KafkaMetricsGroup { +private final Class klass; + +public KafkaMetricsGroup(final Class klass) { +this.klass = klass; +} + +/** + * Creates a new MetricName object for gauges, meters, etc. created for this + * metrics group. + * @param name Descriptive name of the metric. + * @param tags Additional attributes which mBean will have. + * @return Sanitized metric name object. 
+ */ +public MetricName metricName(final String name, final Map tags) { +final String pkg; +if (klass.getPackage() == null) { +pkg = ""; +} else { +pkg = klass.getPackage().getName(); +} +final String simpleName = klass.getSimpleName().replaceAll("\\$$", ""); +return explicitMetricName(pkg, simpleName, name, tags); +} + +public final MetricName explicitMetricName(final String group, final String typeName, + final String name, final Map tags) { +final StringBuilder nameBuilder = new StringBuilder(); +nameBuilder.append(group); +nameBuilder.append(":type="); +nameBuilder.append(typeName); + +if (!name.isEmpty()) { +nameBuilder.append(",name="); +nameBuilder.append(name); +} + +final String scope = toScope(tags).orElse(null); +final Optional tagsName = toMBeanName(tags); +tagsName.ifPresent(s -> nameBuilder.append(",").append(s)); + +return new MetricName(group, typeName, name, scope, nameBuilder.toString()); +} + +public final Gauge newGauge(final String name, final Gauge metric, final Map tags) { +return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric); +} + +public final Gauge newGauge(final String name, final Gauge metric) { +return newGauge(name, metric, Collections.emptyMap()); +} + +public final Meter newMeter(final String name, final String eventType, +final TimeUnit timeUnit, final Map tags) { +return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit); +} + +public final Meter newMeter(final String name, final String eventType, +final TimeUnit timeUnit) { +return newMeter(name, eventType, timeUnit, Collections.emptyMap()); +} + +public final Meter newMeter(final MetricName metricName, final String eventType, final TimeUnit timeUnit) { +return KafkaYammerMetrics.defaultRegistry().newMeter(metricName, eventType, timeUnit); +} + +public final Histogram newHistogram(final String name, final boolean biased, final Map tags) { +return 
KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags), biased); +} + +public final Histogram newHistogram(final String name) { +return newHistogram(name, true, Collections.emptyMap()); +} + +public final Histogram newHistogram(final String name, final boolean biased) { +return newHistogram(name, biased, Collections.emptyMap()); +} + +public final Histogram newHistogram(final String name, final Map tags) { +return newHistogram(name, true, tags); +} + +public final Timer newTimer(final String name, final TimeUnit durationUnit, final TimeUnit rateUnit, +final Map
[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java
ivanyu commented on code in PR #13067: URL: https://github.com/apache/kafka/pull/13067#discussion_r1072223447 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -293,14 +295,14 @@ class Partition(val topicPartition: TopicPartition, private var controllerEpoch: Int = KafkaController.InitialControllerEpoch this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] " - private val tags = Map("topic" -> topic, "partition" -> partitionId.toString) + private val tags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava - newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags) - newGauge("InSyncReplicasCount", () => if (isLeader) partitionState.isr.size else 0, tags) - newGauge("UnderMinIsr", () => if (isUnderMinIsr) 1 else 0, tags) - newGauge("AtMinIsr", () => if (isAtMinIsr) 1 else 0, tags) - newGauge("ReplicasCount", () => if (isLeader) assignmentState.replicationFactor else 0, tags) - newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags) + Partition.metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags) Review Comment: Removed (by adding `import Partition.metricsGroup`). ## server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Meter; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.utils.Sanitizer; + +public class KafkaMetricsGroup { +private final Class klass; + +public KafkaMetricsGroup(final Class klass) { +this.klass = klass; +} + +/** + * Creates a new MetricName object for gauges, meters, etc. created for this + * metrics group. + * @param name Descriptive name of the metric. + * @param tags Additional attributes which mBean will have. + * @return Sanitized metric name object. + */ +public MetricName metricName(final String name, final Map tags) { +final String pkg; Review Comment: Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14576) Move ConsoleConsumer to tools
     [ https://issues.apache.org/jira/browse/KAFKA-14576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christo Lolov reassigned KAFKA-14576:
-------------------------------------

    Assignee: Christo Lolov

> Move ConsoleConsumer to tools
> -----------------------------
>
>                 Key: KAFKA-14576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14576
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Mickael Maison
>            Assignee: Christo Lolov
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] satishd commented on pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
satishd commented on PR #13046:
URL: https://github.com/apache/kafka/pull/13046#issuecomment-1385392475

Thanks @ijuma for the updated comments. Addressed them inline and updated the PR with the latest commit.
[GitHub] [kafka] satishd commented on pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
satishd commented on PR #13040:
URL: https://github.com/apache/kafka/pull/13040#issuecomment-1385391553

Thanks @ijuma for the review comments. Addressed them inline and updated the PR with the latest commit.
[GitHub] [kafka] satishd commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
satishd commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1072167191

## core/src/main/scala/kafka/log/LogLoader.scala: ##
@@ -191,7 +192,7 @@ class LogLoader(
     // Reload all snapshots into the ProducerStateManager cache, the intermediate ProducerStateManager used
     // during log recovery may have deleted some files without the LogLoader.producerStateManager instance witnessing the
     // deletion.
-    producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq)
+    producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq.map(x => Long.box(x)).asJava)

Review Comment:
Good catch! Avoided the `Seq` conversion by using `Collection` as the argument type. We will not need these conversions once the LogSegment code is moved/rewritten, which is planned in a few weeks.
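The choice of `Collection` as the argument type can be illustrated with a small stdlib-only sketch. Everything here is hypothetical: the class `StraySnapshotSketch`, the method `straySnapshotOffsets`, and its simplified rule (a snapshot is stray when no segment has the same base offset) are stand-ins, since the real retention rules of `removeStraySnapshots` are not shown in this thread. The point is only that a `Collection<Long>` parameter accepts a `List`, a `Set`, or a converted Scala collection without the caller building a sequence first.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not the ProducerStateManager code.
final class StraySnapshotSketch {
    // Accepting Collection<Long> keeps the method agnostic to the caller's container type.
    // Simplified rule (an assumption): a snapshot offset is stray if no segment base offset matches it.
    static Set<Long> straySnapshotOffsets(Collection<Long> segmentBaseOffsets, Collection<Long> snapshotOffsets) {
        Set<Long> known = new HashSet<>(segmentBaseOffsets);
        Set<Long> stray = new TreeSet<>();
        for (Long offset : snapshotOffsets) {
            if (!known.contains(offset)) {
                stray.add(offset);
            }
        }
        return stray;
    }

    public static void main(String[] args) {
        List<Long> segmentsAsList = Arrays.asList(0L, 100L, 200L);
        Set<Long> segmentsAsSet = new HashSet<>(segmentsAsList);
        List<Long> snapshots = Arrays.asList(0L, 50L, 200L, 250L);
        // Both calls compile and behave the same; each prints [50, 250]
        System.out.println(straySnapshotOffsets(segmentsAsList, snapshots));
        System.out.println(straySnapshotOffsets(segmentsAsSet, snapshots));
    }
}
```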
[GitHub] [kafka] divijvaidya commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
divijvaidya commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1385362966

**Proposal: Use a concurrent map for ProducerStateManager.producers (currently a mutable.Map)**

**Pros**
- Simplifies the code needed to implement KIP-847 (and prevents future bugs caused by accidentally updating the map without updating the metric).
- Does not raise any locking concerns, since calculating size() on a concurrent map doesn't require locks. [From the Java docs for `ConcurrentHashMap`](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ConcurrentHashMap.html):
> Bear in mind that the results of aggregate status methods including size, isEmpty, and containsValue are typically useful only when a map is not undergoing concurrent updates in other threads. Otherwise the results of these methods reflect transient states that may be adequate for monitoring or estimation purposes, but not for program control.

**Cons**
- The metric for the number of producers may be inaccurate while multiple threads are modifying the map concurrently. The `producers` map is modified during `truncation` (when the replica is catching up), when a producer is expired due to timeout, and on every completed transaction, so it is modified fairly frequently. Hence, the possibility of the `producerId` metric displaying inaccurate information (usually off by one, since at most 2 threads modify this map) is fairly high.

**My opinion**
When we have just one (or two) producers, this metric would be highly unreliable (not just stale), as it provides only an "approximation" of size(). It is not uncommon to produce data from a limited set of producers (with a large number of consumers), and hence I lean towards sticking with the current approach of keeping this metric accurate. Having said that, I don't have a strong opinion here. We can always come back and make this metric accurate if we find that the inaccuracy causes problems while troubleshooting.
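The proposal above can be sketched roughly as follows, assuming the gauge simply reads `ConcurrentHashMap.size()`. The class `ProducerCountSketch`, its methods, and the `String` value type are hypothetical stand-ins for `ProducerStateManager` and its per-producer state; only the `ConcurrentHashMap` calls are real API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntSupplier;

// Hypothetical sketch, not the ProducerStateManager code.
final class ProducerCountSketch {
    // Stand-in for ProducerStateManager.producers: producerId -> opaque state.
    private final ConcurrentHashMap<Long, String> producers = new ConcurrentHashMap<>();

    // The gauge reads the live size without taking a lock. Under concurrent
    // updates the value reflects a transient state, as the Javadoc quoted above warns.
    final IntSupplier producerCountGauge = producers::size;

    void update(long producerId, String state) {
        producers.put(producerId, state);
    }

    void expire(long producerId) {
        producers.remove(producerId);
    }

    public static void main(String[] args) {
        ProducerCountSketch sketch = new ProducerCountSketch();
        sketch.update(1L, "a");
        sketch.update(2L, "b");
        sketch.expire(1L);
        // Single-threaded here, so the reading is exact: prints 1
        System.out.println(sketch.producerCountGauge.getAsInt());
    }
}
```

With this shape there is no separate counter to keep in sync with the map, which is the simplification listed under Pros; the cost is the transient inaccuracy listed under Cons.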
[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
fvaleri commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1071392892 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
I think that we should keep using `joptsimple` with the `CommandDefaultOptions` abstract class, rather than migrating all commands to `argparse4j`. The vast majority of commands do not use `argparse4j`.

Also, the current `joptsimple` no-args output is very different:

```sh
This tool helps to query log directory usage on the specified brokers.
Option              Description
------              -----------
--bootstrap-server  [...] bootstrapping
--broker-list       The list of brokers to be queried in the form "0,1,2". All brokers in the cluster will be queried if no broker list is specified
--command-config    [...] passed to Admin Client.
--describe          Describe the specified log directories on the specified brokers.
--help              Print usage information.
--topic-list        The list of topics to be queried in the form "topic1,topic2,topic3". All topics will be queried if no topic list is specified (default: )
--version           Display Kafka version.
```

This is the new no-args output with `argparse4j`:

```sh
usage: kafka-log-dirs [-h] --bootstrap-server BOOTSTRAP-SERVER [--command-config COMMAND-CONFIG] [--topic-list TOPIC-LIST] [--broker-list BROKER-LIST]
kafka-log-dirs: error: argument --bootstrap-server is required
```

@ijuma @mimaison WDYT?
[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
fvaleri commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1071392892 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String[] args) throws ExecutionException, InterruptedException, IOException { +ArgumentParser parser = ArgumentParsers Review Comment: I think that we should keep using `joptsimple` with the `CommandDefaultOptions` abstract class, rather than migrating all commands to `argparse4j`. The vast majority of commands do not use `argparse4j`. Then, the `joptsimple` (current) no-args output is very different: ```sh This tool helps to query log directory usage on the specified brokers. Option Description -- --- --bootstrap-serverbootstrapping --broker-list The list of brokers to be queried in the form "0,1,2". All brokers in the cluster will be queried if no broker list is specified --command-config passed to Admin Client. 
--describe Describe the specified log directories on the specified brokers.
--help Print usage information.
--topic-list The list of topics to be queried in the form "topic1,topic2,topic3". All topics will be queried if no topic list is specified (default: )
--version Display Kafka version.
```
New output:
```sh
usage: kafka-log-dirs [-h] --bootstrap-server BOOTSTRAP-SERVER [--command-config COMMAND-CONFIG] [--topic-list TOPIC-LIST] [--broker-list BROKER-LIST]
kafka-log-dirs: error: argument --bootstrap-server is required
```
@ijuma @mimaison WDYT?
[GitHub] [kafka] yufeiyan1220 opened a new pull request, #13125: KAFKA-14626 Kafka Consumer Coordinator does not cleanup all metrics
yufeiyan1220 opened a new pull request, #13125:
URL: https://github.com/apache/kafka/pull/13125
*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*
*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-14626) Kafka Consumer Coordinator does not cleanup all metrics
Feiyan Yu created KAFKA-14626:
Summary: Kafka Consumer Coordinator does not cleanup all metrics
Key: KAFKA-14626
URL: https://issues.apache.org/jira/browse/KAFKA-14626
Project: Kafka
Issue Type: Bug
Reporter: Feiyan Yu
Inspired by [KAFKA-9306|https://issues.apache.org/jira/browse/KAFKA-9306], I found that there is no logic to remove the metrics registered in `ConsumerCoordinatorMetrics` when `ConsumerCoordinator` is closed, which might
-- This message was sent by Atlassian Jira (v8.20.10#820010)
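The cleanup that KAFKA-14626 asks for can be illustrated with a minimal, self-contained sketch. It does not use Kafka's real `Metrics` or `ConsumerCoordinatorMetrics` classes: `SketchRegistry`, `SketchCoordinatorMetrics`, and the metric names are hypothetical stand-ins, and the only idea taken from the report is that a component should remove, on close, every metric it registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MetricsCleanupSketch {
    public static void main(String[] args) {
        SketchRegistry registry = new SketchRegistry();
        // try-with-resources guarantees close() runs, which removes the metrics again.
        try (SketchCoordinatorMetrics metrics = new SketchCoordinatorMetrics(registry, "consumer-1")) {
            System.out.println("while open: " + registry.size());
        }
        System.out.println("after close: " + registry.size());
    }
}

// Hypothetical stand-in for a shared metrics registry (not Kafka's Metrics class).
class SketchRegistry {
    private final Map<String, Double> metrics = new ConcurrentHashMap<>();

    void register(String name, double initialValue) {
        metrics.put(name, initialValue);
    }

    void remove(String name) {
        metrics.remove(name);
    }

    int size() {
        return metrics.size();
    }
}

// Hypothetical component that remembers what it registered so close() can undo it.
class SketchCoordinatorMetrics implements AutoCloseable {
    private final SketchRegistry registry;
    private final List<String> registered = new ArrayList<>();

    SketchCoordinatorMetrics(SketchRegistry registry, String prefix) {
        this.registry = registry;
        add(prefix + "-commit-rate");        // illustrative metric name
        add(prefix + "-commit-latency-avg"); // illustrative metric name
    }

    private void add(String name) {
        registry.register(name, 0.0);
        registered.add(name);
    }

    @Override
    public void close() {
        // Remove exactly the metrics this instance added, nothing else.
        for (String name : registered) {
            registry.remove(name);
        }
        registered.clear();
    }
}
```

Tracking the registered names inside the component keeps the cleanup local: the shared registry does not need to know which owner added which metric.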
[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
clolov commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1072024066 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String[] args) throws ExecutionException, InterruptedException, IOException { +ArgumentParser parser = ArgumentParsers Review Comment: To be honest, I used `argparse4j` because I looked at https://github.com/apache/kafka/pull/13080/files as a reference. I don't have a strong opinion either way, but the first output you presented seems nicer to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
clolov commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1072022574 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String[] args) throws ExecutionException, InterruptedException, IOException { Review Comment: Okay, I will do this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
clolov commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1072022248 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + Review Comment: Yup, this is my bad, I will add it in subsequent commits. ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class LogDirsCommand { +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { Review Comment: Sure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.
lucasbru commented on code in PR #13082:
URL: https://github.com/apache/kafka/pull/13082#discussion_r1072016340
## docs/streams/developer-guide/config-streams.html:
## @@ -778,10 +778,21 @@ rack.aware.assignment.tags max.warmup.replicas
- The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping
- the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker
- traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time
- for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.
+
+The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping
+the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker
+traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time
+for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.
+
+
+Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active Task during
Review Comment:
@littlehorse-eng You need Apache HTTP Server with `include_module` and `rewrite_module` loaded. Maybe this can help: https://github.com/lucasbru/kafka-site-docker
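For readers who want to see where the `max.warmup.replicas` setting from the quoted docs change ends up, here is a small sketch that builds a Streams-style configuration with plain `java.util.Properties`. The application id, bootstrap address, and the chosen values are hypothetical; a real application would pass these properties to Kafka Streams and would normally use the `StreamsConfig` constants instead of string keys.

```java
import java.util.Properties;

class WarmupConfigSketch {
    // Builds the configuration with string keys so the sketch needs no Kafka dependency.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("application.id", "warmup-demo");       // hypothetical value
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical value
        props.setProperty("num.standby.replicas", "1");           // illustrative value
        // Per the quoted docs, this must be at least 1; a higher value lets
        // Streams warm up more tasks at once at the cost of extra broker traffic.
        props.setProperty("max.warmup.replicas", "2");            // illustrative value
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("max.warmup.replicas=" + props.getProperty("max.warmup.replicas"));
    }
}
```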
[GitHub] [kafka] keashem commented on pull request #13124: MINOR: Rename testDeadToDeadIllegalTransition to testDeadToDeadTransition in GroupMetadataTest
keashem commented on PR #13124:
URL: https://github.com/apache/kafka/pull/13124#issuecomment-1385052326
@hachikuji please take a look.
[GitHub] [kafka] keashem opened a new pull request, #13124: MINOR: Rename testDeadToDeadIllegalTransition to testDeadToDeadTransition in GroupMetadataTest
keashem opened a new pull request, #13124:
URL: https://github.com/apache/kafka/pull/13124
`Dead` is itself one of the valid previous states of the `Dead` GroupState, so a transition from `Dead` to `Dead` does not throw `IllegalStateException`. The unit test `testDeadToDeadIllegalTransition` should therefore be renamed to `testDeadToDeadTransition`.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
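The reasoning behind the rename can be sketched as a tiny state machine in which every state lists its valid previous states. This is not Kafka's `GroupMetadata` code: the class and the transition table are assumptions made for illustration, and the only fact taken from the PR description is that `Dead` accepts `Dead` as a previous state.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class GroupStateSketch {
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // Maps each target state to the states it may be entered from (assumed table).
    private static final Map<State, Set<State>> VALID_PREVIOUS = new EnumMap<>(State.class);

    static {
        VALID_PREVIOUS.put(State.EMPTY, EnumSet.of(State.PREPARING_REBALANCE));
        VALID_PREVIOUS.put(State.PREPARING_REBALANCE,
                EnumSet.of(State.STABLE, State.COMPLETING_REBALANCE, State.EMPTY));
        VALID_PREVIOUS.put(State.COMPLETING_REBALANCE, EnumSet.of(State.PREPARING_REBALANCE));
        VALID_PREVIOUS.put(State.STABLE, EnumSet.of(State.COMPLETING_REBALANCE));
        // DEAD can be entered from every state, including DEAD itself.
        VALID_PREVIOUS.put(State.DEAD, EnumSet.allOf(State.class));
    }

    private State current = State.EMPTY;

    State current() {
        return current;
    }

    void transitionTo(State target) {
        if (!VALID_PREVIOUS.get(target).contains(current)) {
            throw new IllegalStateException("Illegal transition " + current + " -> " + target);
        }
        current = target;
    }

    public static void main(String[] args) {
        GroupStateSketch group = new GroupStateSketch();
        group.transitionTo(State.DEAD);
        group.transitionTo(State.DEAD); // legal: DEAD lists DEAD as a valid previous state
        System.out.println("state after DEAD -> DEAD: " + group.current());
        try {
            group.transitionTo(State.STABLE); // illegal in this sketch
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A test named `...IllegalTransition` would expect the second `transitionTo(State.DEAD)` call to throw, which is not what a table like this one produces.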