Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1580515079 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +public class RLMQuotaManagerConfig { Review Comment: Will add. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
appchemist commented on code in PR #15647: URL: https://github.com/apache/kafka/pull/15647#discussion_r1580510829 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java: ## @@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E final long fetchOffset = completedFetch.nextFetchOffset(); if (error == Errors.NOT_LEADER_OR_FOLLOWER || -error == Errors.REPLICA_NOT_AVAILABLE || +error == Errors.FENCED_LEADER_EPOCH) { +log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); +requestMetadataUpdate(metadata, subscriptions, tp); +} else if (error == Errors.REPLICA_NOT_AVAILABLE || error == Errors.KAFKA_STORAGE_ERROR || -error == Errors.FENCED_LEADER_EPOCH || error == Errors.OFFSET_NOT_AVAILABLE) { log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); requestMetadataUpdate(metadata, subscriptions, tp); +subscriptions.awaitUpdate(tp); Review Comment: I missed that. For this case, there is no need to await a metadata update. I think simply initializing the PreferredReadReplica should be enough. Since `FetchUtils.requestMetadataUpdate()` is already being called, it should already be doing that initialization.
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
appchemist commented on code in PR #15647: URL: https://github.com/apache/kafka/pull/15647#discussion_r1580504115 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget, if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) { partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch( Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch()))); +} else { +requestMetadataUpdate(metadata, subscriptions, partition); +subscriptions.awaitUpdate(partition); Review Comment: As another alternative, the status could be changed to `AWAIT_UPDATE` in `FetchCollector.handleInitializeErrors()` only when it's not a KIP-951 case. Upon further thought, it seems possible to differentiate the two cases based on the following condition. ``` completedFetch.partitionData.currentLeader().leaderId() != -1 && completedFetch.partitionData.currentLeader().leaderEpoch() != -1 ```
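The condition proposed in the comment above can be illustrated with a small, self-contained Java sketch. This is not the code from the PR: the class `LeaderInfoSketch`, the `Action` enum, and both method names are hypothetical, introduced only to show how a check on `leaderId` and `leaderEpoch` could separate the KIP-951 case (the response already carries new leader information) from the case where a metadata update has to be awaited.

```java
public class LeaderInfoSketch {
    /** Hypothetical outcomes; these names do not exist in the Kafka code base. */
    public enum Action { UPDATE_LEADER_ONLY, REQUEST_AND_AWAIT_UPDATE }

    /** True when the response carried usable leader information (neither field is -1). */
    public static boolean hasLeaderInfo(int leaderId, int leaderEpoch) {
        return leaderId != -1 && leaderEpoch != -1;
    }

    /** Assumption: awaiting a metadata update is only needed when no leader info was returned. */
    public static Action onLeadershipError(int leaderId, int leaderEpoch) {
        return hasLeaderInfo(leaderId, leaderEpoch)
            ? Action.UPDATE_LEADER_ONLY
            : Action.REQUEST_AND_AWAIT_UPDATE;
    }

    public static void main(String[] args) {
        System.out.println(onLeadershipError(3, 17));  // leader info present
        System.out.println(onLeadershipError(-1, -1)); // no leader info
    }
}
```

Whether the real change should live in `FetchCollector.handleInitializeErrors()` or in `AbstractFetch.handleFetchSuccess()` is exactly what the thread is discussing; the sketch only isolates the predicate.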
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
appchemist commented on code in PR #15647: URL: https://github.com/apache/kafka/pull/15647#discussion_r1580499797 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget, if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) { partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch( Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch()))); +} else { +requestMetadataUpdate(metadata, subscriptions, partition); +subscriptions.awaitUpdate(partition); Review Comment: If the fetch state is `FETCHING`, as in the KIP-951 case, the `FetchCollector.handleInitializeErrors()` method is called. I thought that in this case the state should not be changed to `AWAIT_UPDATE`. Additionally, if the state is `AWAIT_UPDATE`, the fetch will be filtered out by the following code inside the `FetchCollector.initialize()` method and will not go through `FetchCollector.handleInitializeErrors()`. ``` if (!subscriptions.hasValidPosition(tp)) { // this can happen when a rebalance happened while fetch is still in-flight log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp); return null; } ```
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1580499863 ## core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RLMQuotaManagerTest { +private final MockTime time = new MockTime(); +private final Metrics metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time); +private static final QuotaType QUOTA_TYPE = QuotaType.RLMFetch$.MODULE$; +private static final String DESCRIPTION = "Tracking byte rate"; + +@Test +public void testQuotaExceeded() { +RLMQuotaManager quotaManager = new RLMQuotaManager( +new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time); + +assertFalse(quotaManager.isQuotaExceeded()); +quotaManager.record(500); +// Move clock by 1 sec, quota is violated +moveClock(1); +assertTrue(quotaManager.isQuotaExceeded()); + +// Move clock by another 8 secs, quota is still violated for the window +moveClock(8); +assertTrue(quotaManager.isQuotaExceeded()); + +// Move clock by 1 sec, quota is no more violated +moveClock(1); +assertFalse(quotaManager.isQuotaExceeded()); +} + +@Test +public void testQuotaUpdate() { +RLMQuotaManager quotaManager = new RLMQuotaManager( +new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time); + +assertFalse(quotaManager.isQuotaExceeded()); +quotaManager.record(51); +assertTrue(quotaManager.isQuotaExceeded()); + +Map fetchQuotaMetrics = 
metrics.metrics().entrySet().stream() +.filter(entry -> entry.getKey().name().equals("byte-rate") && entry.getKey().group().equals(QUOTA_TYPE.toString())) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +Map nonQuotaMetrics = metrics.metrics().entrySet().stream() +.filter(entry -> !entry.getKey().name().equals("byte-rate") || !entry.getKey().group().equals(QUOTA_TYPE.toString())) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +assertEquals(1, fetchQuotaMetrics.size()); +assertFalse(nonQuotaMetrics.isEmpty()); + +Map configForQuotaMetricsBeforeUpdate = extractMetricConfig(fetchQuotaMetrics); +Map configForNonQuotaMetricsBeforeUpdate = extractMetricConfig(nonQuotaMetrics); + +// Update quota to 60, quota is no more violated +Quota quota60Bytes = new Quota(60, true); +quotaManager.updateQuota(quota60Bytes); +assertFalse(quotaManager.isQuotaExceeded()); + +// Verify quota metrics were updated +Map configForQuotaMetricsAfterFirstUpdate = extractMetricConfig(fetchQuotaMetrics); +assertNotEquals(configForQuotaMetricsBeforeUpdate, configForQuotaMetricsAfterFirstUpdate); +fetchQuotaMetrics.values().forEach(metric -> assertEquals(metric.config().quota(), quota60Bytes)); Review Comment: Yes, thanks for pointing out. ## core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841056#comment-17841056 ] dujian commented on KAFKA-16584: Hello [~mjsax], before creating a KIP I need to create a wiki ID, but registration at [https://cwiki.apache.org/confluence/signup.action] is turned off. Can you help me? > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Assignee: dujian >Priority: Major > Labels: needs-kip, newbie > > Currently *every two minutes for every stream thread* statistics will be > logged on INFO log level. > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is absolutely unnecessary and even harmful since it fills the logs and > thus storage space with unwanted and useless data. Otherwise the INFO logs > are useful and helpful, therefore it's not an option to raise the log level > to WARN. > Please make the logProcessingSummary > * either to a DEBUG level log or > * make it configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
appchemist commented on PR #15647: URL: https://github.com/apache/kafka/pull/15647#issuecomment-2078650601 Thanks for the review, @kirktrue!
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
m1a2st commented on PR #15808: URL: https://github.com/apache/kafka/pull/15808#issuecomment-2078587040 @chia7712 Thanks for your comments. I have already changed the code accordingly.
Re: [PR] [WIP] MINOR: fix flaky test [kafka]
github-actions[bot] commented on PR #15052: URL: https://github.com/apache/kafka/pull/15052#issuecomment-2078571664 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: enable kraft test for ReassignPartitionsIntegrationTest [kafka]
FrankYang0529 commented on PR #15675: URL: https://github.com/apache/kafka/pull/15675#issuecomment-2078515889 > btw, it would be great if you can rewrite it by new test infra. Yeah, but this class needs to assign different configs to different brokers. I will wait for https://github.com/apache/kafka/pull/15761. https://github.com/apache/kafka/blob/025f9816f1a15d14aab25c9e8e5b03a87f0cefe2/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java#L754-L808
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1580354223 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java: ## @@ -132,63 +133,41 @@ public void close() throws Exception { } } -static class ConsumerRunnable implements Runnable { -private final String brokerAddress; -private final String groupId; -private final Properties customConfigs; +private static class ConsumerRunnable implements Runnable { private final boolean syncCommit; private final String topic; -private final String groupProtocol; -private final String assignmentStrategy; -private final Optional remoteAssignor; -private final Properties props = new Properties(); -private KafkaConsumer consumer; -private boolean configured = false; +private final KafkaConsumer consumer; private volatile boolean isShutdown = false; -public ConsumerRunnable(String brokerAddress, -String groupId, -String groupProtocol, -String topic, -String assignmentStrategy, -Optional remoteAssignor, -Optional customConfigs, -boolean syncCommit) { -this.brokerAddress = brokerAddress; -this.groupId = groupId; -this.customConfigs = customConfigs.orElse(new Properties()); +private ConsumerRunnable(String brokerAddress, + String groupId, + String groupProtocol, + String topic, + String assignmentStrategy, + Optional remoteAssignor, + Map customConfigs, + boolean syncCommit) { this.syncCommit = syncCommit; this.topic = topic; -this.groupProtocol = groupProtocol; -this.assignmentStrategy = assignmentStrategy; -this.remoteAssignor = remoteAssignor; -this.configure(); -} - -private void configure() { -configured = true; -configure(props); -props.putAll(customConfigs); -consumer = new KafkaConsumer<>(props); -} - -private void configure(Properties props) { -props.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress); -props.put(GROUP_ID_CONFIG, groupId); -props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
-props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); -props.put(GROUP_PROTOCOL_CONFIG, groupProtocol); +Map configs = new HashMap<>(); +configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress); +configs.put(GROUP_ID_CONFIG, groupId); +configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); +configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); +configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol); + if (Objects.equals(groupProtocol, CONSUMER.toString())) { -remoteAssignor.ifPresent(assignor -> props.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor)); +remoteAssignor.ifPresent(assignor -> configs.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor)); } else { -props.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy); +configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy); } +configs.putAll(customConfigs); +consumer = new KafkaConsumer<>(configs); Review Comment: Could you make sure all consumers get closed even though one of consumer gets error? We encountered resource leaks before and it makes our CI unstable. For example: ```java private static AutoCloseable run( String brokerAddress, int numberOfConsumers, String groupId, String groupProtocol, String topic, String assignmentStrategy, Optional remoteAssignor, Map customConfigs, boolean syncCommit ) { Queue> consumers = consumers(IntStream.range(0, numberOfConsumers).mapToObj(ignored -> { Map configs = new HashMap<>(); configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress); configs.put(GROUP_ID_CONFIG, groupId); configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol); if (Objects.equals(groupProtocol, CONSUMER.toString())) {
[PR] MINOR: fix timeouts of EosIntegrationTest [kafka]
mjsax opened a new pull request, #15811: URL: https://github.com/apache/kafka/pull/15811 A typo sets the timeout far too high. The test still passes because the test timeout is 10 minutes and the test runs for 5 minutes (with this fix, the test runtime drops to 30 seconds, as intended).
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
chia7712 commented on code in PR #15808: URL: https://github.com/apache/kafka/pull/15808#discussion_r1580291046 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -1594,10 +1313,6 @@ object TestUtils extends Logging { } } - def createIsrChangeListener(): MockAlterPartitionListener = { Review Comment: `MockAlterPartitionListener` is used by `AbstractPartitionTest` only, so could you move it to `AbstractPartitionTest`? ## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ## @@ -548,4 +549,31 @@ class DeleteTopicTest extends QuorumTestHarness { TestUtils.waitUntilTrue(() => brokers.exists(_.asInstanceOf[KafkaServer].kafkaController.isActive), "No controller is elected") TestUtils.verifyTopicDeletion(zkClient, topic, 2, brokers) } + + private def increasePartitions[B <: KafkaBroker](admin: Admin, + topic: String, + totalPartitionCount: Int, + brokersToValidate: Seq[B] + ): Unit = { + +try { + val newPartitionSet: Map[String, NewPartitions] = Map.apply(topic -> NewPartitions.increaseTo(totalPartitionCount)) + admin.createPartitions(newPartitionSet.asJava) +} catch { + case e: ExecutionException => +throw e +} + +if (brokersToValidate.nonEmpty) { + // wait until we've propagated all partitions metadata to all brokers + val allPartitionsMetadata = waitForAllPartitionsMetadata(brokersToValidate, topic, totalPartitionCount) + + (0 until totalPartitionCount - 1).map { i => +i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse( + throw new IllegalStateException(s"Cannot get the partition leader for topic: $topic, partition: $i in server metadata cache")) Review Comment: this method return nothing, so we don't need to collect the metadata. 
However, it needs to check the existence of topic partition ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -613,4 +613,19 @@ class LogSegmentTest { ) } + private def checkEquals[T](s1: java.util.Iterator[T], s2: java.util.Iterator[T]): Unit = { +while (s1.hasNext && s2.hasNext) + assertEquals(s1.next, s2.next) +assertFalse(s1.hasNext, "Iterators have uneven length--first has more") +assertFalse(s2.hasNext, "Iterators have uneven length--second has more") + } + + private def writeNonsenseToFile(fileName: File, position: Long, size: Int): Unit = { +val file = new RandomAccessFile(fileName, "rw") +file.seek(position) +for (_ <- 0 until size) + file.writeByte(random.nextInt(255)) +file.close() Review Comment: could you please add try-finally? ## core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala: ## @@ -149,6 +150,23 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes */ @Test def testAcls(): Unit = { -TestUtils.verifySecureZkAcls(zkClient, 1) +verifySecureZkAcls(zkClient, 1) + } + + /** + * Verifies that all secure paths in ZK are created with the expected ACL. 
+ */ + private def verifySecureZkAcls(zkClient: KafkaZkClient, usersWithAccess: Int): Unit = { Review Comment: ditto ## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ## @@ -548,4 +549,31 @@ class DeleteTopicTest extends QuorumTestHarness { TestUtils.waitUntilTrue(() => brokers.exists(_.asInstanceOf[KafkaServer].kafkaController.isActive), "No controller is elected") TestUtils.verifyTopicDeletion(zkClient, topic, 2, brokers) } + + private def increasePartitions[B <: KafkaBroker](admin: Admin, + topic: String, + totalPartitionCount: Int, + brokersToValidate: Seq[B] + ): Unit = { + +try { + val newPartitionSet: Map[String, NewPartitions] = Map.apply(topic -> NewPartitions.increaseTo(totalPartitionCount)) + admin.createPartitions(newPartitionSet.asJava) +} catch { Review Comment: We do nothing for this catch, so it should be fine to remove it. ## core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala: ## @@ -52,6 +56,20 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { */ @Test def testZkAclsDisabled(): Unit = { -TestUtils.verifyUnsecureZkAcls(zkClient) +verifyUnsecureZkAcls(zkClient) + } + + /** + * Verifies that secure paths in ZK have no access control. This is + * the case when zookeeper.set.acl=false and no ACLs have been configured. + */ + private def verifyUnsecureZkAcls(zkClient: KafkaZkClient): Unit = { +
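The "could you please add try-finally?" request for `writeNonsenseToFile` in the review above comes down to closing the file even when a write fails. The helper under review is Scala; the following is only a Java sketch of the same idea, using try-with-resources as the Java counterpart of a try/finally around `close()`. The class name `WriteNonsenseSketch` and the temporary file in `main` are illustrative assumptions, not part of the PR.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Random;

public class WriteNonsenseSketch {
    private static final Random RANDOM = new Random();

    /**
     * Writes {@code size} random bytes starting at {@code position}.
     * try-with-resources closes the file even if seek() or writeByte() throws.
     */
    public static void writeNonsenseToFile(File fileName, long position, int size) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(fileName, "rw")) {
            file.seek(position);
            for (int i = 0; i < size; i++) {
                file.writeByte(RANDOM.nextInt(255));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("nonsense", ".bin"); // illustrative temp file
        tmp.deleteOnExit();
        writeNonsenseToFile(tmp, 4, 16);
        System.out.println("file length: " + tmp.length());
    }
}
```

In the Scala test itself the equivalent would presumably be a `try { ... } finally { file.close() }` block around the seek and write calls.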
Re: [PR] MINOR: fix javadoc warnings [kafka]
chia7712 merged PR #15527: URL: https://github.com/apache/kafka/pull/15527
Re: [PR] MINOR: Remove unnecessary version from excluded dependencies of clients [kafka]
showuon commented on PR #15804: URL: https://github.com/apache/kafka/pull/15804#issuecomment-2078386421 > LGTM as it won't compare the undefined version (https://github.com/johnrengelman/shadow/blob/main/src/main/groovy/com/github/jengelman/gradle/plugins/shadow/internal/AbstractDependencyFilter.groovy#L101) Thanks @chia7712 for pointing it out! I didn't know that.
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
m1a2st commented on PR #15808: URL: https://github.com/apache/kafka/pull/15808#issuecomment-2078376955 @chia7712 I have resolved the conflicts.
Re: [PR] MINOR: enable kraft test for ReassignPartitionsIntegrationTest [kafka]
chia7712 commented on PR #15675: URL: https://github.com/apache/kafka/pull/15675#issuecomment-2078363083 By the way, it would be great if you could rewrite it using the new test infra.
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
chia7712 commented on PR #15808: URL: https://github.com/apache/kafka/pull/15808#issuecomment-2078361586 @m1a2st Could you please fix the conflicts? Thanks!
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
akhileshchg commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1580260431 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -786,12 +773,31 @@ public void run() throws Exception { } } +class RecoverMigrationStateFromZKEvent extends MigrationEvent { +@Override +public void run() throws Exception { +if (checkDriverState(MigrationDriverState.UNINITIALIZED, this)) { +applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState); Review Comment: For my understanding, was this the line where the uncaught exception is thrown? Can we handle the exception more gracefully and log an error?
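What "handle the exception more gracefully and log an error" could look like in general is sketched below in plain Java. This is not the `KRaftMigrationDriver` code: `GuardedOperationSketch`, `ThrowingOperation`, and `runGuarded` are hypothetical names, and `java.util.logging` stands in for whatever logger the driver actually uses. The sketch only shows the pattern of catching the failure, logging it with context, and reporting it to the caller instead of letting it escape uncaught.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class GuardedOperationSketch {
    private static final Logger LOG = Logger.getLogger(GuardedOperationSketch.class.getName());

    /** Hypothetical stand-in for an operation that may throw a checked exception. */
    @FunctionalInterface
    public interface ThrowingOperation {
        void run() throws Exception;
    }

    /**
     * Runs the operation; on failure logs an error with the description and
     * returns false so the caller can decide whether to retry.
     */
    public static boolean runGuarded(String description, ThrowingOperation operation) {
        try {
            operation.run();
            return true;
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Failed while: " + description, e);
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = runGuarded("Recovering migration state from ZK", () -> {
            throw new IllegalStateException("simulated failure");
        });
        System.out.println("succeeded: " + ok);
    }
}
```

Returning a boolean is just one design option; a retry-oriented caller, as the PR title suggests, might instead re-enqueue the event for retriable errors.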
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580243392 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1092,6 +1097,86 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } +/** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ +private void throwIfClassicProtocolIsNotSupported( +ConsumerGroup group, +String memberId, +String protocolType, +JoinGroupRequestProtocolCollection protocols +) { +if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { +throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported."); +} +} + +/** + * Deserialize the subscription in JoinGroupRequestProtocolCollection. + * All the protocols have the same subscription, so the method picks a random one. + * + * @param protocols The JoinGroupRequestProtocolCollection. + * @return The Subscription. + */ +private static ConsumerPartitionAssignor.Subscription deserializeSubscription( +JoinGroupRequestProtocolCollection protocols +) { +try { +return ConsumerProtocol.deserializeSubscription( +ByteBuffer.wrap(protocols.stream().findAny().get().metadata()) +); +} catch (SchemaException e) { +throw new IllegalStateException("Malformed embedded consumer protocol."); +} +} + +/** + * Validates the generation id and returns the owned partitions in the JoinGroupRequest to a consumer group. + * + * @param memberThe joining member. + * @param subscription The Subscription. + * @return The owned partitions if valid, otherwise return null. 
+ */ +private List validateGenerationIdAndGetOwnedPartition( +ConsumerGroupMember member, +ConsumerPartitionAssignor.Subscription subscription +) { +List ownedPartitions = +toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()); +if (subscription.generationId().isPresent() && subscription.generationId().get() == member.memberEpoch()) { +return ownedPartitions; +} else { +// If the generation id is not provided or doesn't match the member epoch, it's still safe to +// accept the ownedPartitions that is a subset of the assigned partition. Otherwise, set the +// ownedPartition to be null. When a new assignment is provided, the consumer will stop fetching +// from and revoke the partitions it does not own. +if (isSubset(ownedPartitions, member.assignedPartitions())) { +return ownedPartitions; +} else { +return null; +} +} +} + +/** + * @return The ConsumerGroupHeartbeatRequestData.TopicPartitions list converted from the TopicPartitions list. + */ +private static List toTopicPartitions( +List partitions, +TopicsImage topicsImage +) { +return ConsumerGroup.topicPartitionMapFromList(partitions, topicsImage).entrySet().stream().map( Review Comment: I thought we need to sort the partitions by topic anyways, so I don't quite get how to combine(?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
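The fallback described in the code comment inside `validateGenerationIdAndGetOwnedPartition` (accept the owned partitions when the generation id matches the member epoch, or when they are a subset of the assigned partitions; otherwise return null) can be illustrated in isolation. The sketch below is only an approximation under stated assumptions: `OwnedPartitionsSketch`, its `validate` method, and the plain `Map<String, Set<Integer>>` representation (topic name to partition ids) are hypothetical stand-ins, not the PR's actual types or its `isSubset` helper.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the logic quoted above; not the actual GroupMetadataManager code.
final class OwnedPartitionsSketch {

    // Returns true if every owned partition also appears in the assignment.
    static boolean isSubset(Map<String, Set<Integer>> owned, Map<String, Set<Integer>> assigned) {
        for (Map.Entry<String, Set<Integer>> entry : owned.entrySet()) {
            Set<Integer> assignedPartitions =
                assigned.getOrDefault(entry.getKey(), Collections.emptySet());
            if (!assignedPartitions.containsAll(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    // Accepts the owned partitions when the generation id matches the member epoch,
    // or when they are a subset of the current assignment; otherwise returns null.
    static Map<String, Set<Integer>> validate(
        Optional<Integer> generationId,
        int memberEpoch,
        Map<String, Set<Integer>> owned,
        Map<String, Set<Integer>> assigned
    ) {
        if (generationId.isPresent() && generationId.get() == memberEpoch) {
            return owned;
        }
        return isSubset(owned, assigned) ? owned : null;
    }
}
```

With a stale generation id, a member that reports only partitions it was actually assigned still has them accepted, while a member that reports anything outside its assignment gets null.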
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580240603 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1336,6 +1422,233 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult consumerGroupJoin( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); + +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +List ownedTopicPartitions = null; + +if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) { +return EMPTY_RESULT; +} +throwIfConsumerGroupIsFull(group, memberId); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. 
+if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. " + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +// A dynamic member (re-)joins. +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +newMemberCreated = !group.hasMember(memberId); + +member = group.getOrMaybeCreateMember(memberId, true); +if (!newMemberCreated) ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +} +} else { +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +member = group.staticMember(instanceId); +// A new static member joins or the existing static member rejoins. +if (isUnknownMember) { +newMemberCreated = true; +if (member == null) { +// New static member. +member = group.getOrMaybeCreateMember(memberId, true); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); +} else { +// Replace the current static member. 
+staticMemberReplaced = true; +updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) +.setAssignedPartitions(member.assignedPartitions()); +ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); +log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); +} +} else { +// Rejoining static member. Fence the
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580240281 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1092,6 +1097,86 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } +/** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ +private void throwIfClassicProtocolIsNotSupported( +ConsumerGroup group, +String memberId, +String protocolType, +JoinGroupRequestProtocolCollection protocols +) { +if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { +throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported."); +} +} + +/** + * Deserialize the subscription in JoinGroupRequestProtocolCollection. + * All the protocols have the same subscription, so the method picks a random one. + * + * @param protocols The JoinGroupRequestProtocolCollection. + * @return The Subscription. + */ +private static ConsumerPartitionAssignor.Subscription deserializeSubscription( +JoinGroupRequestProtocolCollection protocols +) { +try { +return ConsumerProtocol.deserializeSubscription( +ByteBuffer.wrap(protocols.stream().findAny().get().metadata()) +); +} catch (SchemaException e) { +throw new IllegalStateException("Malformed embedded consumer protocol."); +} +} + +/** + * Validates the generation id and returns the owned partitions in the JoinGroupRequest to a consumer group. + * + * @param memberThe joining member. + * @param subscription The Subscription. + * @return The owned partitions if valid, otherwise return null. 
+ */ +private List validateGenerationIdAndGetOwnedPartition( Review Comment: I was wondering if we can put off checking the generation id until sync group and throw ILLEGAL_GENERATION there. The client will rejoin in such a situation. That's what we did in the classic protocol. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1336,6 +1422,233 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult consumerGroupJoin( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); + +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +List ownedTopicPartitions = null; + +if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) { +return EMPTY_RESULT; +} +throwIfConsumerGroupIsFull(group, memberId); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. 
+if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. " + +"Created a new member id {} and requesting the member to rejoin with this id.", +
[jira] [Created] (KAFKA-16626) Uuid to String for subscribed topic names in assignment spec
Ritika Reddy created KAFKA-16626: Summary: Uuid to String for subscribed topic names in assignment spec Key: KAFKA-16626 URL: https://issues.apache.org/jira/browse/KAFKA-16626 Project: Kafka Issue Type: Sub-task Reporter: Ritika Reddy Assignee: Ritika Reddy When creating the assignment spec from the existing consumer subscription metadata, a considerable amount of time is spent converting the String to a Uuid. Change the subscribed topics in the assignment spec from Uuid to String and convert on the fly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
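One possible reading of "convert on the fly" is that the spec keeps the subscribed topic names as strings and resolves a topic id only when a caller asks for one. The sketch below illustrates that reading only; `LazyTopicIdSketch` and its methods are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and nothing here is taken from the actual assignment spec classes.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch: java.util.UUID stands in for Kafka's Uuid type.
final class LazyTopicIdSketch {
    private final Set<String> subscribedTopicNames;
    private final Map<String, UUID> topicIdsByName;

    // Building the spec stores references only; no name is converted up front.
    LazyTopicIdSketch(Set<String> subscribedTopicNames, Map<String, UUID> topicIdsByName) {
        this.subscribedTopicNames = subscribedTopicNames;
        this.topicIdsByName = topicIdsByName;
    }

    Set<String> subscribedTopicNames() {
        return subscribedTopicNames;
    }

    // Converts a single name on demand; returns null for unknown or unsubscribed topics.
    UUID topicId(String topicName) {
        return subscribedTopicNames.contains(topicName) ? topicIdsByName.get(topicName) : null;
    }
}
```

The trade-off in this sketch is that the conversion cost moves from spec construction to each lookup, which only pays off if few of the subscribed names are ever resolved.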
Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]
chia7712 commented on PR #15800: URL: https://github.com/apache/kafka/pull/15800#issuecomment-2078284518 @TaiJuWu Could you add a new test file (`ClusterTestExtensionsUnitTest`) to verify the function `processClusterTemplate`?
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580210319 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1336,6 +1422,233 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult consumerGroupJoin( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); + +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +List ownedTopicPartitions = null; + +if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) { +return EMPTY_RESULT; +} +throwIfConsumerGroupIsFull(group, memberId); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. 
+if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. " + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +// A dynamic member (re-)joins. +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +newMemberCreated = !group.hasMember(memberId); + +member = group.getOrMaybeCreateMember(memberId, true); +if (!newMemberCreated) ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +} +} else { +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +member = group.staticMember(instanceId); +// A new static member joins or the existing static member rejoins. +if (isUnknownMember) { +newMemberCreated = true; +if (member == null) { +// New static member. +member = group.getOrMaybeCreateMember(memberId, true); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); +} else { +// Replace the current static member. 
+staticMemberReplaced = true; +updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) +.setAssignedPartitions(member.assignedPartitions()); +ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); +log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); +} +} else { +// Rejoining static member. Fence the
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580203100 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1336,6 +1422,233 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult consumerGroupJoin( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); + +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +List ownedTopicPartitions = null; + +if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) { +return EMPTY_RESULT; +} +throwIfConsumerGroupIsFull(group, memberId); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. 
+if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. " + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +// A dynamic member (re-)joins. +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +newMemberCreated = !group.hasMember(memberId); + +member = group.getOrMaybeCreateMember(memberId, true); +if (!newMemberCreated) ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +} +} else { +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +member = group.staticMember(instanceId); +// A new static member joins or the existing static member rejoins. +if (isUnknownMember) { +newMemberCreated = true; +if (member == null) { +// New static member. +member = group.getOrMaybeCreateMember(memberId, true); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); +} else { +// Replace the current static member. 
+staticMemberReplaced = true; +updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) +.setAssignedPartitions(member.assignedPartitions()); +ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); +log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); +} +} else { +// Rejoining static member. Fence the
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on PR #15761: URL: https://github.com/apache/kafka/pull/15761#issuecomment-2078268366 @brandboat could you please fix the conflicts?
[jira] [Created] (KAFKA-16625) Reverse Lookup Partition to Member in Assignors
Ritika Reddy created KAFKA-16625: Summary: Reverse Lookup Partition to Member in Assignors Key: KAFKA-16625 URL: https://issues.apache.org/jira/browse/KAFKA-16625 Project: Kafka Issue Type: Sub-task Reporter: Ritika Reddy Assignee: Ritika Reddy Calculating unassigned partitions within the Uniform assignor is a costly process. This can be improved by using a reverse lookup map from topicIdPartition to the member.
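The reverse lookup idea in this ticket can be sketched as follows. This is a minimal illustration, not the Uniform assignor's code: `ReverseLookupSketch` and its methods are hypothetical, and partitions are represented as plain strings such as "t-0" rather than as topicIdPartition objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; partitions are plain strings instead of topicIdPartition objects.
final class ReverseLookupSketch {

    // Builds partition -> member from member -> partitions in a single pass.
    static Map<String, String> partitionToMember(Map<String, List<String>> assignment) {
        Map<String, String> owners = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            for (String partition : entry.getValue()) {
                owners.put(partition, entry.getKey());
            }
        }
        return owners;
    }

    // A partition is unassigned if it has no owner in the reverse map.
    static List<String> unassigned(List<String> allPartitions, Map<String, String> owners) {
        List<String> result = new ArrayList<>();
        for (String partition : allPartitions) {
            if (!owners.containsKey(partition)) {
                result.add(partition);
            }
        }
        return result;
    }
}
```

Once the reverse map exists, each membership check is a hash lookup instead of a scan over every member's assignment, which is presumably where the saving described in the ticket would come from.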
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
chia7712 commented on code in PR #15803: URL: https://github.com/apache/kafka/pull/15803#discussion_r1580186766 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception { } } +/** + * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)} + * causes {@link InterruptException} to be thrown. + */ +@Test +public void testPollThrowsInterruptExceptionIfInterrupted() { Review Comment: Is it possible to merge this new test into https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L1263? If not, maybe we can remove the TODO.
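The behaviour the quoted Javadoc describes (interrupting the thread before calling poll makes poll throw) can be shown without a Kafka dependency. The sketch below is an assumption-laden illustration: `InterruptCheckSketch`, its `poll` method, and `SketchInterruptException` are hypothetical stand-ins for the consumer and for Kafka's `InterruptException`, and the check shown is not taken from the PR.

```java
// Hypothetical sketch; none of these names come from the Kafka code base.
final class InterruptCheckSketch {

    // Unchecked exception standing in for Kafka's InterruptException.
    static final class SketchInterruptException extends RuntimeException {
        SketchInterruptException(String message) {
            super(message);
        }
    }

    // Fails fast if the calling thread's interrupt flag is already set.
    // isInterrupted() leaves the flag set, so callers can still observe it.
    static String poll() {
        if (Thread.currentThread().isInterrupted()) {
            throw new SketchInterruptException("thread was interrupted before poll");
        }
        return "records";
    }
}
```

A test for this shape sets the flag with `Thread.currentThread().interrupt()`, asserts that `poll()` throws, and then clears the flag with `Thread.interrupted()` so later tests on the same thread are not affected.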
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580183434 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1336,6 +1422,233 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult consumerGroupJoin( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); + +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +List ownedTopicPartitions = null; + +if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) { +return EMPTY_RESULT; +} +throwIfConsumerGroupIsFull(group, memberId); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. 
+if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. " + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +// A dynamic member (re-)joins. +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +newMemberCreated = !group.hasMember(memberId); + +member = group.getOrMaybeCreateMember(memberId, true); +if (!newMemberCreated) ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +} +} else { +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +member = group.staticMember(instanceId); +// A new static member joins or the existing static member rejoins. +if (isUnknownMember) { +newMemberCreated = true; +if (member == null) { +// New static member. +member = group.getOrMaybeCreateMember(memberId, true); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); +} else { +// Replace the current static member. 
+staticMemberReplaced = true; +updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) +.setAssignedPartitions(member.assignedPartitions()); +ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); Review Comment: Yeah I think it makes sense. We can remove it without canceling the timers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580183181 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1336,6 +1422,233 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult consumerGroupJoin( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); + +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +List ownedTopicPartitions = null; + +if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) { +return EMPTY_RESULT; +} +throwIfConsumerGroupIsFull(group, memberId); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. 
+if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. " + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +// A dynamic member (re-)joins. +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +newMemberCreated = !group.hasMember(memberId); + +member = group.getOrMaybeCreateMember(memberId, true); +if (!newMemberCreated) ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); +log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +} +} else { +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +member = group.staticMember(instanceId); +// A new static member joins or the existing static member rejoins. +if (isUnknownMember) { +newMemberCreated = true; +if (member == null) { +// New static member. +member = group.getOrMaybeCreateMember(memberId, true); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); +} else { +// Replace the current static member. 
Review Comment: If a static member joins without a member id, we replace any existing member with the same instance id; if the static member joins with a member id, then we treat it as a normal rejoin if the static member exists and the member id matches, and throw an exception otherwise.
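The rule described in the review comment above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the PR: the class `StaticJoinSketch`, the `Outcome` enum, the `decide` method, and the use of a plain map from instance id to member id are all hypothetical, the empty string is assumed to stand for the unknown member id, and `IllegalStateException` stands in for whatever exception the coordinator actually throws.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the static-member join decision; not the PR's code.
public class StaticJoinSketch {
    public enum Outcome { NEW_STATIC_MEMBER, REPLACE_EXISTING, REJOIN }

    // Assumption: the unknown member id is represented by the empty string.
    public static final String UNKNOWN_MEMBER_ID = "";

    // staticMembers maps an instance id to the member id that currently owns it.
    public static Outcome decide(Map<String, String> staticMembers, String instanceId, String memberId) {
        String existingMemberId = staticMembers.get(instanceId);
        if (memberId.equals(UNKNOWN_MEMBER_ID)) {
            // No member id supplied: create a new static member, or replace
            // the existing member that holds the same instance id.
            return existingMemberId == null ? Outcome.NEW_STATIC_MEMBER : Outcome.REPLACE_EXISTING;
        }
        if (existingMemberId != null && existingMemberId.equals(memberId)) {
            // Member id supplied and it matches the known static member: normal rejoin.
            return Outcome.REJOIN;
        }
        // Stand-in for the real error; the actual exception type is not shown here.
        throw new IllegalStateException("Static member " + instanceId + " does not match member id " + memberId);
    }

    public static void main(String[] args) {
        Map<String, String> members = new HashMap<>();
        members.put("instance-1", "member-a");
        System.out.println(decide(members, "instance-1", ""));         // replace the existing member
        System.out.println(decide(members, "instance-1", "member-a")); // normal rejoin
    }
}
```

Keeping the decision separate from the record generation makes the three cases easy to test in isolation, which is the only reason the sketch returns an enum rather than mutating group state.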
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580182980 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1336,6 +1422,233 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult consumerGroupJoin( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); + +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +List ownedTopicPartitions = null; + +if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) { +return EMPTY_RESULT; +} +throwIfConsumerGroupIsFull(group, memberId); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. 
+if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. " + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +// A dynamic member (re-)joins. +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +newMemberCreated = !group.hasMember(memberId); + +member = group.getOrMaybeCreateMember(memberId, true); +if (!newMemberCreated) ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); Review Comment: I think we don't need to for new members, but it doesn't hurt to also do the check since we always accept the empty ownedPartitions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
chia7712 commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1580175708 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest { // Alter the metadata log.info("Updating metadata with AdminClient") admin = zkCluster.createAdminClient() - alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS) - alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS) + alterTopicConfig(admin) + alterClientQuotas(admin) + alterBrokerConfigs(admin) // Verify the changes made to KRaft are seen in ZK log.info("Verifying metadata changes with ZK") verifyTopicConfigs(zkClient) verifyClientQuotas(zkClient) + verifyBrokerConfigs(zkClient) val nextKRaftProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, TimeUnit.SECONDS) assertNotEquals(nextProducerId, nextKRaftProducerId) - +} catch { + case t: Throwable => fail("Uncaught error in test", t) Review Comment: > I've seen many cases where it's hard to see where a test is failing because of a thrown exception. Could you please share the error stack trace with me?
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
chia7712 commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1580174626 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest { } } + @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( +new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), +new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), +new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), +new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"), + )) + def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): Unit = { +// Enable migration configs and restart brokers without KRaft quorum ready + zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") +zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@localhost:") Review Comment: Sorry that I just notice this PR after merging the refactor-related PR :_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in metadata [kafka]
chia7712 merged PR #15806: URL: https://github.com/apache/kafka/pull/15806
Re: [PR] MINOR: Various cleanups in metadata [kafka]
chia7712 commented on PR #15806: URL: https://github.com/apache/kafka/pull/15806#issuecomment-2078233197
```
./gradlew cleanTest :streams:test --tests StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorManyStandbys :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful --tests DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsOfStableConsumerGroupWithTopicOnly --tests DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsOfStableConsumerGroupWithTopicPartition --tests DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly :storage:test --tests TopicBasedRemoteLogMetadataManagerTest.testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout --tests TransactionsWithTieredStoreTest.testAbortTransactionTimeout :connect:runtime:test --tests org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic :metadata:test --tests QuorumControllerMetricsIntegrationTest.testTimeoutMetrics :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :connect:mirror:test --tests IdentityReplicationIntegrationTest.testSyncTopicConfigs --tests MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault :core:test --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests ReplicationQuotasTest.shouldThrottleOldSegments
```
I don't see any related failures, and these tests pass locally. Will merge it.
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1580159998 ## clients/src/main/resources/common/message/VotersRecord.json: ## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "VotersRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "Version", "type": "int16", "versions": "0+", + "about": "The version of the voters record" }, +{ "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [ + { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId", +"about": "The replica id of the voter in the topic partition" }, + { "name": "VoterUuid", "type": "uuid", "versions": "0+", Review Comment: Sounds good to me. I'll update the KIP and the other schemas that use a similar name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1580158966 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java: ## @@ -807,4 +809,62 @@ private static void writeSnapshotFooterRecord( builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord); } } + +public static MemoryRecords withKRaftVersionRecord( +long initialOffset, +long timestamp, +int leaderEpoch, +ByteBuffer buffer, +KRaftVersionRecord kraftVersionRecord +) { +writeKRaftVersionRecord(buffer, initialOffset, timestamp, leaderEpoch, kraftVersionRecord); +buffer.flip(); +return MemoryRecords.readableRecords(buffer); +} + +private static void writeKRaftVersionRecord( +ByteBuffer buffer, +long initialOffset, +long timestamp, +int leaderEpoch, +KRaftVersionRecord kraftVersionRecord +) { +try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder( +buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, +TimestampType.CREATE_TIME, initialOffset, timestamp, +RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, +false, true, leaderEpoch, buffer.capacity()) +) { +builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord); +} +} + +public static MemoryRecords withVotersRecord( +long initialOffset, +long timestamp, +int leaderEpoch, +ByteBuffer buffer, +VotersRecord votersRecord +) { +writeVotersRecord(buffer, initialOffset, timestamp, leaderEpoch, votersRecord); Review Comment: What do you mean by "we create two MemoryRecords instances"?
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1580155319 ## clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java: ## @@ -44,11 +44,15 @@ public enum ControlRecordType { ABORT((short) 0), COMMIT((short) 1), -// Raft quorum related control messages. +// KRaft quorum related control messages LEADER_CHANGE((short) 2), SNAPSHOT_HEADER((short) 3), SNAPSHOT_FOOTER((short) 4), +// KRaft membership changes messages +KRAFT_VERSION((short) 5), +VOTERS((short) 6), Review Comment: Sounds good. Fixed for KRAFT_VOTERS. I'll fix the rest in another PR.
[jira] [Comment Edited] (KAFKA-12534) kafka-configs does not work with ssl enabled kafka broker.
[ https://issues.apache.org/jira/browse/KAFKA-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840985#comment-17840985 ] keith.paulson edited comment on KAFKA-12534 at 4/25/24 9:26 PM: I can reproduce this with kafka 3.6.0 and BCFKS keystores. Changing keystore and password gives {code:java} ERROR Encountered metadata publishing fault: Error updating node with new configuration: listener.name.SSL.ssl.key.password -> [hidden],listener.name.SSL.ssl.keystore.location -> /etc/ssl/ {code} {color:#910091}cert{color} {code:java} .bcfks in MetadataDelta up to 5940236 (org.apache.kafka.server.fault.LoggingFaultHandler) org.apache.kafka.common.config.ConfigException: Validation of dynamic config update of SSLFactory failed: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /etc/ssl/ {code} {color:#910091}cert{color} {code:java} .bcfks of type BCFKS {code} but on a kafka restart, that keystore/password combination works fine If I change just the keystore (ie new keystore created using same password as previous one), the cert change works {code:java} [2024-04-25 21:15:47,065] INFO [DynamicConfigPublisher broker id=1] Updating node 1 with new configuration : listener.name.SSL.ssl.keystore.location -> /etc/ssl/ {code} {color:#910091}cert{color} {code:java} .bcfks (kafka.server.metadata.DynamicConfigPublisher) {code} Not being able to change passwords is a significant limitation. was (Author: JIRAUSER299451): I can reproduce this with kafka 3.6.0 and BCFKS keystores. 
Changing keystore and password gives {code:java} ERROR Encountered metadata publishing fault: Error updating node with new configuration: listener.name.SSL.ssl.key.password -> [hidden],listener.name.SSL.ssl.keystore.location -> /etc/ssl/private/kafkachain.bcfks in MetadataDelta up to 5940236 (org.apache.kafka.server.fault.LoggingFaultHandler) org.apache.kafka.common.config.ConfigException: Validation of dynamic config update of SSLFactory failed: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /etc/ssl/private/kafkachain.bcfks of type BCFKS {code} but on a kafka restart, that keystore/password combination works fine If I change just the keystore (ie new keystore created using same password as previous one) {code:java} [2024-04-25 21:15:47,065] INFO [DynamicConfigPublisher broker id=1] Updating node 1 with new configuration : listener.name.SSL.ssl.keystore.location -> /etc/ssl/private/kafkachain.bcfks (kafka.server.metadata.DynamicConfigPublisher) {code} Not being able to change passwords is a significant limitation. > kafka-configs does not work with ssl enabled kafka broker. > -- > > Key: KAFKA-12534 > URL: https://issues.apache.org/jira/browse/KAFKA-12534 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.1 >Reporter: kaushik srinivas >Priority: Critical > > We are trying to change the trust store password on the fly using the > kafka-configs script for a ssl enabled kafka broker. > Below is the command used: > kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers > --entity-name 1001 --alter --add-config 'ssl.truststore.password=xxx' > But we see below error in the broker logs when the command is run. 
> {"type":"log", "host":"kf-2-0", "level":"INFO", > "neid":"kafka-cfd5ccf2af7f47868e83473408", "system":"kafka", > "time":"2021-03-23T12:14:40.055", "timezone":"UTC", > "log":\{"message":"data-plane-kafka-network-thread-1002-ListenerName(SSL)-SSL-2 > - org.apache.kafka.common.network.Selector - [SocketServer brokerId=1002] > Failed authentication with /127.0.0.1 (SSL handshake failed)"}} > How can anyone configure ssl certs for the kafka-configs script and succeed > with the ssl handshake in this case ? > Note : > We are trying with a single listener i.e SSL: -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12534) kafka-configs does not work with ssl enabled kafka broker.
[ https://issues.apache.org/jira/browse/KAFKA-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840985#comment-17840985 ] keith.paulson commented on KAFKA-12534: --- I can reproduce this with kafka 3.6.0 and BCFKS keystores. Changing keystore and password gives {code:java} ERROR Encountered metadata publishing fault: Error updating node with new configuration: listener.name.SSL.ssl.key.password -> [hidden],listener.name.SSL.ssl.keystore.location -> /etc/ssl/private/kafkachain.bcfks in MetadataDelta up to 5940236 (org.apache.kafka.server.fault.LoggingFaultHandler) org.apache.kafka.common.config.ConfigException: Validation of dynamic config update of SSLFactory failed: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /etc/ssl/private/kafkachain.bcfks of type BCFKS {code} but on a kafka restart, that keystore/password combination works fine If I change just the keystore (ie new keystore created using same password as previous one) {code:java} [2024-04-25 21:15:47,065] INFO [DynamicConfigPublisher broker id=1] Updating node 1 with new configuration : listener.name.SSL.ssl.keystore.location -> /etc/ssl/private/kafkachain.bcfks (kafka.server.metadata.DynamicConfigPublisher) {code} Not being able to change passwords is a significant limitation. > kafka-configs does not work with ssl enabled kafka broker. > -- > > Key: KAFKA-12534 > URL: https://issues.apache.org/jira/browse/KAFKA-12534 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.1 >Reporter: kaushik srinivas >Priority: Critical > > We are trying to change the trust store password on the fly using the > kafka-configs script for a ssl enabled kafka broker. > Below is the command used: > kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers > --entity-name 1001 --alter --add-config 'ssl.truststore.password=xxx' > But we see below error in the broker logs when the command is run. 
> {"type":"log", "host":"kf-2-0", "level":"INFO", > "neid":"kafka-cfd5ccf2af7f47868e83473408", "system":"kafka", > "time":"2021-03-23T12:14:40.055", "timezone":"UTC", > "log":\{"message":"data-plane-kafka-network-thread-1002-ListenerName(SSL)-SSL-2 > - org.apache.kafka.common.network.Selector - [SocketServer brokerId=1002] > Failed authentication with /127.0.0.1 (SSL handshake failed)"}} > How can anyone configure ssl certs for the kafka-configs script and succeed > with the ssl handshake in this case ? > Note : > We are trying with a single listener i.e SSL: -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1580152614 ## clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java: ## @@ -26,7 +26,7 @@ /** * Represents an immutable basic version range using 2 attributes: min and max, each of type short. * The min and max attributes need to satisfy 2 rules: - * - they are each expected to be >= 1, as we only consider positive version values to be valid. Review Comment: Yes. I think this was missed during the original implementation. The default value for any feature version is 0 but that cannot be expressed in the range of supported versions since it doesn't allow 0 as the min or max value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
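To make the point about version 0 concrete, here is a minimal sketch of a version range whose bounds may be 0. It is not the `BaseVersionRange` class from the PR: the class name `VersionRangeSketch` and its methods are hypothetical, and the two rules it enforces (each bound is at least 0, and max is at least min) are an assumption about what the relaxed check looks like, since the replacement Javadoc line is not quoted in the message.

```java
// Hypothetical sketch of a min/max version range that accepts 0; not Kafka's BaseVersionRange.
public class VersionRangeSketch {
    private final short min;
    private final short max;

    public VersionRangeSketch(short min, short max) {
        // Assumed rules: both bounds are non-negative, and max is not below min.
        // With the older ">= 1" rule, the default feature version 0 could not be expressed.
        if (min < 0 || max < 0 || max < min) {
            throw new IllegalArgumentException(
                "Expected min >= 0, max >= 0 and max >= min, got min=" + min + ", max=" + max);
        }
        this.min = min;
        this.max = max;
    }

    public short min() { return min; }

    public short max() { return max; }

    // True when the given version falls inside the inclusive range.
    public boolean contains(short version) {
        return version >= min && version <= max;
    }

    public static void main(String[] args) {
        VersionRangeSketch range = new VersionRangeSketch((short) 0, (short) 1);
        System.out.println(range.contains((short) 0));
    }
}
```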
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1580150231 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest { } } + @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( +new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), +new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), +new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), +new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"), + )) + def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): Unit = { +// Enable migration configs and restart brokers without KRaft quorum ready + zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") +zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@localhost:") Review Comment: The config changes are killing me in this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1580149700 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest { // Alter the metadata log.info("Updating metadata with AdminClient") admin = zkCluster.createAdminClient() - alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS) - alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS) + alterTopicConfig(admin) + alterClientQuotas(admin) + alterBrokerConfigs(admin) // Verify the changes made to KRaft are seen in ZK log.info("Verifying metadata changes with ZK") verifyTopicConfigs(zkClient) verifyClientQuotas(zkClient) + verifyBrokerConfigs(zkClient) val nextKRaftProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, TimeUnit.SECONDS) assertNotEquals(nextProducerId, nextKRaftProducerId) - +} catch { + case t: Throwable => fail("Uncaught error in test", t) Review Comment: Just my preference, I guess. I've seen many cases where it's hard to see where a test is failing because of a thrown exception. Maybe this is better in recent versions of JUnit?
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1580133686 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -687,7 +688,7 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme currentAssignmentForC )); -AssignmentSpec assignmentSpec = new AssignmentSpec(members); +AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); Review Comment: Yeah it covers it. Nothing changed in the hetero path anyways -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
chia7712 commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1580130359 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest { dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get } - def alterTopicConfig(admin: Admin): AlterConfigsResult = { + def alterBrokerConfigs(admin: Admin): Unit = { +val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") +val defaultBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), AlterConfigOp.OpType.SET), +).asJavaCollection +val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") +val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1") +val specificBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), AlterConfigOp.OpType.SET), +).asJavaCollection + +TestUtils.retry(6) { + val result = admin.incrementalAlterConfigs(Map( +defaultBrokerResource -> defaultBrokerConfigs, +broker0Resource -> specificBrokerConfigs, +broker1Resource -> specificBrokerConfigs + ).asJava) + try { +result.all().get(10, TimeUnit.SECONDS) + } catch { +case t: Throwable => fail("Alter Broker Configs had an error", t) + } +} Review Comment: I have the same question here. 1. In the test case `testIncrementalAlterConfigsPreMigration`, it should pass at once since the KRaft quorum is not ready. 2. In the test `testDualWrite`, it should pass at once since we have already waited for the migration to begin.
## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -974,7 +1035,11 @@ class ZkMigrationIntegrationTest { quotas.add(new ClientQuotaAlteration( new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava), List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava)) -admin.alterClientQuotas(quotas) +try { + admin.alterClientQuotas(quotas).all().get(10, TimeUnit.SECONDS) +} catch { + case t: Throwable => fail("Alter Client Quotas had an error", t) Review Comment: Ditto. Not sure why we need to catch it and then call `fail`. ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest { // Alter the metadata log.info("Updating metadata with AdminClient") admin = zkCluster.createAdminClient() - alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS) - alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS) + alterTopicConfig(admin) + alterClientQuotas(admin) + alterBrokerConfigs(admin) // Verify the changes made to KRaft are seen in ZK log.info("Verifying metadata changes with ZK") verifyTopicConfigs(zkClient) verifyClientQuotas(zkClient) + verifyBrokerConfigs(zkClient) val nextKRaftProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, TimeUnit.SECONDS) assertNotEquals(nextProducerId, nextKRaftProducerId) - +} catch { + case t: Throwable => fail("Uncaught error in test", t) Review Comment: Why catch the error in this test case? The test fails even if we don't catch it. Or do you plan to make it retryable with `TestUtils.retry`?
## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest { } } + @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( +new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), +new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), +new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), +new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"), + )) + def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): Unit = { +// Enable migration configs and restart brokers without KRaft quorum ready + zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") +zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@localhost:") Review Comment: `RaftConfig` is renamed to `QuorumConfig`
[jira] [Created] (KAFKA-16624) Don't generate useless PartitionChangeRecord on older MV
Colin McCabe created KAFKA-16624: Summary: Don't generate useless PartitionChangeRecord on older MV Key: KAFKA-16624 URL: https://issues.apache.org/jira/browse/KAFKA-16624 Project: Kafka Issue Type: Bug Reporter: Colin McCabe Assignee: Colin McCabe Fix a case where we could generate useless PartitionChangeRecords on metadata versions older than 3.6-IV0. This could happen in the case where we had an ISR with only one broker in it, and we were trying to go down to a fully empty ISR. In this case, PartitionChangeBuilder would block the record from going down to a fully empty ISR (since that is not valid in these pre-KIP-966 metadata versions), but it would still emit the record, even though it had no effect.
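The behavior described in the issue can be illustrated with a small sketch. It does not reproduce `PartitionChangeBuilder`; the class `IsrShrinkSketch`, the method `maybeChangeIsr`, and the `emptyIsrAllowed` flag (standing in for the metadata-version check) are hypothetical names chosen for the example. The idea shown is simply that when the empty-ISR change is blocked, the resulting ISR equals the current one, so no change should be emitted at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of "do not emit a no-op ISR change"; not Kafka's PartitionChangeBuilder.
public class IsrShrinkSketch {

    // Returns the new ISR if removing the broker actually changes it, or empty if nothing changes.
    // emptyIsrAllowed is an assumed stand-in for "metadata version supports a fully empty ISR".
    public static Optional<List<Integer>> maybeChangeIsr(
            List<Integer> currentIsr, int brokerToRemove, boolean emptyIsrAllowed) {
        List<Integer> targetIsr = new ArrayList<>(currentIsr);
        targetIsr.remove(Integer.valueOf(brokerToRemove));

        if (targetIsr.isEmpty() && !emptyIsrAllowed) {
            // The shrink to an empty ISR is blocked, so the ISR stays as it was.
            targetIsr = new ArrayList<>(currentIsr);
        }

        // Only report a change when the ISR really differs; otherwise the record would be useless.
        if (targetIsr.equals(currentIsr)) {
            return Optional.empty();
        }
        return Optional.of(targetIsr);
    }

    public static void main(String[] args) {
        System.out.println(maybeChangeIsr(List.of(1), 1, false).isPresent());    // no record needed
        System.out.println(maybeChangeIsr(List.of(1, 2), 1, false).isPresent()); // real change
    }
}
```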
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1580115582 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -90,17 +91,29 @@ private Map> membersPerTopic(final AssignmentSpec assignmentS Map> membersPerTopic = new HashMap<>(); Map membersData = assignmentSpec.members(); -membersData.forEach((memberId, memberMetadata) -> { -Collection topics = memberMetadata.subscribedTopicIds(); +if (assignmentSpec.groupSubscriptionModel().equals(HOMOGENEOUS)) { +List allMembers = new ArrayList<>(membersData.keySet()); Review Comment: Thanks! Made the change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
kirktrue commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2078159039 I think I would limit it to the request state classes, just to keep the changes to a minimum
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
kirktrue commented on code in PR #15647: URL: https://github.com/apache/kafka/pull/15647#discussion_r1580103763 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java: ## @@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E final long fetchOffset = completedFetch.nextFetchOffset(); if (error == Errors.NOT_LEADER_OR_FOLLOWER || -error == Errors.REPLICA_NOT_AVAILABLE || +error == Errors.FENCED_LEADER_EPOCH) { +log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); +requestMetadataUpdate(metadata, subscriptions, tp); +} else if (error == Errors.REPLICA_NOT_AVAILABLE || error == Errors.KAFKA_STORAGE_ERROR || -error == Errors.FENCED_LEADER_EPOCH || error == Errors.OFFSET_NOT_AVAILABLE) { log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); requestMetadataUpdate(metadata, subscriptions, tp); +subscriptions.awaitUpdate(tp); Review Comment: With this change, if the replica is not available, we will flag the partition as awaiting a metadata update. Is this a key part of this change? Why don't we want the first `if` block to also await an update? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ## @@ -967,7 +984,8 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp return false; } -if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) { +if (position != null && +(!position.currentLeader.equals(currentLeaderAndEpoch) || this.fetchState.equals(FetchStates.AWAIT_UPDATE))) { Review Comment: Not sure if using the helper method shortens the line length enough to avoid wrapping 🤷‍♂️ ```suggestion if (position != null && (!position.currentLeader.equals(currentLeaderAndEpoch) || awaitingUpdate())) { ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget, if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) { partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch( Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch()))); +} else { +requestMetadataUpdate(metadata, subscriptions, partition); +subscriptions.awaitUpdate(partition); Review Comment: With this change, we first request a metadata update, then flag our partition as awaiting the metadata update whenever we encounter a `NOT_LEADER_OR_FOLLOWER` or `FENCED_LEADER_EPOCH`. However, in the `FetchCollector.handleInitializeErrors()` method, we only request the metadata update, but _don't_ flag the partition. Is that seeming inconsistency intentional?
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
phooq commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2078128947 Thanks for the suggestion @kirktrue. I will use `toStringDetails()` then. Would you suggest I change the methods in the `***Event` classes as well, or just focus on `RequestState` for this PR? Thanks
[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840966#comment-17840966 ] Edoardo Comar commented on KAFKA-16622: --- Activating DEBUG logging shows: ``` [2024-04-25 21:20:10,856] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(mygroup1,mytopic-0,13805): Skipped (OffsetSync{topicPartition=mytopic-0, upstreamOffset=1, downstreamOffset=1} is ahead of upstream consumer group 13805) (org.apache.kafka.connect.mirror.OffsetSyncStore:125) ``` The checkpoint is not emitted because the topic-partition has been mirrored further than where the consumer group is, so until the group catches up no checkpoints will be emitted. Question for [~gregharris73]: would this behavior mean that any consumers in groups that are behind the log end, once switched from consuming from the source cluster to the target cluster, have to reprocess the entire partition? They would have access to no translated offsets. > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single process MirrorMaker2 mirroring between two > single-node kafka clusters (mirrormaker config attached) with quick refresh > intervals (eg 5 sec) and a small offset.lag.max (eg 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (ie its consumer group lag gets down to 0).
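The skip condition visible in the DEBUG log above can be illustrated with a small sketch. It is a deliberately reduced model, not the real `OffsetSyncStore`: the class `CheckpointTranslationSketch`, its method signature, and the assumption that only a single offset sync is known are all hypothetical, and returning the sync's downstream offset unchanged is a conservative simplification.

```java
import java.util.OptionalLong;

// Hypothetical simplification of the skip condition seen in the DEBUG log.
// It is NOT the real OffsetSyncStore and assumes only one offset sync is known.
final class CheckpointTranslationSketch {

    /**
     * Translates an upstream consumer group offset to a downstream offset,
     * or returns OptionalLong.empty() when no safe translation exists.
     */
    static OptionalLong translateDownstream(long syncUpstreamOffset,
                                            long syncDownstreamOffset,
                                            long groupUpstreamOffset) {
        if (syncUpstreamOffset > groupUpstreamOffset) {
            // The only known sync is ahead of the group, so the translation is
            // skipped and no checkpoint can be emitted for this group yet.
            return OptionalLong.empty();
        }
        // Conservative choice in this sketch: use the synced downstream offset as-is.
        return OptionalLong.of(syncDownstreamOffset);
    }
}
```

Under this model, a group that lags behind the latest sync gets no translated offset at all until it catches up, which matches the delayed first checkpoint reported in the issue.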
Re: [PR] KAFKA-15307: Update/errors for deprecated config [kafka]
Cerchie commented on code in PR #14448: URL: https://github.com/apache/kafka/pull/14448#discussion_r1580062597 ## .github/workflows/codesee-arch-diagram.yml: ## @@ -0,0 +1,23 @@ +# This workflow was added by CodeSee. Learn more at https://codesee.io/ Review Comment: removed
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
hachikuji commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1576879728 ## clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java: ## @@ -26,7 +26,7 @@ /** * Represents an immutable basic version range using 2 attributes: min and max, each of type short. * The min and max attributes need to satisfy 2 rules: - * - they are each expected to be >= 1, as we only consider positive version values to be valid. Review Comment: Was it a bug that we only allowed version 1 and above? I'm wondering if we really need to change it. ## clients/src/main/resources/common/message/VotersRecord.json: ## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "type": "data", + "name": "VotersRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "Version", "type": "int16", "versions": "0+", + "about": "The version of the voters record" }, +{ "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [ + { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId", +"about": "The replica id of the voter in the topic partition" }, + { "name": "VoterUuid", "type": "uuid", "versions": "0+", Review Comment: Why don't we call it `VoterDirectoryId`? ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.feature.SupportedVersionRange; + +/** + * A type for representing the set of voters for a topic partition. + * + * It encapsulates static information like a voter's endpoint and their supported kraft.version. + * + * It providees functionality for converting to and from {@code VotersRecord} and for converting + * from the static configuration. + */ +final public class VoterSet { +private final Map voters; + +VoterSet(Map voters) { +if (voters.isEmpty()) { +throw new IllegalArgumentException("Voters cannot be empty"); +} + +this.voters = voters; +} + +/** + * Returns the socket address for a given voter at a given listener. + * + * @param voter the id of the voter + * @param listener the name of the listener + * @return the socket address if it exist, otherwise {@code Optional.empty()} + */ +public Optional voterAddress(int voter, String listener) { +return Optional.ofNullable(voters.get(voter)) +.flatMap(voterNode -> voterNode.address(listener)); +} + +/** + * Returns all of the voter ids. + */ +public Set voterIds() { +return voters.keySet(); +} + +/** + * Adds a voter to the voter set. + * + * This object is immutable. A new voter set is returned if the voter was added. + * + * A new voter can be added to a voter set if its id doesn't already exist in the
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1580024742 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.GroupType.CONSUMER; + +public class ConsumerGroupExecutor implements AutoCloseable { Review Comment: Remove the `public` to make it package-private.
[PR] MINOR: Change the documentation of the Brokers field. [kafka]
emasab opened a new pull request, #15809: URL: https://github.com/apache/kafka/pull/15809 The Brokers field doesn't contain all the brokers in the response: if a broker is down, it will be listed in some partition's "Replicas" but will not be present in this array. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: Rename RaftConfig to QuorumConfigs [kafka]
chia7712 merged PR #15797: URL: https://github.com/apache/kafka/pull/15797
Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]
chia7712 merged PR #15782: URL: https://github.com/apache/kafka/pull/15782
[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor
[ https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840907#comment-17840907 ] Matthias J. Sax commented on KAFKA-16585: - Quote: "I can use the regular Processor, but as I understand it add some overhead comparing with FixedKeyProcessor" Where did you get this? The Processor itself does not have overhead. The only thing that could happen downstream is that an unnecessary repartition step could be inserted. We are tackling this via [https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling] Quote: "Really, I think FixedKeyProcessor do not need to be 'ensure that the key is not changed'. IMHO there is enough to have a key from the same partition. So, if you will provide the way to generate the *FixedKeyRecord* from any local store it will be enough." Well, technically yes, but there is no simple way to enforce/check this... We would need to serialize the provided key, pipe it through the Partitioner, and compare the computed partition. Or is there some other way to do this? This would be quite expensive to do. If you feel strongly about all this, feel free to do a POC PR and write a KIP about it, and we can take it from there. I don't see a simple way to do it, and I believe that using a regular Processor is the right way to go (especially with KIP-759 on the horizon). > No way to forward message from punctuation method in the FixedKeyProcessor > -- > > Key: KAFKA-16585 > URL: https://issues.apache.org/jira/browse/KAFKA-16585 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Stanislav Spiridonov >Priority: Major > > The FixedKeyProcessorContext can forward only FixedKeyRecord. This class > doesn't have a public constructor and can be created based on existing > records. But such record usually is absent in the punctuation method.
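The check described in the comment above (serialize the key, run it through a partitioner, compare the computed partitions) might look roughly like the following sketch. Everything in it is an assumption made for illustration: `SamePartitionCheckSketch` is a hypothetical class, keys are plain strings serialized as UTF-8, and the hash is a placeholder that does not reproduce Kafka's default partitioner.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the "same partition" check.
// The hash below is a placeholder and is NOT Kafka's default partitioner.
final class SamePartitionCheckSketch {

    /** Serializes the key and maps it to a partition in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // Step 1: serialize the key (UTF-8 is an assumption for this sketch).
        byte[] serialized = key.getBytes(StandardCharsets.UTF_8);
        // Step 2: pipe the bytes through a (placeholder) partitioning function.
        return Math.floorMod(Arrays.hashCode(serialized), numPartitions);
    }

    /** Step 3: compare the partitions computed for the original and the new key. */
    static boolean staysInPartition(String originalKey, String newKey, int numPartitions) {
        return partitionFor(originalKey, numPartitions) == partitionFor(newKey, numPartitions);
    }
}
```

Even in this reduced form, every forwarded record pays for a serialization and a hash computation, which is the cost the comment calls "quite expensive".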
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on PR #15640: URL: https://github.com/apache/kafka/pull/15640#issuecomment-2077870915 @cadonna, thanks for your review. I have made the requested changes, so please take another pass. Thanks!
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579915561 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1338,7 +1339,14 @@ private CompletableFuture enqueueConsumerRebalanceListenerCallback(Consume Set partitions) { SortedSet sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); sortedPartitions.addAll(partitions); -CompletableBackgroundEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions); + +// We don't yet have the concept of having an expiring callback, but we will likely want that eventually. +Timer timer = time.timer(Long.MAX_VALUE); +CompletableBackgroundEvent event = new ConsumerRebalanceListenerCallbackNeededEvent( +methodName, +sortedPartitions, +timer +); Review Comment: I was able to refactor the code and this is now sans `Timer` again.
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579914824 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -769,7 +730,8 @@ public void commitAsync(OffsetCommitCallback callback) { public void commitAsync(Map offsets, OffsetCommitCallback callback) { acquireAndEnsureOpen(); try { -AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets); +Timer timer = time.timer(Long.MAX_VALUE); +AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets, timer); Review Comment: I was able to refactor the code to eliminate the need for passing in a `Timer`.
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579897029 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java: ## @@ -36,13 +36,14 @@ protected CompletableApplicationEvent(final Type type, final Timer timer) { super(type); this.future = new CompletableFuture<>(); Objects.requireNonNull(timer); -this.deadlineMs = timer.remainingMs() + timer.currentTimeMs(); -} -protected CompletableApplicationEvent(final Type type, final long deadlineMs) { -super(type); -this.future = new CompletableFuture<>(); -this.deadlineMs = deadlineMs; +long currentTimeMs = timer.currentTimeMs(); +long remainingMs = timer.remainingMs(); + +if (currentTimeMs > Long.MAX_VALUE - remainingMs) +this.deadlineMs = Long.MAX_VALUE; +else +this.deadlineMs = currentTimeMs + remainingMs; Review Comment: For point 1, I created a new method named `calculateDeadlineMs` that moves this code into one place. For point 2, there is no `Timer` in the event because `Timer` is not thread safe, and events cross the application/background thread boundary. I did not want to expose the `Timer` in the event to avoid its possible usage from the background thread.
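The overflow-safe deadline computation from the diff above, pulled into a single helper as the comment describes, could be sketched like this. The method name `calculateDeadlineMs` comes from the comment, but the enclosing class `DeadlineSketch` and the two-`long` signature are assumptions; the actual method in the PR may take different parameters. The sketch also assumes both arguments are non-negative.

```java
// Minimal sketch of a saturating deadline calculation.
// The class name and signature are assumptions; only the method name appears in the review comment.
final class DeadlineSketch {

    /**
     * Returns currentTimeMs + remainingMs, clamped to Long.MAX_VALUE so that a
     * timer created with Long.MAX_VALUE does not overflow into a negative deadline.
     * Assumes both arguments are non-negative.
     */
    static long calculateDeadlineMs(long currentTimeMs, long remainingMs) {
        if (currentTimeMs > Long.MAX_VALUE - remainingMs) {
            return Long.MAX_VALUE;
        }
        return currentTimeMs + remainingMs;
    }

    public static void main(String[] args) {
        // An ordinary timeout adds up normally.
        System.out.println(calculateDeadlineMs(1_000L, 500L));
        // An "infinite" timeout saturates instead of wrapping around.
        System.out.println(calculateDeadlineMs(1_000L, Long.MAX_VALUE) == Long.MAX_VALUE);
    }
}
```

Storing the result as a plain `long` also fits the second point of the comment: the event carries an immutable deadline across the thread boundary rather than a `Timer`, which is not thread safe.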
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579899408 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java: ## @@ -16,120 +16,31 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.clients.consumer.internals.ConsumerUtils; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.internals.IdempotentCloser; -import org.apache.kafka.common.utils.LogContext; -import org.slf4j.Logger; - -import java.io.Closeable; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.BlockingQueue; /** - * An {@link EventProcessor} is the means by which events produced by thread A are - * processed by thread B. By definition, threads A and B run in parallel to - * each other, so a mechanism is needed with which to receive and process the events from the other thread. That - * communication channel is formed around {@link BlockingQueue a shared queue} into which thread A - * enqueues events and thread B reads and processes those events. + * An {@code EventProcessor} is the means by which events are processed, the meaning of which is left + * intentionally loose. This is in large part to keep the {@code EventProcessor} focused on what it means to process + * the events, and not linking itself too closely with the rest of the surrounding application. + * + * + * + * The {@code EventProcessor} is envisaged as a stateless service that acts as a conduit, receiving an event and + * dispatching to another block of code to process. The semantic meaning of each event is different, so the + * {@code EventProcessor} will need to interact with other parts of the system that maintain state. The + * implementation should not be concerned with the mechanism by which an event arrived for processing. 
While the + * events are shuffled around the consumer subsystem by means of {@link BlockingQueue shared queues}, it should + * be considered an anti-pattern to need to know how it arrived or what happens after its is processed. */ -public abstract class EventProcessor implements Closeable { - -private final Logger log; -private final BlockingQueue eventQueue; -private final IdempotentCloser closer; - -protected EventProcessor(final LogContext logContext, final BlockingQueue eventQueue) { -this.log = logContext.logger(EventProcessor.class); -this.eventQueue = eventQueue; -this.closer = new IdempotentCloser(); -} - -public abstract boolean process(); - -protected abstract void process(T event); - -@Override -public void close() { -closer.close(this::closeInternal, () -> log.warn("The event processor was already closed")); -} - -protected interface ProcessHandler { - -void onProcess(T event, Optional error); -} +public interface EventProcessor extends AutoCloseable { /** - * Drains all available events from the queue, and then processes them in order. If any errors are thrown while - * processing the individual events, these are submitted to the given {@link ProcessHandler}. + * Process an event that is received. */ -protected boolean process(ProcessHandler processHandler) { -closer.assertOpen("The processor was previously closed, so no further processing can occur"); - -List events = drain(); - -if (events.isEmpty()) { -log.trace("No events to process"); -return false; -} +void process(T event); -try { -log.trace("Starting processing of {} event{}", events.size(), events.size() == 1 ? 
"" : "s"); - -for (T event : events) { -try { -Objects.requireNonNull(event, "Attempted to process a null event"); -log.trace("Processing event: {}", event); -process(event); -processHandler.onProcess(event, Optional.empty()); -} catch (Throwable t) { -KafkaException error = ConsumerUtils.maybeWrapAsKafkaException(t); -processHandler.onProcess(event, Optional.of(error)); -} -} -} finally { -log.trace("Completed processing"); -} - -return true; -} - -/** - * It is possible for the consumer to close before complete processing all the events in the queue. In - * this case, we need to throw an exception to notify the user the consumer is closed. - */ -private void closeInternal() { -log.trace("Closing event processor"); -List incompleteEvents = drain(); - -if (incompleteEvents.isEmpty()) -return; - -KafkaException exception = new KafkaException("The consumer is closed"); - -// Check
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579894472 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -66,6 +69,7 @@ public class ConfigurationControlManager { private final TimelineHashMap> configData; private final Map staticConfig; private final ConfigResource currentController; +private final MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler; Review Comment: maybe more of a question for someone with more code ownership of the quorum controller code, but I wonder if it would be preferable to handle generating the replication control manager records in the `QuorumController.incrementalAlterConfigs`. That would also make it a bit easier to handle `validateOnly` which we are not currently handling. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579888146 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicMap.size() == minIsrRecords.size()) { +// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the +// updated topics. + records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate( +new ArrayList<>(topicMap.keySet()), +topicName -> topicMap.get(topicName)) +); +return; +} + +// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply +// the config updates to it. Use this config copy for the min ISR look up. +Map> configDataCopy = new HashMap<>(configData); +SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); +for (ConfigRecord record : minIsrRecords) { +replayInternal(record, configDataCopy, localSnapshotRegistry); +} Review Comment: why are we calling replay here? 
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579886194 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); Review Comment: what is the behavior if the default broker config for `min.insync.replicas` is changed? I am not actually sure how that impacts the `min.insync.replicas` for existing topics. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579885684 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -144,6 +166,12 @@ void runOnce() { .map(Optional::get) .map(rm -> rm.maximumTimeToWait(currentTimeMs)) .reduce(Long.MAX_VALUE, Math::min); + +// "Complete" any events that have expired. This cleanup step should only be called after the network I/O +// thread has made at least one call to poll. This is done to emulate the behavior of the legacy consumer's +// handling of timeouts. The legacy consumer makes at least one attempt to satisfy any network requests +// before checking if a timeout has expired. Review Comment: Split into a separate method to accommodate the reworded comment. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -273,9 +301,20 @@ void cleanup() { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { sendUnsentRequests(timer); + +// Copy over the completable events to a separate list, then reap any incomplete +// events on that list. Review Comment: Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
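The "complete any events that have expired" step described in the code comment above might look roughly like the following. This is a sketch, not the PR's reaper: `ReaperSketch`, `Event`, and the use of `>=` for the expiry check are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of "reaping" expired events; names are illustrative only.
final class ReaperSketch {

    // Minimal stand-in for a completable event: a future plus an absolute deadline.
    static final class Event {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final long deadlineMs;

        Event(final long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private final List<Event> tracked = new ArrayList<>();

    void add(final Event event) {
        tracked.add(event);
    }

    // Meant to be called after the network poll, so that every event gets at least
    // one attempt before its timeout is enforced. Returns how many events expired.
    int reap(final long currentTimeMs) {
        int expired = 0;
        Iterator<Event> it = tracked.iterator();

        while (it.hasNext()) {
            Event event = it.next();

            if (event.future.isDone()) {
                // Already completed elsewhere; just stop tracking it.
                it.remove();
            } else if (currentTimeMs >= event.deadlineMs) {
                // Past its deadline; fail the future so any waiter is released.
                event.future.completeExceptionally(new TimeoutException("Event expired"));
                it.remove();
                expired++;
            }
        }

        return expired;
    }

    int size() {
        return tracked.size();
    }
}
```

Running the reap step only after the poll mirrors the ordering the comment describes: one attempt at the network request first, then the timeout check.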
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579884608 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1270,6 +1230,20 @@ private void close(Duration timeout, boolean swallowException) { if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeTimer.update(); + +if (backgroundEventReaper != null && backgroundEventQueue != null) { Review Comment: Added a brief comment. PTAL. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -128,7 +138,19 @@ void runOnce() { // Process the events—if any—that were produced by the application thread. It is possible that when processing // an event generates an error. In such cases, the processor will log an exception, but we do not want those // errors to be propagated to the caller. -applicationEventProcessor.process(); +LinkedList events = new LinkedList<>(); +applicationEventQueue.drainTo(events); + +for (ApplicationEvent event : events) { +try { +if (event instanceof CompletableApplicationEvent) + applicationEventReaper.add((CompletableApplicationEvent) event); + +applicationEventProcessor.process(event); +} catch (Throwable t) { +log.warn("Error processing event {}", t.getMessage(), t); +} +} Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
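The drain-then-process loop in the second hunk above follows a common pattern: take everything currently in the queue in one step, then handle each event in its own `try`/`catch` so one failure does not stop the rest. A minimal sketch, assuming a generic event type and a caller-supplied processor (`DrainSketch` and `drainAndProcess` are hypothetical names, and the real code logs the error rather than counting it):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of draining a queue and isolating per-event failures.
final class DrainSketch {

    private DrainSketch() { }

    // Drains all currently queued events, processes them in order, and returns the
    // number of events whose processing threw. Errors are not propagated to the caller.
    static <T> int drainAndProcess(final BlockingQueue<T> queue, final Consumer<T> processor) {
        List<T> events = new ArrayList<>();
        queue.drainTo(events);

        int failures = 0;

        for (T event : events) {
            try {
                processor.accept(event);
            } catch (Throwable t) {
                // The code under review logs a warning here; this sketch only counts.
                failures++;
            }
        }

        return failures;
    }
}
```

`drainTo` does not block when the queue is empty, so the loop simply does nothing on an idle pass.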
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579880592 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource( if (error.isFailure()) { return error; } +maybeTriggerPartitionUpdateOnMinIsrChange(newRecords); Review Comment: we need to support `legacyAlterConfigResource` also. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579872828 ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int brokerId } return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false); } + +BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() { +Map topicMap = new HashMap<>(); +for (Map map : elrMembers.values()) { +if (map != null) { Review Comment: when would this be null? is there a particular reason we chose to use a null array instead of an empty array? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
kirktrue commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1579868091 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get(); Review Comment: Sorry for the delay! Yes, I'm in agreement with the perspectives you and @lianetm stated. No qualms from me -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579865601 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java: ## @@ -16,120 +16,31 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.clients.consumer.internals.ConsumerUtils; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.internals.IdempotentCloser; -import org.apache.kafka.common.utils.LogContext; -import org.slf4j.Logger; - -import java.io.Closeable; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.BlockingQueue; /** - * An {@link EventProcessor} is the means by which events produced by thread A are - * processed by thread B. By definition, threads A and B run in parallel to - * each other, so a mechanism is needed with which to receive and process the events from the other thread. That - * communication channel is formed around {@link BlockingQueue a shared queue} into which thread A - * enqueues events and thread B reads and processes those events. + * An {@code EventProcessor} is the means by which events are processed, the meaning of which is left + * intentionally loose. This is in large part to keep the {@code EventProcessor} focused on what it means to process + * the events, and not linking itself too closely with the rest of the surrounding application. + * + * + * + * The {@code EventProcessor} is envisaged as a stateless service that acts as a conduit, receiving an event and + * dispatching to another block of code to process. The semantic meaning of each event is different, so the + * {@code EventProcessor} will need to interact with other parts of the system that maintain state. The + * implementation should not be concerned with the mechanism by which an event arrived for processing. 
While the + * events are shuffled around the consumer subsystem by means of {@link BlockingQueue shared queues}, it should + * be considered an anti-pattern to need to know how it arrived or what happens after its is processed. */ -public abstract class EventProcessor implements Closeable { - -private final Logger log; -private final BlockingQueue eventQueue; -private final IdempotentCloser closer; - -protected EventProcessor(final LogContext logContext, final BlockingQueue eventQueue) { -this.log = logContext.logger(EventProcessor.class); -this.eventQueue = eventQueue; -this.closer = new IdempotentCloser(); -} - -public abstract boolean process(); - -protected abstract void process(T event); - -@Override -public void close() { -closer.close(this::closeInternal, () -> log.warn("The event processor was already closed")); -} - -protected interface ProcessHandler { - -void onProcess(T event, Optional error); -} +public interface EventProcessor extends AutoCloseable { /** - * Drains all available events from the queue, and then processes them in order. If any errors are thrown while - * processing the individual events, these are submitted to the given {@link ProcessHandler}. + * Process an event that is received. */ -protected boolean process(ProcessHandler processHandler) { -closer.assertOpen("The processor was previously closed, so no further processing can occur"); - -List events = drain(); - -if (events.isEmpty()) { -log.trace("No events to process"); -return false; -} +void process(T event); -try { -log.trace("Starting processing of {} event{}", events.size(), events.size() == 1 ? 
"" : "s"); - -for (T event : events) { -try { -Objects.requireNonNull(event, "Attempted to process a null event"); -log.trace("Processing event: {}", event); -process(event); -processHandler.onProcess(event, Optional.empty()); -} catch (Throwable t) { -KafkaException error = ConsumerUtils.maybeWrapAsKafkaException(t); -processHandler.onProcess(event, Optional.of(error)); -} -} -} finally { -log.trace("Completed processing"); -} - -return true; -} - -/** - * It is possible for the consumer to close before complete processing all the events in the queue. In - * this case, we need to throw an exception to notify the user the consumer is closed. - */ -private void closeInternal() { -log.trace("Closing event processor"); -List incompleteEvents = drain(); - -if (incompleteEvents.isEmpty()) -return; - -KafkaException exception = new KafkaException("The consumer is closed"); - -// Check
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579864626 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java: ## @@ -36,13 +36,14 @@ protected CompletableApplicationEvent(final Type type, final Timer timer) { super(type); this.future = new CompletableFuture<>(); Objects.requireNonNull(timer); -this.deadlineMs = timer.remainingMs() + timer.currentTimeMs(); -} -protected CompletableApplicationEvent(final Type type, final long deadlineMs) { -super(type); -this.future = new CompletableFuture<>(); -this.deadlineMs = deadlineMs; +long currentTimeMs = timer.currentTimeMs(); +long remainingMs = timer.remainingMs(); + +if (currentTimeMs > Long.MAX_VALUE - remainingMs) +this.deadlineMs = Long.MAX_VALUE; +else +this.deadlineMs = currentTimeMs + remainingMs; Review Comment: `CompletableEvent` is an interface, but I could see if I can put a static method in there to keep the logic in one place. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
jolshan merged PR #15755: URL: https://github.com/apache/kafka/pull/15755 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16386) NETWORK_EXCEPTIONs from transaction verification are not translated
[ https://issues.apache.org/jira/browse/KAFKA-16386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-16386: --- Fix Version/s: 3.7.1 > NETWORK_EXCEPTIONs from transaction verification are not translated > --- > > Key: KAFKA-16386 > URL: https://issues.apache.org/jira/browse/KAFKA-16386 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0, 3.7.0 >Reporter: Sean Quah >Priority: Minor > Fix For: 3.8.0, 3.7.1 > > > KAFKA-14402 > ([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense]) > adds verification with the transaction coordinator on Produce and > TxnOffsetCommit paths as a defense against hanging transactions. For > compatibility with older clients, retriable errors from the verification step > are translated to ones already expected and handled by existing clients. When > verification was added, we forgot to translate {{NETWORK_EXCEPTION}} s. > [~dajac] noticed this manifesting as a test failure when > tests/kafkatest/tests/core/transactions_test.py was run with an older client > (prior to the fix for KAFKA-16122): > {quote} > {{NETWORK_EXCEPTION}} is indeed returned as a partition error. The > {{TransactionManager.TxnOffsetCommitHandler}} considers it as a fatal error > so it transitions to the fatal state. > It seems that there are two cases where the server could return it: (1) When > the verification request times out or its connections is cut; or (2) in > {{AddPartitionsToTxnManager.addTxnData}} where we say that we use it because > we want a retriable error. > {quote} > The first case was triggered as part of the test. The second case happens > when there is already a verification request ({{AddPartitionsToTxn}}) in > flight with the same epoch and we want clients to try again when we're not > busy. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1579862658 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java: ## @@ -48,4 +50,40 @@ public void testRequestStateSimple() { state.reset(); assertTrue(state.canSendRequest(200)); } + +@Test +public void testTrackInflightOnSuccessfulAttempt() { +testTrackInflight(RequestState::onSuccessfulAttempt); +} + +@Test +public void testTrackInflightOnFailedAttempt() { +testTrackInflight(RequestState::onFailedAttempt); +} + +private void testTrackInflight(BiConsumer onCompletedAttempt) { +RequestState state = new RequestState( +new LogContext(), +this.getClass().getSimpleName(), +100, +2, +1000, +0); + +// This is just being paranoid... +assertFalse(state.requestInFlight()); + +// When we've sent a request, the flag should update from false to true. +state.onSendAttempt(); +assertTrue(state.requestInFlight()); + +// Now we've received the response. +onCompletedAttempt.accept(state, 236); + +// When we've sent a second request with THE SAME TIMESTAMP as the previous response, Review Comment: I added back the timestamp so we could use it in the `lastSentMs` value for debugging. So the comment should make sense again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
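The scenario this test exercises (a second request sent at the same timestamp as the previous response) can be sketched as follows. This assumes, without confirmation from the thread, that the earlier logic inferred the in-flight state by comparing timestamps; `InflightSketch` and its method signatures are hypothetical and only loosely modeled on `RequestState`.

```java
// Hypothetical sketch contrasting an explicit in-flight flag with a timestamp heuristic.
final class InflightSketch {

    private boolean requestInFlight = false;
    private long lastSentMs = -1;
    private long lastReceivedMs = -1;

    void onSendAttempt(final long currentTimeMs) {
        requestInFlight = true;
        lastSentMs = currentTimeMs;   // kept for debugging output only
    }

    void onCompletedAttempt(final long currentTimeMs) {
        requestInFlight = false;
        lastReceivedMs = currentTimeMs;
    }

    // Explicit flag: unaffected by two calls sharing the same timestamp.
    boolean requestInFlight() {
        return requestInFlight;
    }

    // Timestamp-only heuristic (assumed form of the old logic): ambiguous when a
    // request is sent in the same millisecond the previous response arrived.
    boolean inFlightByTimestamps() {
        return lastSentMs > -1 && lastReceivedMs < lastSentMs;
    }
}
```

With a send, a response, and a second send all at timestamp 236, the flag reports the request as in flight while the timestamp comparison does not, which is the case the test comment calls out.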
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
johnnychhsu commented on PR #15788: URL: https://github.com/apache/kafka/pull/15788#issuecomment-208202 thanks for the prompt reply @OmniaGM ! just updated to resolve the conflict. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned
Kirk True created KAFKA-16623: - Summary: KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned Key: KAFKA-16623 URL: https://issues.apache.org/jira/browse/KAFKA-16623 Project: Kafka Issue Type: Bug Components: clients, consumer, system tests Affects Versions: 3.8.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 When running system tests for the KafkaAsyncConsumer, we occasionally see this warning: {noformat} File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner self.run() File "/usr/lib/python3.7/threading.py", line 865, in run self._target(*self._args, **self._kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 38, in _protected_worker self._worker(idx, node) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 304, in _worker handler.handle_partitions_revoked(event, node, self.logger) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 163, in handle_partitions_revoked (tp, node.account.hostname) AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) cannot be revoked from worker20 as it was not previously assigned to that consumer {noformat} It is unclear what is causing this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
m1a2st commented on PR #15808: URL: https://github.com/apache/kafka/pull/15808#issuecomment-2077728033 @chia7712 please review this PR, thank you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14585: Move StorageTool to tools [kafka]
mimaison commented on code in PR #14847: URL: https://github.com/apache/kafka/pull/14847#discussion_r1579785296 ## metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java: ## @@ -47,7 +47,7 @@ public final class MetaProperties { /** * The property that specifies the node id. Replaces broker.id in V1. */ -static final String NODE_ID_PROP = "node.id"; +public static final String NODE_ID_PROP = "node.id"; Review Comment: This is really not nice that we have to do that. I wonder if we need to wait for the `NodeIdProp` from KafkaConfig to move instead of doing that. ## tools/src/test/java/org/apache/kafka/tools/StorageToolTest.java: ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.DirectoryId; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.properties.MetaProperties; +import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; +import org.apache.kafka.metadata.properties.MetaPropertiesVersion; +import org.apache.kafka.metadata.properties.PropertiesUtils; +import org.apache.kafka.raft.RaftConfig; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.test.TestUtils.tempFile; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(MockitoExtension.class) +@Timeout(value = 40) +public class StorageToolTest { +private Properties newSelfManagedProperties() { +Properties properties = new Properties(); +properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/foo,/tmp/bar"); +properties.setProperty(StorageTool.PROCESS_ROLES_CONFIG, "controller"); +properties.setProperty(MetaProperties.NODE_ID_PROP, "2"); +properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9092"); +return properties; +} + +@Test +public void testConfigToLogDirectories() { +LogConfig config = new LogConfig(newSelfManagedProperties()); +assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/foo")), StorageTool.configToLogDirectories(config)); +} + +@Test +public void testConfigToLogDirectoriesWithMetaLogDir() { +Properties properties = newSelfManagedProperties(); +properties.setProperty(StorageTool.METADATA_LOG_DIR_CONFIG, "/tmp/baz"); +LogConfig config = new LogConfig(properties); +assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/baz", "/tmp/foo")), StorageTool.configToLogDirectories(config)); +} + +@Test +public void
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
kirktrue commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077701719 What about `toStringDetails()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Clean up TestUtils.scala [kafka]
m1a2st opened a new pull request, #15808: URL: https://github.com/apache/kafka/pull/15808 TestUtils.scala contains some unused methods, as well as methods that are used by only one class. This PR deletes the unused methods and moves each single-use method into the class that uses it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Various cleanups in generator [kafka]
mimaison opened a new pull request, #15807: URL: https://github.com/apache/kafka/pull/15807 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Various cleanups in metadata [kafka]
mimaison opened a new pull request, #15806: URL: https://github.com/apache/kafka/pull/15806 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579760563 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1338,7 +1339,14 @@ private CompletableFuture enqueueConsumerRebalanceListenerCallback(Consume Set partitions) { SortedSet sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); sortedPartitions.addAll(partitions); -CompletableBackgroundEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions); + +// We don't yet have the concept of having an expiring callback, but we will likely want that eventually. +Timer timer = time.timer(Long.MAX_VALUE); +CompletableBackgroundEvent event = new ConsumerRebalanceListenerCallbackNeededEvent( +methodName, +sortedPartitions, +timer +); Review Comment: I'll look into how to do this in a way that I don't find too ugly
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579756071 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -144,6 +166,12 @@ void runOnce() { .map(Optional::get) .map(rm -> rm.maximumTimeToWait(currentTimeMs)) .reduce(Long.MAX_VALUE, Math::min); + +// "Complete" any events that have expired. This cleanup step should only be called after the network I/O +// thread has made at least one call to poll. This is done to emulate the behavior of the legacy consumer's +// handling of timeouts. The legacy consumer makes at least one attempt to satisfy any network requests +// before checking if a timeout has expired. Review Comment: I'm happy to reword the comment and clean it up, but the lines that follow that comment are the raison d'être of this change. It's very subtle and easy to miss, hence the call-out.
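The ordering described in that code comment (attempt the work at least once, and only then expire what is still pending) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the PR's implementation: `PollThenReapDemo`, `TimedEvent`, `Loop`, and the `networkReady` flag are all hypothetical names invented here, and the "poll" is simulated by a boolean.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class PollThenReapDemo {
    // Hypothetical event with an absolute deadline; not a Kafka class.
    static final class TimedEvent {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long deadlineMs;

        TimedEvent(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    // Hypothetical loop body: attempt the work first, then expire what is left.
    static final class Loop {
        private final List<TimedEvent> pending = new ArrayList<>();

        void add(TimedEvent event) {
            pending.add(event);
        }

        // networkReady simulates whether this pass could satisfy the requests.
        void runOnce(long currentTimeMs, boolean networkReady) {
            // Step 1: always make one attempt, even if a deadline has already passed.
            if (networkReady) {
                for (TimedEvent event : pending) {
                    event.future.complete("ok");
                }
            }
            // Step 2: only now fail events that are still unfinished and past their deadline.
            Iterator<TimedEvent> it = pending.iterator();
            while (it.hasNext()) {
                TimedEvent event = it.next();
                if (event.future.isDone()) {
                    it.remove();
                } else if (currentTimeMs >= event.deadlineMs) {
                    event.future.completeExceptionally(new TimeoutException("event expired"));
                    it.remove();
                }
            }
        }

        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        Loop loop = new Loop();

        TimedEvent lateButServed = new TimedEvent(100L);
        loop.add(lateButServed);
        loop.runOnce(500L, true);   // deadline already passed, but the attempt runs first
        System.out.println("served despite late deadline: "
                + !lateButServed.future.isCompletedExceptionally());

        TimedEvent expired = new TimedEvent(100L);
        loop.add(expired);
        loop.runOnce(500L, false);  // no progress possible, so the event is expired
        System.out.println("expired after failed attempt: "
                + expired.future.isCompletedExceptionally());
    }
}
```

If the expiry check ran before the attempt, the first event in `main` would fail with a timeout even though the work could have been completed in the same pass, which is the behavior the comment says the legacy consumer avoids.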
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
kirktrue commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077634416 Thanks for the work on this @phooq!
> I plan to, on top the current changes I have, rename the `toStringBase` method as `getDetails`
Renaming `toStringBase()` to `getDetails()` makes its purpose more vague, in my opinion. Keep in mind that the naming convention `toStringBase()` is used in `ApplicationEvent`, `BackgroundEvent`, and maybe elsewhere, too.
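For readers unfamiliar with the `toStringBase()` convention mentioned above, here is a rough sketch of how such a pattern usually works: the base class owns `toString()` and the wrapping, while each subclass extends a shared field fragment. The class names `BaseState` and `FetchState` and their fields are hypothetical and chosen only for illustration; the real Kafka classes may be structured differently.

```java
class ToStringBaseDemo {
    // Hypothetical base class: owns toString() and contributes its own fields.
    static class BaseState {
        private final long retryBackoffMs;

        BaseState(long retryBackoffMs) {
            this.retryBackoffMs = retryBackoffMs;
        }

        // Returns only the "field=value" fragment so subclasses can append to it.
        protected String toStringBase() {
            return "retryBackoffMs=" + retryBackoffMs;
        }

        @Override
        public String toString() {
            // The wrapper is written once; subclasses never override toString().
            return getClass().getSimpleName() + "{" + toStringBase() + "}";
        }
    }

    // Hypothetical subclass: extends the fragment instead of rewriting toString().
    static class FetchState extends BaseState {
        private final int partitionCount;

        FetchState(long retryBackoffMs, int partitionCount) {
            super(retryBackoffMs);
            this.partitionCount = partitionCount;
        }

        @Override
        protected String toStringBase() {
            return super.toStringBase() + ", partitionCount=" + partitionCount;
        }
    }

    public static void main(String[] args) {
        System.out.println(new BaseState(100L));
        System.out.println(new FetchState(100L, 3));
    }
}
```

In this sketch the name `toStringBase()` signals that the method feeds `toString()`, which is the purpose a more generic name would obscure.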
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579735209 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1270,6 +1230,20 @@ private void close(Duration timeout, boolean swallowException) { if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeTimer.update(); + +if (backgroundEventReaper != null && backgroundEventQueue != null) { Review Comment: They're only `null` if there was an error in the constructor. The constructor's `finally` block calls `close()`, so we need to handle the case where the consumer wasn't fully constructed before it's closed.
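The null checks discussed here follow from a general Java pattern: when a constructor cleans up after its own failure by calling `close()`, some fields may never have been assigned. The sketch below illustrates that situation with hypothetical names (`NullSafeCloseDemo`, `Client`, `Resource`, `closeLog`); it is not the `AsyncKafkaConsumer` code, and the exact cleanup path in the real class may differ.

```java
import java.util.ArrayList;
import java.util.List;

class NullSafeCloseDemo {
    // Hypothetical resource that only carries a name for logging.
    static final class Resource {
        final String name;

        Resource(String name) {
            this.name = name;
        }
    }

    // Hypothetical client whose constructor may fail before every field is assigned.
    static final class Client implements AutoCloseable {
        private final List<String> closeLog;
        private Resource queue;   // stays null if the constructor fails early
        private Resource reaper;  // stays null if the constructor fails early

        Client(List<String> closeLog, boolean failMidway) {
            this.closeLog = closeLog;
            boolean constructed = false;
            try {
                queue = new Resource("queue");
                if (failMidway) {
                    throw new IllegalStateException("simulated constructor failure");
                }
                reaper = new Resource("reaper");
                constructed = true;
            } finally {
                // Clean up whatever was created if construction did not finish.
                if (!constructed) {
                    close();
                }
            }
        }

        @Override
        public void close() {
            // Each field is checked because close() can run on a half-built object.
            if (reaper != null) {
                closeLog.add("closed " + reaper.name);
            }
            if (queue != null) {
                closeLog.add("closed " + queue.name);
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        try {
            new Client(log, true);
        } catch (IllegalStateException e) {
            System.out.println("constructor failed: " + e.getMessage());
        }
        System.out.println("cleanup log: " + log);
    }
}
```

Without the null checks, the cleanup itself would throw a `NullPointerException` and mask the original constructor failure.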
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1579733831 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -769,7 +730,8 @@ public void commitAsync(OffsetCommitCallback callback) { public void commitAsync(Map offsets, OffsetCommitCallback callback) { acquireAndEnsureOpen(); try { -AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets); +Timer timer = time.timer(Long.MAX_VALUE); +AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets, timer); Review Comment: Yeah, I went back and forth on this a few times. Ultimately I wanted to force the caller to be explicit about its timeout intention, rather than having it implicitly "hidden" away in the event hierarchy. Also, to create a `Timer` in the event constructor, we'd have to pass in a `Time` object (`time.timer(Long.MAX_VALUE)`), which seemed a bit obtuse.
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1579725251 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java: ## @@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) { * is a request in-flight. */ public boolean requestInFlight() { -return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs; +return requestInFlight; Review Comment: @philipnee, I didn't make any changes to the name, as `requestInFlight` was the existing method name. Are you OK to leave this for now?
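The diff above replaces a timestamp comparison with an explicit flag. One way the comparison can give the wrong answer is sketched below; the failing scenario is an assumption made here for illustration (the thread does not spell it out), and `TimestampState`, `FlagState`, `onSend`, and `onReceive` are hypothetical names rather than the real `RequestState` API.

```java
class InFlightDemo {
    // Timestamp-based check, mirroring the expression on the removed line of the diff.
    static final class TimestampState {
        private long lastSentMs = -1;
        private long lastReceivedMs = -1;

        void onSend(long nowMs) {
            lastSentMs = nowMs;
        }

        void onReceive(long nowMs) {
            lastReceivedMs = nowMs;
        }

        boolean requestInFlight() {
            return lastSentMs > -1 && lastReceivedMs < lastSentMs;
        }
    }

    // Flag-based check: the state is recorded directly instead of being inferred.
    static final class FlagState {
        private boolean inFlight = false;

        void onSend(long nowMs) {
            inFlight = true;
        }

        void onReceive(long nowMs) {
            inFlight = false;
        }

        boolean requestInFlight() {
            return inFlight;
        }
    }

    public static void main(String[] args) {
        TimestampState byTime = new TimestampState();
        FlagState byFlag = new FlagState();

        // First request is sent at t=100 and answered at t=105.
        byTime.onSend(100L);
        byFlag.onSend(100L);
        byTime.onReceive(105L);
        byFlag.onReceive(105L);

        // Second request goes out in the same millisecond as the previous response.
        byTime.onSend(105L);
        byFlag.onSend(105L);

        // Both timestamps are now 105, so the comparison cannot see the new request.
        System.out.println("timestamp check says in flight: " + byTime.requestInFlight());
        System.out.println("flag check says in flight:      " + byFlag.requestInFlight());
    }
}
```

Millisecond timestamps cannot distinguish two events in the same millisecond, whereas a flag set on send and cleared on receive does not depend on clock resolution.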
Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]
kirktrue commented on code in PR #15737: URL: https://github.com/apache/kafka/pull/15737#discussion_r1579722758 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -140,22 +150,31 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): def __init__(self, node, verify_offsets, idx): super().__init__(node, verify_offsets, idx) -def handle_partitions_revoked(self, event): +def handle_partitions_revoked(self, event, node, logger): self.revoked_count += 1 self.state = ConsumerState.Rebalancing self.position = {} +revoked = [] + for topic_partition in event["partitions"]: -topic = topic_partition["topic"] -partition = topic_partition["partition"] -self.assignment.remove(TopicPartition(topic, partition)) +tp = _create_partition_from_dict(topic_partition) +assert tp in self.assignment, \ +"Topic partition %s cannot be revoked from %s as it was not previously assigned to that consumer" % \ +(tp, node.account.hostname) Review Comment: @lucasbru, this is the main functional change: an attempt to remove a partition from the local state now first verifies that the partition was previously assigned.
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
phooq commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077592945 Thanks @lianetm! Hey @kirktrue, I plan to, on top of the current changes I have, rename the `toStringBase` method as `getDetails` for `RequestState`. Does this look okay to you? Thanks
Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]
chia7712 commented on code in PR #15802: URL: https://github.com/apache/kafka/pull/15802#discussion_r1579708062 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -169,20 +201,22 @@ private static ConsumerGroupCommand.ConsumerGroupService consumerGroupService(St ); } -private void testWithConsumerGroup(String inputTopic, +private void testWithConsumerGroup(String inputTopicWithData, + String inputTopicForTest, + String inputGroup, int inputPartition, int expectedPartition, Errors expectedError, boolean isStable, Map consumerConfig) { -produceRecord(); +produceRecord(inputTopicWithData); Review Comment: We can move `produceRecord();` out of `testWithConsumerGroup` to simplify the arguments of `testWithConsumerGroup`
[jira] [Created] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
Edoardo Comar created KAFKA-16622:

Summary: Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
Key: KAFKA-16622
URL: https://issues.apache.org/jira/browse/KAFKA-16622
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.6.2, 3.7.0, 3.8.0
Reporter: Edoardo Comar
Attachments: edo-connect-mirror-maker-sourcetarget.properties

We observed an excessively delayed emission of the MM2 Checkpoint record. It only gets created when the source consumer reaches the end of a topic. This does not seem reasonable.

In a very simple setup:

Tested with a standalone single-process MirrorMaker2 mirroring between two single-node Kafka clusters (MirrorMaker config attached) with quick refresh intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10).

- create a single topic in the source cluster
- produce data to it (e.g. 1 records)
- start a slow consumer, e.g. fetching 50 records/poll and pausing 1 sec between polls, which commits after each poll
- watch the Checkpoint topic in the target cluster:

  bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
    --topic source.checkpoints.internal \
    --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
    --from-beginning

-> no record appears in the checkpoint topic until the consumer reaches the end of the topic (i.e. its consumer group lag gets down to 0).

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]
chia7712 commented on code in PR #15802: URL: https://github.com/apache/kafka/pull/15802#discussion_r1579702459 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -91,65 +91,97 @@ public void testDeleteOffsetsNonExistingGroup() { @ClusterTest public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() { -for (Map consumerConfig : consumerConfigs) { -createTopic(TOPIC); -testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig); -removeTopic(TOPIC); +int idx = 0; +for (Iterator> it = consumerConfigs.iterator(); it.hasNext(); idx++) { +Map consumerConfig = it.next(); +String topic = TOPIC_PREFIX + idx; +String group = GROUP_PREFIX + idx; +createTopic(topic); +testWithConsumerGroup(topic, topic, group, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig); +removeTopic(topic); } } @ClusterTest public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() { -for (Map consumerConfig : consumerConfigs) { -createTopic(TOPIC); -testWithConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig); -removeTopic(TOPIC); +int idx = 0; +for (Iterator> it = consumerConfigs.iterator(); it.hasNext(); idx++) { +Map consumerConfig = it.next(); +String topic = TOPIC_PREFIX + idx; Review Comment: Maybe we can combine the test case name and the protocol name to form a unique name.
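The naming suggestion in that review comment could look roughly like the helper below. This is only a sketch: `UniqueNameDemo` and `uniqueName` are hypothetical and do not appear in the PR, and the protocol values `classic` and `consumer` are used purely as example inputs. The sanitizing step assumes the usual Kafka topic-name rule of ASCII alphanumerics plus `.`, `_`, and `-`.

```java
class UniqueNameDemo {
    // Hypothetical helper, not part of the PR: builds "<prefix>-<testCase>-<protocol>".
    static String uniqueName(String prefix, String testCase, String protocol) {
        String raw = String.join("-", prefix, testCase, protocol);
        // Replace anything outside ASCII alphanumerics, '.', '_' and '-' with '-'.
        return raw.replaceAll("[^a-zA-Z0-9._-]", "-");
    }

    public static void main(String[] args) {
        String testCase = "testDeleteOffsetsOfStableConsumerGroupWithTopicOnly";
        System.out.println(uniqueName("topic", testCase, "classic"));
        System.out.println(uniqueName("group", testCase, "consumer"));
    }
}
```

Compared with an index-based suffix such as `TOPIC_PREFIX + idx`, a name derived from the test case and protocol stays stable if the order of the configurations changes and makes leftover topics easier to trace back to the test that created them.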