Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-25 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580515079


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {

Review Comment:
   Will add.






Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-25 Thread via GitHub


appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580510829


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##
@@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch 
completedFetch, final E
 final long fetchOffset = completedFetch.nextFetchOffset();
 
 if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-error == Errors.REPLICA_NOT_AVAILABLE ||
+error == Errors.FENCED_LEADER_EPOCH) {
+log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
+requestMetadataUpdate(metadata, subscriptions, tp);
+} else if (error == Errors.REPLICA_NOT_AVAILABLE ||
 error == Errors.KAFKA_STORAGE_ERROR ||
-error == Errors.FENCED_LEADER_EPOCH ||
 error == Errors.OFFSET_NOT_AVAILABLE) {
 log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
 requestMetadataUpdate(metadata, subscriptions, tp);
+subscriptions.awaitUpdate(tp);

Review Comment:
   I missed that. For this case, there is no need to await a metadata update;
   simply initializing the `PreferredReadReplica` should be enough.
   Since `FetchUtils.requestMetadataUpdate()` is already being called, it should
   already be doing that initialization.
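
   A rough, self-contained sketch of that behavior (illustrative names only,
   not the client's actual internals):
   ```java
   import java.util.Optional;

   // On these errors, request a metadata update and reset the preferred read
   // replica, but do not park the partition in AWAIT_UPDATE, so fetching can
   // resume as soon as fresh metadata arrives.
   class PartitionFetchStateSketch {
       Optional<Integer> preferredReadReplica = Optional.of(5); // stale replica
       boolean metadataUpdateRequested = false;

       void onReplicaNotAvailable() {
           metadataUpdateRequested = true;          // FetchUtils.requestMetadataUpdate(...)
           preferredReadReplica = Optional.empty(); // fall back to the leader
           // intentionally no transition to AWAIT_UPDATE here
       }
   }
   ```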






Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-25 Thread via GitHub


appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580504115


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
 if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
 partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
 Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
+} else {
+requestMetadataUpdate(metadata, subscriptions, 
partition);
+subscriptions.awaitUpdate(partition);

Review Comment:
   As another alternative, it could change the status to `AWAIT_UPDATE` in
   `FetchCollector.handleInitializeErrors()` only when it's not a KIP-951 case.
   Upon further thought, it seems possible to differentiate based on the
   following condition.
   ```
   completedFetch.partitionData.currentLeader().leaderId() != -1 && 
completedFetch.partitionData.currentLeader().leaderEpoch() != -1
   ```
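
   A self-contained way to express that check (the type here is an illustrative
   stand-in; the real fields live on `completedFetch.partitionData.currentLeader()`):
   ```java
   // KIP-951 responses carry the new leader in the fetch response itself; -1 is
   // the sentinel for "no leader info". Only the non-KIP-951 case would need the
   // AWAIT_UPDATE transition.
   record CurrentLeader(int leaderId, int leaderEpoch) {
       boolean hasNewLeaderInfo() {
           return leaderId != -1 && leaderEpoch != -1;
       }
   }
   ```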






Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-25 Thread via GitHub


appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580499797


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
 if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
 partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
 Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
+} else {
+requestMetadataUpdate(metadata, subscriptions, 
partition);
+subscriptions.awaitUpdate(partition);

Review Comment:
   If the FetchStates is `FETCHING` as per KIP-951, the 
`FetchCollector.handleInitializeErrors()` method is called.
   I thought that in this case, it should not be changed to `AWAIT_UPDATE`.
   Additionally, if it's `AWAIT_UPDATE`, it will be filtered out by the 
following code inside the `FetchCollector.initialize()` method and will not go 
through `FetchCollector.handleInitializeErrors()`.
   
   ```
   if (!subscriptions.hasValidPosition(tp)) {
  // this can happen when a rebalance happened while fetch is still 
in-flight
  log.debug("Ignoring fetched records for partition {} since it no longer 
has valid position", tp);
  return null;
   }
   ```






Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-25 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580499863


##
core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RLMQuotaManagerTest {
+private final MockTime time = new MockTime();
+private final Metrics metrics = new Metrics(new MetricConfig(), 
Collections.emptyList(), time);
+private static final QuotaType QUOTA_TYPE = QuotaType.RLMFetch$.MODULE$;
+private static final String DESCRIPTION = "Tracking byte rate";
+
+@Test
+public void testQuotaExceeded() {
+RLMQuotaManager quotaManager = new RLMQuotaManager(
+new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, 
DESCRIPTION, time);
+
+assertFalse(quotaManager.isQuotaExceeded());
+quotaManager.record(500);
+// Move clock by 1 sec, quota is violated
+moveClock(1);
+assertTrue(quotaManager.isQuotaExceeded());
+
+// Move clock by another 8 secs, quota is still violated for the window
+moveClock(8);
+assertTrue(quotaManager.isQuotaExceeded());
+
+// Move clock by 1 sec, quota is no longer violated
+moveClock(1);
+assertFalse(quotaManager.isQuotaExceeded());
+}
+
+@Test
+public void testQuotaUpdate() {
+RLMQuotaManager quotaManager = new RLMQuotaManager(
+new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, 
DESCRIPTION, time);
+
+assertFalse(quotaManager.isQuotaExceeded());
+quotaManager.record(51);
+assertTrue(quotaManager.isQuotaExceeded());
+
+Map<MetricName, KafkaMetric> fetchQuotaMetrics = metrics.metrics().entrySet().stream()
+.filter(entry -> entry.getKey().name().equals("byte-rate") && 
entry.getKey().group().equals(QUOTA_TYPE.toString()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+Map<MetricName, KafkaMetric> nonQuotaMetrics = metrics.metrics().entrySet().stream()
+.filter(entry -> !entry.getKey().name().equals("byte-rate") || 
!entry.getKey().group().equals(QUOTA_TYPE.toString()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+assertEquals(1, fetchQuotaMetrics.size());
+assertFalse(nonQuotaMetrics.isEmpty());
+
+Map configForQuotaMetricsBeforeUpdate = 
extractMetricConfig(fetchQuotaMetrics);
+Map configForNonQuotaMetricsBeforeUpdate = 
extractMetricConfig(nonQuotaMetrics);
+
+// Update quota to 60, quota is no longer violated
+Quota quota60Bytes = new Quota(60, true);
+quotaManager.updateQuota(quota60Bytes);
+assertFalse(quotaManager.isQuotaExceeded());
+
+// Verify quota metrics were updated
+Map configForQuotaMetricsAfterFirstUpdate = 
extractMetricConfig(fetchQuotaMetrics);
+assertNotEquals(configForQuotaMetricsBeforeUpdate, 
configForQuotaMetricsAfterFirstUpdate);
+fetchQuotaMetrics.values().forEach(metric -> 
assertEquals(metric.config().quota(), quota60Bytes));

Review Comment:
   Yes, thanks for pointing that out.



##
core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.

[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-25 Thread dujian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841056#comment-17841056
 ] 

dujian commented on KAFKA-16584:


Hello [~mjsax],

before creating a KIP I must create a wiki ID, but the registration function at
[https://cwiki.apache.org/confluence/signup.action] is turned off. Can you help
me?

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Assignee: dujian
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful, since it fills the logs, and 
> thus storage space, with unwanted and useless data. The other INFO logs are 
> useful and helpful, so raising the log level to WARN is not an option.
> Please either
> * change the logProcessingSummary to a DEBUG-level log, or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073
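
A possible interim workaround (assuming the stock Log4j 1.x properties
configuration that Kafka ships; the logger name comes from the StreamThread
class linked above) is to raise the log level for just this one class:

{code}
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN
{code}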





Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-25 Thread via GitHub


appchemist commented on PR #15647:
URL: https://github.com/apache/kafka/pull/15647#issuecomment-2078650601

   Thanks for the review, @kirktrue!





Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-25 Thread via GitHub


m1a2st commented on PR #15808:
URL: https://github.com/apache/kafka/pull/15808#issuecomment-2078587040

   @chia7712 thanks for your comment. I have already changed the code per your 
comments.





Re: [PR] [WIP] MINOR: fix flaky test [kafka]

2024-04-25 Thread via GitHub


github-actions[bot] commented on PR #15052:
URL: https://github.com/apache/kafka/pull/15052#issuecomment-2078571664

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: enable kraft test for ReassignPartitionsIntegrationTest [kafka]

2024-04-25 Thread via GitHub


FrankYang0529 commented on PR #15675:
URL: https://github.com/apache/kafka/pull/15675#issuecomment-2078515889

   > btw, it would be great if you can rewrite it by new test infra.
   
   Yeah, but this class needs to assign different configs to different brokers. 
I will wait for https://github.com/apache/kafka/pull/15761.
   
   
https://github.com/apache/kafka/blob/025f9816f1a15d14aab25c9e8e5b03a87f0cefe2/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java#L754-L808





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1580354223


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -132,63 +133,41 @@ public void close() throws Exception {
 }
 }
 
-static class ConsumerRunnable implements Runnable {
-private final String brokerAddress;
-private final String groupId;
-private final Properties customConfigs;
+private static class ConsumerRunnable implements Runnable {
 private final boolean syncCommit;
 private final String topic;
-private final String groupProtocol;
-private final String assignmentStrategy;
-private final Optional<String> remoteAssignor;
-private final Properties props = new Properties();
-private KafkaConsumer<String, String> consumer;
-private boolean configured = false;
+private final KafkaConsumer<String, String> consumer;
 private volatile boolean isShutdown = false;
 
-public ConsumerRunnable(String brokerAddress,
-String groupId,
-String groupProtocol,
-String topic,
-String assignmentStrategy,
-Optional<String> remoteAssignor,
-Optional<Properties> customConfigs,
-boolean syncCommit) {
-this.brokerAddress = brokerAddress;
-this.groupId = groupId;
-this.customConfigs = customConfigs.orElse(new Properties());
+private ConsumerRunnable(String brokerAddress,
+ String groupId,
+ String groupProtocol,
+ String topic,
+ String assignmentStrategy,
+ Optional<String> remoteAssignor,
+ Map<String, Object> customConfigs,
+ boolean syncCommit) {
 this.syncCommit = syncCommit;
 this.topic = topic;
-this.groupProtocol = groupProtocol;
-this.assignmentStrategy = assignmentStrategy;
-this.remoteAssignor = remoteAssignor;
 
-this.configure();
-}
-
-private void configure() {
-configured = true;
-configure(props);
-props.putAll(customConfigs);
-consumer = new KafkaConsumer<>(props);
-}
-
-private void configure(Properties props) {
-props.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
-props.put(GROUP_ID_CONFIG, groupId);
-props.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-props.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
+Map<String, Object> configs = new HashMap<>();
+configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
+configs.put(GROUP_ID_CONFIG, groupId);
+configs.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
+
 if (Objects.equals(groupProtocol, CONSUMER.toString())) {
-remoteAssignor.ifPresent(assignor -> 
props.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
+remoteAssignor.ifPresent(assignor -> 
configs.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
 } else {
-props.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
assignmentStrategy);
+configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
assignmentStrategy);
 }
+configs.putAll(customConfigs);
+consumer = new KafkaConsumer<>(configs);

Review Comment:
   Could you make sure all consumers get closed even if one of the consumers hits 
an error? We have encountered resource leaks before, and they make our CI 
unstable. For example:
   ```java
   private static AutoCloseable run(
   String brokerAddress,
   int numberOfConsumers,
   String groupId,
   String groupProtocol,
   String topic,
   String assignmentStrategy,
   Optional<String> remoteAssignor,
   Map<String, Object> customConfigs,
   boolean syncCommit
   ) {
   Queue> consumers = 
consumers(IntStream.range(0, numberOfConsumers).mapToObj(ignored -> {
   Map<String, Object> configs = new HashMap<>();
   configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
   configs.put(GROUP_ID_CONFIG, groupId);
   configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
   if (Objects.equals(groupProtocol, CONSUMER.toString())) {
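   // A minimal, self-contained sketch of the close-all-even-on-error pattern
   // being asked for here (illustrative names, not the PR's actual code):
   // close every consumer even if one close fails, so a single failure cannot
   // leak the rest.
   static void closeAll(Queue<? extends AutoCloseable> consumers) {
       RuntimeException first = null;
       for (AutoCloseable c : consumers) {
           try {
               c.close();
           } catch (Exception e) {
               if (first == null) first = new RuntimeException("failed to close a consumer", e);
               else first.addSuppressed(e);
           }
       }
       if (first != null) throw first;
   }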
   

[PR] MINOR: fix timeouts of EosIntegrationTest [kafka]

2024-04-25 Thread via GitHub


mjsax opened a new pull request, #15811:
URL: https://github.com/apache/kafka/pull/15811

   A typo sets the timeout way too high... The test still passes because the test 
timeout is 10 minutes and the test runs for 5 minutes (with this fix, the test 
runtime drops to 30 seconds, as intended).





Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15808:
URL: https://github.com/apache/kafka/pull/15808#discussion_r1580291046


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1594,10 +1313,6 @@ object TestUtils extends Logging {
 }
   }
 
-  def createIsrChangeListener(): MockAlterPartitionListener = {

Review Comment:
   `MockAlterPartitionListener` is used by `AbstractPartitionTest` only, so 
could you move it to `AbstractPartitionTest`?



##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -548,4 +549,31 @@ class DeleteTopicTest extends QuorumTestHarness {
 TestUtils.waitUntilTrue(() => 
brokers.exists(_.asInstanceOf[KafkaServer].kafkaController.isActive), "No 
controller is elected")
 TestUtils.verifyTopicDeletion(zkClient, topic, 2, brokers)
   }
+
+  private def increasePartitions[B <: KafkaBroker](admin: Admin,
+   topic: String,
+   totalPartitionCount: Int,
+   brokersToValidate: Seq[B]
+  ): Unit = {
+
+try {
+  val newPartitionSet: Map[String, NewPartitions] = Map.apply(topic -> 
NewPartitions.increaseTo(totalPartitionCount))
+  admin.createPartitions(newPartitionSet.asJava)
+} catch {
+  case e: ExecutionException =>
+throw e
+}
+
+if (brokersToValidate.nonEmpty) {
+  // wait until we've propagated all partitions metadata to all brokers
+  val allPartitionsMetadata = 
waitForAllPartitionsMetadata(brokersToValidate, topic, totalPartitionCount)
+
+  (0 until totalPartitionCount - 1).map { i =>
+i -> allPartitionsMetadata.get(new TopicPartition(topic, 
i)).map(_.leader()).getOrElse(
+  throw new IllegalStateException(s"Cannot get the partition leader 
for topic: $topic, partition: $i in server metadata cache"))

Review Comment:
   this method returns nothing, so we don't need to collect the metadata. 
However, it still needs to check the existence of the topic partitions.



##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -613,4 +613,19 @@ class LogSegmentTest {
 )
   }
 
+  private def checkEquals[T](s1: java.util.Iterator[T], s2: 
java.util.Iterator[T]): Unit = {
+while (s1.hasNext && s2.hasNext)
+  assertEquals(s1.next, s2.next)
+assertFalse(s1.hasNext, "Iterators have uneven length--first has more")
+assertFalse(s2.hasNext, "Iterators have uneven length--second has more")
+  }
+
+  private def writeNonsenseToFile(fileName: File, position: Long, size: Int): 
Unit = {
+val file = new RandomAccessFile(fileName, "rw")
+file.seek(position)
+for (_ <- 0 until size)
+  file.writeByte(random.nextInt(255))
+file.close()

Review Comment:
   could you please add try-finally?
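
   The requested shape, sketched in Java for illustration (the PR's helper is
   Scala; try-with-resources gives the same close-on-exception guarantee as an
   explicit try-finally):
   ```java
   import java.io.File;
   import java.io.IOException;
   import java.io.RandomAccessFile;
   import java.util.Random;

   class WriteNonsenseSketch {
       // The file is closed even if seek or writeByte throws.
       static void writeNonsense(File file, long position, int size) throws IOException {
           Random random = new Random();
           try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
               raf.seek(position);
               for (int i = 0; i < size; i++)
                   raf.writeByte(random.nextInt(255));
           }
       }
   }
   ```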



##
core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala:
##
@@ -149,6 +150,23 @@ class SaslPlainSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
*/
   @Test
   def testAcls(): Unit = {
-TestUtils.verifySecureZkAcls(zkClient, 1)
+verifySecureZkAcls(zkClient, 1)
+  }
+
+  /**
+   * Verifies that all secure paths in ZK are created with the expected ACL.
+   */
+  private def verifySecureZkAcls(zkClient: KafkaZkClient, usersWithAccess: 
Int): Unit = {

Review Comment:
   ditto



##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -548,4 +549,31 @@ class DeleteTopicTest extends QuorumTestHarness {
 TestUtils.waitUntilTrue(() => 
brokers.exists(_.asInstanceOf[KafkaServer].kafkaController.isActive), "No 
controller is elected")
 TestUtils.verifyTopicDeletion(zkClient, topic, 2, brokers)
   }
+
+  private def increasePartitions[B <: KafkaBroker](admin: Admin,
+   topic: String,
+   totalPartitionCount: Int,
+   brokersToValidate: Seq[B]
+  ): Unit = {
+
+try {
+  val newPartitionSet: Map[String, NewPartitions] = Map.apply(topic -> 
NewPartitions.increaseTo(totalPartitionCount))
+  admin.createPartitions(newPartitionSet.asJava)
+} catch {

Review Comment:
   We do nothing for this catch, so it should be fine to remove it.



##
core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala:
##
@@ -52,6 +56,20 @@ class SaslPlainPlaintextConsumerTest extends 
BaseConsumerTest with SaslSetup {
*/
   @Test
   def testZkAclsDisabled(): Unit = {
-TestUtils.verifyUnsecureZkAcls(zkClient)
+verifyUnsecureZkAcls(zkClient)
+  }
+
+  /**
+   * Verifies that secure paths in ZK have no access control. This is
+   * the case when zookeeper.set.acl=false and no ACLs have been configured.
+   */
+  private def verifyUnsecureZkAcls(zkClient: KafkaZkClient): Unit = {
+

Re: [PR] MINOR: fix javadoc warnings [kafka]

2024-04-25 Thread via GitHub


chia7712 merged PR #15527:
URL: https://github.com/apache/kafka/pull/15527





Re: [PR] MINOR: Remove unnecessary version from excluded dependencies of clients [kafka]

2024-04-25 Thread via GitHub


showuon commented on PR #15804:
URL: https://github.com/apache/kafka/pull/15804#issuecomment-2078386421

   > LGTM as it won't compare the undefined version 
(https://github.com/johnrengelman/shadow/blob/main/src/main/groovy/com/github/jengelman/gradle/plugins/shadow/internal/AbstractDependencyFilter.groovy#L101)
   
   Thanks @chia7712 for pointing it out! I didn't know that.





Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-25 Thread via GitHub


m1a2st commented on PR #15808:
URL: https://github.com/apache/kafka/pull/15808#issuecomment-2078376955

   @chia7712 I resolved the conflicts.





Re: [PR] MINOR: enable kraft test for ReassignPartitionsIntegrationTest [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15675:
URL: https://github.com/apache/kafka/pull/15675#issuecomment-2078363083

   btw, it would be great if you can rewrite it by new test infra.





Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15808:
URL: https://github.com/apache/kafka/pull/15808#issuecomment-2078361586

   @m1a2st Could you please fix the conflicts? thanks!





Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-25 Thread via GitHub


akhileshchg commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1580260431


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -786,12 +773,31 @@ public void run() throws Exception {
 }
 }
 
+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+@Override
+public void run() throws Exception {
+if (checkDriverState(MigrationDriverState.UNINITIALIZED, this)) {
+applyMigrationOperation("Recovering migration state from ZK", 
zkMigrationClient::getOrCreateMigrationRecoveryState);

Review Comment:
   For my understanding, was this the line where the uncaught exception was 
thrown? Can we handle the exception more gracefully and log an error?






Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580243392


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+}
+}
+
+/**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a 
random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+JoinGroupRequestProtocolCollection protocols
+) {
+try {
+return ConsumerProtocol.deserializeSubscription(
+ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+);
+} catch (SchemaException e) {
+throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+}
+}
+
+/**
+ * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+ *
+ * @param memberThe joining member.
+ * @param subscription  The Subscription.
+ * @return The owned partitions if valid, otherwise return null.
+ */
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> validateGenerationIdAndGetOwnedPartition(
+ConsumerGroupMember member,
+ConsumerPartitionAssignor.Subscription subscription
+) {
+List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedPartitions =
+toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics());
+if (subscription.generationId().isPresent() && 
subscription.generationId().get() == member.memberEpoch()) {
+return ownedPartitions;
+} else {
+// If the generation id is not provided or doesn't match the 
member epoch, it's still safe to
+// accept the ownedPartitions that is a subset of the assigned 
partition. Otherwise, set the
+// ownedPartition to be null. When a new assignment is provided, 
the consumer will stop fetching
+// from and revoke the partitions it does not own.
+if (isSubset(ownedPartitions, member.assignedPartitions())) {
+return ownedPartitions;
+} else {
+return null;
+}
+}
+}
+
+/**
+ * @return The ConsumerGroupHeartbeatRequestData.TopicPartitions list 
converted from the TopicPartitions list.
+ */
+private static List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPartitions(
+List<TopicPartition> partitions,
+TopicsImage topicsImage
+) {
+return ConsumerGroup.topicPartitionMapFromList(partitions, 
topicsImage).entrySet().stream().map(

Review Comment:
   I thought we need to sort the partitions by topic anyway, so I don't quite 
get how to combine them(?)






Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580240603


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+}
+} else {
+// Rejoining static member. Fence the 

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580240281


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+}
+}
+
+/**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a 
random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+JoinGroupRequestProtocolCollection protocols
+) {
+try {
+return ConsumerProtocol.deserializeSubscription(
+ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+);
+} catch (SchemaException e) {
+throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+}
+}
+
+/**
+ * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+ *
+ * @param memberThe joining member.
+ * @param subscription  The Subscription.
+ * @return The owned partitions if valid, otherwise return null.
+ */
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> validateGenerationIdAndGetOwnedPartition(

Review Comment:
   I was wondering if we can defer checking the generation id to sync group and 
throw ILLEGAL_GENERATION there. The client will rejoin in such a situation. 
That's what we do in the classic protocol.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+ 

[jira] [Created] (KAFKA-16626) Uuid to String for subscribed topic names in assignment spec

2024-04-25 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16626:


 Summary: Uuid to String for subscribed topic names in assignment 
spec
 Key: KAFKA-16626
 URL: https://issues.apache.org/jira/browse/KAFKA-16626
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy


In creating the assignment spec from the existing consumer subscription 
metadata, quite some time is spent converting Strings to Uuids.

Change the subscribed topics in the assignment spec from Uuid to String and 
convert on the fly.
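
An illustrative-only sketch of the lazy conversion (java.util.UUID stands in
for Kafka's Uuid; the names are hypothetical):

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// The spec keeps subscribed topics as Strings; ids are looked up lazily and
// memoized, so the String-to-Uuid conversion cost is paid only for topics an
// assignor actually touches.
final class LazyTopicIds {
    private final Map<String, UUID> cache = new HashMap<>();
    private final Map<String, UUID> metadataIds; // topic name -> id

    LazyTopicIds(Map<String, UUID> metadataIds) {
        this.metadataIds = metadataIds;
    }

    UUID idFor(String topicName) {
        return cache.computeIfAbsent(topicName, metadataIds::get);
    }
}
{code}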





Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15800:
URL: https://github.com/apache/kafka/pull/15800#issuecomment-2078284518

   @TaiJuWu Could you add a new test file (`ClusterTestExtensionsUnitTest`) to 
verify the function `processClusterTemplate`?





Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580210319


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+}
+} else {
+// Rejoining static member. Fence the 

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580203100


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+}
+} else {
+// Rejoining static member. Fence the 

Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15761:
URL: https://github.com/apache/kafka/pull/15761#issuecomment-2078268366

   @brandboat could you please fix the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16625) Reverse Lookup Partition to Member in Assignors

2024-04-25 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16625:


 Summary: Reverse Lookup Partition to Member in Assignors
 Key: KAFKA-16625
 URL: https://issues.apache.org/jira/browse/KAFKA-16625
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy


Calculating unassigned partitions within the Uniform assignor is a costly 
process; this can be improved by using a reverse lookup map from 
topicIdPartition to member.
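
A reverse index of this kind could look like the following sketch (the class 
and method names here are illustrative, not the assignor's actual API):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: index partitions by owning member so that
// "is this partition assigned, and to whom?" becomes an O(1) lookup
// instead of a scan over every member's assignment.
public final class ReverseAssignmentIndex {
    // String stand-ins for Kafka's TopicIdPartition and member id types.
    private final Map<String, String> memberByTopicIdPartition = new HashMap<>();

    public void assign(String topicIdPartition, String memberId) {
        memberByTopicIdPartition.put(topicIdPartition, memberId);
    }

    public boolean isAssigned(String topicIdPartition) {
        return memberByTopicIdPartition.containsKey(topicIdPartition);
    }

    public String ownerOf(String topicIdPartition) {
        return memberByTopicIdPartition.get(topicIdPartition); // null if unassigned
    }
}
{code}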



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1580186766


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws 
Exception {
 }
 }
 
+/**
+ * Tests that calling {@link Thread#interrupt()} before {@link 
KafkaConsumer#poll(Duration)}
+ * causes {@link InterruptException} to be thrown.
+ */
+@Test
+public void testPollThrowsInterruptExceptionIfInterrupted() {

Review Comment:
   Is it possible to merge this new test into 
https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L1263?
 If not, maybe we can remove the TODO.
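   
   For context, a minimal sketch of the behaviour under test (the consumer is 
assumed to be a fully configured `KafkaConsumer`; this is not the actual test 
body):
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.errors.InterruptException;
   
   final class InterruptPollSketch {
       // Set the thread's interrupt flag first; poll() is then expected to
       // surface InterruptException rather than silently swallowing the flag.
       static void demonstrate(KafkaConsumer<String, String> consumer) {
           Thread.currentThread().interrupt();
           try {
               consumer.poll(Duration.ZERO);
           } catch (InterruptException expected) {
               // expected: poll() honours the pre-set interrupt status
           }
       }
   }
   ```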



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580183434


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());

Review Comment:
   Yeah I think it makes sense. We can remove it without canceling the timers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to 
the specific comment.

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580183181


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.

Review Comment:
   If a static member joins without a member id, we replace any existing member 
with the same instance id; 
   if the static member joins with a member id, then we treat it as a normal 
rejoin if the static member exists and the member id matches, and throw an 
exception otherwise.
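   
   A toy model of those rules (illustrative only; the real logic lives in 
`GroupMetadataManager`):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.UUID;
   
   final class StaticJoinSketch {
       private final Map<String, String> memberIdByInstanceId = new HashMap<>();
   
       String join(String instanceId, String memberId) {
           String existing = memberIdByInstanceId.get(instanceId);
           if (memberId.isEmpty()) {
               // Joining without a member id: replace any member currently
               // holding this instance id.
               String newId = UUID.randomUUID().toString();
               memberIdByInstanceId.put(instanceId, newId);
               return newId;
           } else if (memberId.equals(existing)) {
               // Known member id that matches: a normal rejoin.
               return memberId;
           } else {
               // Unknown or mismatched member id for this instance id.
               throw new IllegalStateException("Fenced instance id: " + instanceId);
           }
       }
   }
   ```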



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580182980


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);

Review Comment:
   I think we don't need to do it for new members, but it doesn't hurt to also 
do the check since we always accept the empty ownedPartitions. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580175708


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest {
   // Alter the metadata
   log.info("Updating metadata with AdminClient")
   admin = zkCluster.createAdminClient()
-  alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
-  alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+  alterTopicConfig(admin)
+  alterClientQuotas(admin)
+  alterBrokerConfigs(admin)
 
   // Verify the changes made to KRaft are seen in ZK
   log.info("Verifying metadata changes with ZK")
   verifyTopicConfigs(zkClient)
   verifyClientQuotas(zkClient)
+  verifyBrokerConfigs(zkClient)
   val nextKRaftProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
   assertNotEquals(nextProducerId, nextKRaftProducerId)
-
+} catch {
+  case t: Throwable => fail("Uncaught error in test", t)

Review Comment:
   >  I've seen many cases where it's hard to see where a test is failing due to a 
throw. 
   
   Could you please share the error stack with me?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580174626


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): 
Unit = {
+// Enable migration configs and restart brokers without KRaft quorum ready
+
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@localhost:")

Review Comment:
   Sorry that I just noticed this PR after merging the refactor-related PR :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in metadata [kafka]

2024-04-25 Thread via GitHub


chia7712 merged PR #15806:
URL: https://github.com/apache/kafka/pull/15806


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in metadata [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on PR #15806:
URL: https://github.com/apache/kafka/pull/15806#issuecomment-2078233197

   ```
   ./gradlew cleanTest \
     :streams:test --tests StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorManyStandbys \
     :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful \
       --tests DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsOfStableConsumerGroupWithTopicOnly \
       --tests DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsOfStableConsumerGroupWithTopicPartition \
       --tests DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly \
     :storage:test --tests TopicBasedRemoteLogMetadataManagerTest.testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch \
       --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout \
       --tests TransactionsWithTieredStoreTest.testAbortTransactionTimeout \
     :connect:runtime:test --tests org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic \
     :metadata:test --tests QuorumControllerMetricsIntegrationTest.testTimeoutMetrics \
     :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
     :connect:mirror:test --tests IdentityReplicationIntegrationTest.testSyncTopicConfigs \
       --tests MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault \
     :core:test --tests ConsumerBounceTest.testConsumptionWithBrokerFailures \
       --tests ReplicationQuotasTest.shouldThrottleOldSegments
   ```
   I don't see any related failures, and they pass on my local machine. Will merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-25 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1580159998


##
clients/src/main/resources/common/message/VotersRecord.json:
##
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "VotersRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "Version", "type": "int16", "versions": "0+",
+  "about": "The version of the voters record" },
+{ "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [
+  { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+"about": "The replica id of the voter in the topic partition" },
+  { "name": "VoterUuid", "type": "uuid", "versions": "0+",

Review Comment:
   Sounds good to me. I'll update the KIP and the other schemas that use a 
similar name.
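   
   For reference, the renamed field would presumably read along these lines 
(illustrative; pending the KIP update):
   
   ```json
   { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+",
     "about": "The directory id of the voter in the topic partition" }
   ```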



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-25 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1580158966


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##
@@ -807,4 +809,62 @@ private static void writeSnapshotFooterRecord(
 builder.appendSnapshotFooterMessage(timestamp, 
snapshotFooterRecord);
 }
 }
+
+public static MemoryRecords withKRaftVersionRecord(
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+ByteBuffer buffer,
+KRaftVersionRecord kraftVersionRecord
+) {
+writeKRaftVersionRecord(buffer, initialOffset, timestamp, leaderEpoch, 
kraftVersionRecord);
+buffer.flip();
+return MemoryRecords.readableRecords(buffer);
+}
+
+private static void writeKRaftVersionRecord(
+ByteBuffer buffer,
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+KRaftVersionRecord kraftVersionRecord
+) {
+try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+TimestampType.CREATE_TIME, initialOffset, timestamp,
+RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
+false, true, leaderEpoch, buffer.capacity())
+) {
+builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
+}
+}
+
+public static MemoryRecords withVotersRecord(
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+ByteBuffer buffer,
+VotersRecord votersRecord
+) {
+writeVotersRecord(buffer, initialOffset, timestamp, leaderEpoch, 
votersRecord);

Review Comment:
   What do you mean by "we create two MemoryRecords instances"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-25 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1580155319


##
clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java:
##
@@ -44,11 +44,15 @@ public enum ControlRecordType {
 ABORT((short) 0),
 COMMIT((short) 1),
 
-// Raft quorum related control messages.
+// KRaft quorum related control messages
 LEADER_CHANGE((short) 2),
 SNAPSHOT_HEADER((short) 3),
 SNAPSHOT_FOOTER((short) 4),
 
+// KRaft membership changes messages
+KRAFT_VERSION((short) 5),
+VOTERS((short) 6),

Review Comment:
   Sounds good. Fixed for KRAFT_VOTERS. I'll fix the rest in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-12534) kafka-configs does not work with ssl enabled kafka broker.

2024-04-25 Thread keith.paulson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840985#comment-17840985
 ] 

keith.paulson edited comment on KAFKA-12534 at 4/25/24 9:26 PM:


I can reproduce this with kafka 3.6.0 and BCFKS keystores.  

 

Changing keystore and password gives 
{code:java}
 ERROR Encountered metadata publishing fault: Error updating node with new 
configuration: listener.name.SSL.ssl.key.password -> 
[hidden],listener.name.SSL.ssl.keystore.location -> /etc/ssl/cert.bcfks 
in MetadataDelta up to 5940236 
(org.apache.kafka.server.fault.LoggingFaultHandler)
org.apache.kafka.common.config.ConfigException: Validation of dynamic config 
update of SSLFactory failed: org.apache.kafka.common.KafkaException: Failed to 
load SSL keystore /etc/ssl/cert.bcfks of type BCFKS {code}
but on a kafka restart, that keystore/password combination works fine

 

If I change just the keystore (ie new keystore created using same password as 
previous one), the cert change works
{code:java}
[2024-04-25 21:15:47,065] INFO [DynamicConfigPublisher broker id=1] Updating 
node 1 with new configuration : listener.name.SSL.ssl.keystore.location -> 
/etc/ssl/cert.bcfks (kafka.server.metadata.DynamicConfigPublisher)
{code}
 

Not being able to change passwords is a significant limitation.
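
The kind of dynamic update being attempted might look like this sketch 
(broker id, port, and password are illustrative; the listener name SSL and 
keystore path are taken from the logs above):
{code:bash}
# Illustrative sketch of the failing reconfiguration: update the keystore
# location and key password for the SSL listener on broker 1.
bin/kafka-configs.sh --bootstrap-server localhost:9093 \
  --entity-type brokers --entity-name 1 --alter \
  --add-config 'listener.name.SSL.ssl.keystore.location=/etc/ssl/cert.bcfks,listener.name.SSL.ssl.key.password=new-password'
{code}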


was (Author: JIRAUSER299451):
I can reproduce this with kafka 3.6.0 and BCFKS keystores.  

 

Changing keystore and password gives 
{code:java}
 ERROR Encountered metadata publishing fault: Error updating node with new 
configuration: listener.name.SSL.ssl.key.password -> 
[hidden],listener.name.SSL.ssl.keystore.location -> 
/etc/ssl/private/kafkachain.bcfks in MetadataDelta up to 5940236 
(org.apache.kafka.server.fault.LoggingFaultHandler)
org.apache.kafka.common.config.ConfigException: Validation of dynamic config 
update of SSLFactory failed: org.apache.kafka.common.KafkaException: Failed to 
load SSL keystore /etc/ssl/private/kafkachain.bcfks of type BCFKS {code}
but on a kafka restart, that keystore/password combination works fine

 

If I change just the keystore (ie new keystore created using same password as 
previous one)
{code:java}
[2024-04-25 21:15:47,065] INFO [DynamicConfigPublisher broker id=1] Updating 
node 1 with new configuration : listener.name.SSL.ssl.keystore.location -> 
/etc/ssl/private/kafkachain.bcfks (kafka.server.metadata.DynamicConfigPublisher)
{code}
 

Not being able to change passwords is a significant limitation.

> kafka-configs does not work with ssl enabled kafka broker.
> --
>
> Key: KAFKA-12534
> URL: https://issues.apache.org/jira/browse/KAFKA-12534
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.1
>Reporter: kaushik srinivas
>Priority: Critical
>
> We are trying to change the trust store password on the fly using the 
> kafka-configs script for a ssl enabled kafka broker.
> Below is the command used:
> kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
> --entity-name 1001 --alter --add-config 'ssl.truststore.password=xxx'
> But we see below error in the broker logs when the command is run.
> {"type":"log", "host":"kf-2-0", "level":"INFO", 
> "neid":"kafka-cfd5ccf2af7f47868e83473408", "system":"kafka", 
> "time":"2021-03-23T12:14:40.055", "timezone":"UTC", 
> "log":\{"message":"data-plane-kafka-network-thread-1002-ListenerName(SSL)-SSL-2
>  - org.apache.kafka.common.network.Selector - [SocketServer brokerId=1002] 
> Failed authentication with /127.0.0.1 (SSL handshake failed)"}}
>  How can anyone configure ssl certs for the kafka-configs script and succeed 
> with the ssl handshake in this case ? 
> Note : 
> We are trying with a single listener i.e SSL: 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12534) kafka-configs does not work with ssl enabled kafka broker.

2024-04-25 Thread keith.paulson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840985#comment-17840985
 ] 

keith.paulson commented on KAFKA-12534:
---

I can reproduce this with kafka 3.6.0 and BCFKS keystores.  

 

Changing keystore and password gives 
{code:java}
 ERROR Encountered metadata publishing fault: Error updating node with new 
configuration: listener.name.SSL.ssl.key.password -> 
[hidden],listener.name.SSL.ssl.keystore.location -> 
/etc/ssl/private/kafkachain.bcfks in MetadataDelta up to 5940236 
(org.apache.kafka.server.fault.LoggingFaultHandler)
org.apache.kafka.common.config.ConfigException: Validation of dynamic config 
update of SSLFactory failed: org.apache.kafka.common.KafkaException: Failed to 
load SSL keystore /etc/ssl/private/kafkachain.bcfks of type BCFKS {code}
but on a kafka restart, that keystore/password combination works fine

 

If I change just the keystore (ie new keystore created using same password as 
previous one)
{code:java}
[2024-04-25 21:15:47,065] INFO [DynamicConfigPublisher broker id=1] Updating 
node 1 with new configuration : listener.name.SSL.ssl.keystore.location -> 
/etc/ssl/private/kafkachain.bcfks (kafka.server.metadata.DynamicConfigPublisher)
{code}
 

Not being able to change passwords is a significant limitation.

> kafka-configs does not work with ssl enabled kafka broker.
> --
>
> Key: KAFKA-12534
> URL: https://issues.apache.org/jira/browse/KAFKA-12534
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.1
>Reporter: kaushik srinivas
>Priority: Critical
>
> We are trying to change the trust store password on the fly using the 
> kafka-configs script for a ssl enabled kafka broker.
> Below is the command used:
> kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
> --entity-name 1001 --alter --add-config 'ssl.truststore.password=xxx'
> But we see below error in the broker logs when the command is run.
> {"type":"log", "host":"kf-2-0", "level":"INFO", 
> "neid":"kafka-cfd5ccf2af7f47868e83473408", "system":"kafka", 
> "time":"2021-03-23T12:14:40.055", "timezone":"UTC", 
> "log":\{"message":"data-plane-kafka-network-thread-1002-ListenerName(SSL)-SSL-2
>  - org.apache.kafka.common.network.Selector - [SocketServer brokerId=1002] 
> Failed authentication with /127.0.0.1 (SSL handshake failed)"}}
>  How can anyone configure ssl certs for the kafka-configs script and succeed 
> with the ssl handshake in this case ? 
> Note : 
> We are trying with a single listener i.e SSL: 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-25 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1580152614


##
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java:
##
@@ -26,7 +26,7 @@
 /**
  * Represents an immutable basic version range using 2 attributes: min and 
max, each of type short.
  * The min and max attributes need to satisfy 2 rules:
- *  - they are each expected to be >= 1, as we only consider positive version 
values to be valid.

Review Comment:
   Yes. I think this was missed during the original implementation. The default 
value for any feature version is 0 but that cannot be expressed in the range of 
supported versions since it doesn't allow 0 as the min or max value.
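   
   In other words, the validity rule is now roughly (sketch only):
   
   ```java
   final class VersionRangeSketch {
       // With the old ">= 1" constraint dropped, a range may start at 0,
       // which is the default value of an unset feature version.
       static boolean isValid(short min, short max) {
           return min >= 0 && max >= min;
       }
   }
   ```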



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580150231


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): 
Unit = {
+// Enable migration configs and restart brokers without KRaft quorum ready
+
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@localhost:")

Review Comment:
   The config changes are killing me in this PR  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580149700


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest {
   // Alter the metadata
   log.info("Updating metadata with AdminClient")
   admin = zkCluster.createAdminClient()
-  alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
-  alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+  alterTopicConfig(admin)
+  alterClientQuotas(admin)
+  alterBrokerConfigs(admin)
 
   // Verify the changes made to KRaft are seen in ZK
   log.info("Verifying metadata changes with ZK")
   verifyTopicConfigs(zkClient)
   verifyClientQuotas(zkClient)
+  verifyBrokerConfigs(zkClient)
   val nextKRaftProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
   assertNotEquals(nextProducerId, nextKRaftProducerId)
-
+} catch {
+  case t: Throwable => fail("Uncaught error in test", t)

Review Comment:
   Just my preference I guess. I've seen many cases where it's hard to see where 
a test is failing due to a throw. Maybe this is better in recent versions of JUnit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-25 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1580133686


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -687,7 +688,7 @@ public void 
testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme
 currentAssignmentForC
 ));
 
-AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, 
HETEROGENEOUS);

Review Comment:
   Yeah, it covers it. Nothing changed in the heterogeneous path anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580130359


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+val defaultBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+val specificBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+
+TestUtils.retry(6) {
+  val result = admin.incrementalAlterConfigs(Map(
+defaultBrokerResource -> defaultBrokerConfigs,
+broker0Resource -> specificBrokerConfigs,
+broker1Resource -> specificBrokerConfigs
+  ).asJava)
+  try {
+result.all().get(10, TimeUnit.SECONDS)
+  } catch {
+case t: Throwable => fail("Alter Broker Configs had an error", t)
+  }
+}

Review Comment:
   I have the same question here.
   1. In the test case `testIncrementalAlterConfigsPreMigration`, it should 
pass at once since the KRaft quorum is not ready.
   2. In the test `testDualWrite`, it should pass at once as we have waited for 
the migration to begin.



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -974,7 +1035,11 @@ class ZkMigrationIntegrationTest {
 quotas.add(new ClientQuotaAlteration(
   new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
   List(new ClientQuotaAlteration.Op("connection_creation_rate", 
10.0)).asJava))
-admin.alterClientQuotas(quotas)
+try {
+  admin.alterClientQuotas(quotas).all().get(10, TimeUnit.SECONDS)
+} catch {
+  case t: Throwable => fail("Alter Client Quotas had an error", t)

Review Comment:
   Ditto. Not sure why we need to catch it and then call `fail`.



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest {
   // Alter the metadata
   log.info("Updating metadata with AdminClient")
   admin = zkCluster.createAdminClient()
-  alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
-  alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+  alterTopicConfig(admin)
+  alterClientQuotas(admin)
+  alterBrokerConfigs(admin)
 
   // Verify the changes made to KRaft are seen in ZK
   log.info("Verifying metadata changes with ZK")
   verifyTopicConfigs(zkClient)
   verifyClientQuotas(zkClient)
+  verifyBrokerConfigs(zkClient)
   val nextKRaftProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
   assertNotEquals(nextProducerId, nextKRaftProducerId)
-
+} catch {
+  case t: Throwable => fail("Uncaught error in test", t)

Review Comment:
   Why catch the error in this test case? The test case can still fail even if 
we don't catch it. Or do you plan to make it retryable via 
`TestUtils.retry`?



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): 
Unit = {
+// Enable migration configs and restart brokers without KRaft quorum ready
+
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@localhost:")

Review Comment:
   `RaftConfig` is renamed to `QuorumConfig`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Created] (KAFKA-16624) Don't generate useless PartitionChangeRecord on older MV

2024-04-25 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-16624:


 Summary: Don't generate useless PartitionChangeRecord on older MV
 Key: KAFKA-16624
 URL: https://issues.apache.org/jira/browse/KAFKA-16624
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe


Fix a case where we could generate useless PartitionChangeRecords on metadata 
versions older than 3.6-IV0. This could happen in the case where we had an ISR 
with only one broker in it, and we were trying to go down to a fully empty ISR. 
In this case, PartitionChangeBuilder would block the ISR from going down to 
fully empty (since that is not valid in these pre-KIP-966 metadata versions), 
but it would still emit the record, even though it had no effect.
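
Schematically, the fix suppresses the no-op record (names here are 
illustrative, not the actual PartitionChangeBuilder API):
{code:java}
import java.util.Arrays;
import java.util.Optional;

final class NoOpRecordSketch {
    // If the old metadata version blocks shrinking the ISR to empty, the
    // resulting "change" equals the current state and nothing should be emitted.
    static Optional<int[]> buildIsrChange(int[] currentIsr, int[] desiredIsr, boolean emptyIsrAllowed) {
        int[] newIsr = (desiredIsr.length == 0 && !emptyIsrAllowed) ? currentIsr : desiredIsr;
        return Arrays.equals(newIsr, currentIsr) ? Optional.empty() : Optional.of(newIsr);
    }
}
{code}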



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-25 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1580115582


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -90,17 +91,29 @@ private Map> membersPerTopic(final 
AssignmentSpec assignmentS
 Map> membersPerTopic = new HashMap<>();
 Map membersData = 
assignmentSpec.members();
 
-membersData.forEach((memberId, memberMetadata) -> {
-Collection topics = memberMetadata.subscribedTopicIds();
+if (assignmentSpec.groupSubscriptionModel().equals(HOMOGENEOUS)) {
+List allMembers = new ArrayList<>(membersData.keySet());

Review Comment:
   Thanks! Made the change
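   
   For reference, the homogeneous fast path in the diff above amounts to 
computing the member list once and sharing it across all subscribed topics 
(sketch with simplified types):
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class MembersPerTopicSketch {
       static Map<String, List<String>> membersPerTopic(List<String> memberIds,
                                                        List<String> subscribedTopics) {
           // Every member has the same subscription, so one shared list suffices.
           List<String> allMembers = new ArrayList<>(memberIds);
           Map<String, List<String>> result = new HashMap<>();
           for (String topic : subscribedTopics) {
               result.put(topic, allMembers);
           }
           return result;
       }
   }
   ```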



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2078159039

   I think I would limit it to the request state classes, just to keep the 
changes to a minimum


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580103763


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##
@@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch 
completedFetch, final E
 final long fetchOffset = completedFetch.nextFetchOffset();
 
 if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-error == Errors.REPLICA_NOT_AVAILABLE ||
+error == Errors.FENCED_LEADER_EPOCH) {
+log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
+requestMetadataUpdate(metadata, subscriptions, tp);
+} else if (error == Errors.REPLICA_NOT_AVAILABLE ||
 error == Errors.KAFKA_STORAGE_ERROR ||
-error == Errors.FENCED_LEADER_EPOCH ||
 error == Errors.OFFSET_NOT_AVAILABLE) {
 log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
 requestMetadataUpdate(metadata, subscriptions, tp);
+subscriptions.awaitUpdate(tp);

Review Comment:
   With this change, if the replica is not available, we will flag the 
partition as awaiting a metadata update. Is this a key part of this change? Why 
don't we want the first `if` block to also await an update?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -967,7 +984,8 @@ private boolean 
maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
 return false;
 }
 
-if (position != null && 
!position.currentLeader.equals(currentLeaderAndEpoch)) {
+if (position != null &&
+(!position.currentLeader.equals(currentLeaderAndEpoch) || 
this.fetchState.equals(FetchStates.AWAIT_UPDATE))) {

Review Comment:
   Not sure if using the helper method shortens the line length enough to avoid 
wrapping 🤷‍♂️ 
   
   ```suggestion
   if (position != null && 
(!position.currentLeader.equals(currentLeaderAndEpoch) || awaitingUpdate())) {
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
 if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
 partitionsWithUpdatedLeaderInfo.put(partition, new 
Metadata.LeaderIdAndEpoch(
 
Optional.of(partitionData.currentLeader().leaderId()), 
Optional.of(partitionData.currentLeader().leaderEpoch(;
+} else {
+requestMetadataUpdate(metadata, subscriptions, 
partition);
+subscriptions.awaitUpdate(partition);

Review Comment:
   With this change, we first request a metadata update, then flag our 
partition as awaiting the metadata update whenever we encounter a 
`NOT_LEADER_OR_FOLLOWER` or `FENCED_LEADER_EPOCH`. However, in the 
`FetchCollector.handleInitializeErrors()` method, we only request the 
metadata update, but _don't_ flag the partition. Is that seeming inconsistency 
intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


phooq commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2078128947

   Thanks for the suggestion @kirktrue. I will use `toStringDetails()` then. 
Would you suggest I change the methods in the `***Event` classes as well, or 
just focus on `RequestState` for this PR?  
   
   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-25 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840966#comment-17840966
 ] 

Edoardo Comar commented on KAFKA-16622:
---

Activating DEBUG logging 
```
[2024-04-25 21:20:10,856] DEBUG [MirrorCheckpointConnector|task-0] 
translateDownstream(mygroup1,mytopic-0,13805): Skipped 
(OffsetSync{topicPartition=mytopic-0, upstreamOffset=1, 
downstreamOffset=1} is ahead of upstream consumer group 13805) 
(org.apache.kafka.connect.mirror.OffsetSyncStore:125)
```
The checkpoint is not emitted because the topic-partition has been mirrored 
further than where the consumer group is, so until the group catches up no 
checkpoints will be emitted.

Question for [~gregharris73]
this behavior would mean that any consumers in groups that are behind the log 
end, and that are switched from consuming from the source cluster to the target 
cluster, would have to reprocess the entire partition? They would have access 
to no translated offsets.
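
Schematically, the translation rule visible in the log above is (toy model, 
not MM2's actual OffsetSyncStore code):
{code:java}
import java.util.OptionalLong;

final class OffsetTranslationSketch {
    // A checkpoint can only be emitted when a stored offset sync sits at or
    // below the group's upstream offset; a sync that is ahead is skipped.
    static OptionalLong translate(long syncUpstream, long syncDownstream, long groupUpstream) {
        if (syncUpstream > groupUpstream) {
            return OptionalLong.empty(); // sync ahead of the group: no translation
        }
        return OptionalLong.of(syncDownstream); // conservative downstream position
    }
}
{code}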


> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters(mirromaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>--from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (ie its consumer group lag gets down to 0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15307: Update/errors for deprecated config [kafka]

2024-04-25 Thread via GitHub


Cerchie commented on code in PR #14448:
URL: https://github.com/apache/kafka/pull/14448#discussion_r1580062597


##
.github/workflows/codesee-arch-diagram.yml:
##
@@ -0,0 +1,23 @@
+# This workflow was added by CodeSee. Learn more at https://codesee.io/

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-25 Thread via GitHub


hachikuji commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1576879728


##
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java:
##
@@ -26,7 +26,7 @@
 /**
  * Represents an immutable basic version range using 2 attributes: min and 
max, each of type short.
  * The min and max attributes need to satisfy 2 rules:
- *  - they are each expected to be >= 1, as we only consider positive version 
values to be valid.

Review Comment:
   Was it a bug that we only allowed version 1 and above? I'm wondering if we 
really need to change it. 



##
clients/src/main/resources/common/message/VotersRecord.json:
##
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "VotersRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "Version", "type": "int16", "versions": "0+",
+  "about": "The version of the voters record" },
+{ "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [
+  { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+"about": "The replica id of the voter in the topic partition" },
+  { "name": "VoterUuid", "type": "uuid", "versions": "0+",

Review Comment:
   Why don't we call it `VoterDirectoryId`? 



##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It provides functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map<Integer, VoterNode> voters;
+
+VoterSet(Map<Integer, VoterNode> voters) {
+if (voters.isEmpty()) {
+throw new IllegalArgumentException("Voters cannot be empty");
+}
+
+this.voters = voters;
+}
+
+/**
+ * Returns the socket address for a given voter at a given listener.
+ *
+ * @param voter the id of the voter
+ * @param listener the name of the listener
+ * @return the socket address if it exists, otherwise {@code Optional.empty()}
+ */
+public Optional<InetSocketAddress> voterAddress(int voter, String listener) {
+return Optional.ofNullable(voters.get(voter))
+.flatMap(voterNode -> voterNode.address(listener));
+}
+
+/**
+ * Returns all of the voter ids.
+ */
+public Set<Integer> voterIds() {
+return voters.keySet();
+}
+
+/**
+ * Adds a voter to the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
added.
+ *
+ * A new voter can be added to a voter set if its id doesn't already exist 
in the 

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1580024742


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.GroupType.CONSUMER;
+
+public class ConsumerGroupExecutor implements AutoCloseable {

Review Comment:
   Remove the `public` to make it be package-private



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static 

[PR] MINOR: Change the documentation of the Brokers field. [kafka]

2024-04-25 Thread via GitHub


emasab opened a new pull request, #15809:
URL: https://github.com/apache/kafka/pull/15809

   The Brokers field doesn't contain all the brokers in the cluster: when a broker is down, 
it can still be listed in a partition's "Replicas" but be absent from this array.
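
   For illustration, a rough sketch of how one can observe this from the Admin client; the 
topic name and bootstrap address are placeholders, and this is not part of the change itself:

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.TopicDescription;
   import org.apache.kafka.common.Node;

   import java.util.Properties;
   import java.util.Set;
   import java.util.stream.Collectors;

   public class FindNonLiveReplicas {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092"); // placeholder
           try (Admin admin = Admin.create(props)) {
               // Brokers currently registered/alive in the cluster.
               Set<Integer> liveBrokers = admin.describeCluster().nodes().get().stream()
                   .map(Node::id)
                   .collect(Collectors.toSet());
               // Replica assignments can still reference a broker that is down.
               TopicDescription topic = admin.describeTopics(Set.of("my-topic"))
                   .allTopicNames().get().get("my-topic");
               topic.partitions().forEach(p ->
                   p.replicas().stream()
                       .filter(replica -> !liveBrokers.contains(replica.id()))
                       .forEach(replica ->
                           System.out.println("Replica " + replica.id() + " is assigned but not live")));
           }
       }
   }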
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rename RaftConfig to QuorumConfigs [kafka]

2024-04-25 Thread via GitHub


chia7712 merged PR #15797:
URL: https://github.com/apache/kafka/pull/15797


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-25 Thread via GitHub


chia7712 merged PR #15782:
URL: https://github.com/apache/kafka/pull/15782


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840907#comment-17840907
 ] 

Matthias J. Sax commented on KAFKA-16585:
-

{quote}I can use the regular Processor, but as I understand it adds some overhead compared 
with FixedKeyProcessor
{quote}
Where did you get this? The Processor itself does not have overhead. The only thing that 
could happen downstream is that an unnecessary repartition step could be inserted. We are 
tackling this via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling]
{quote}Really, I think FixedKeyProcessor does not need to "ensure that the key is not 
changed". IMHO it is enough to have a key from the same partition. So, if you provide a way 
to generate the *FixedKeyRecord* from any local store, it will be enough.
{quote}
Well, technically yes, but there is no simple way to enforce/check this... We would need to 
serialize the provided key, pipe it through the Partitioner, and compare the computed 
partition. Or is there some other way to do this? This would be quite expensive to do.

If you feel strongly about all this, feel free to do a POC PR and write a KIP about it, and 
we can take it from there. I don't see a simple way to do it, and I believe that using a 
regular Processor is the right way to go (especially with KIP-759 on the horizon).
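
To make the cost concrete, here is a sketch of the check described above. It assumes the 
default murmur2-based partitioning and a caller that knows the topic's partition count; both 
are assumptions for illustration, not actual Streams internals:

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;

final class SamePartitionCheck {
    // Serialize the candidate key, run it through the default partitioning
    // logic, and compare against the partition of the original record.
    static <K> boolean keyStaysOnSamePartition(String topic,
                                               K newKey,
                                               Serializer<K> keySerializer,
                                               int originalPartition,
                                               int numPartitions) {
        byte[] keyBytes = keySerializer.serialize(topic, newKey);
        int computed = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        return computed == originalPartition;
    }
}

Every forwarded record would pay for one extra serialization plus a hash, which is the 
expense referred to above.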

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such a record is usually absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2077870915

   @cadonna—thanks for your review. I have made the requested changes, so 
please take another pass. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579915561


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1338,7 +1339,14 @@ private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName,
 Set<TopicPartition> partitions) {
 SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
 sortedPartitions.addAll(partitions);
-CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+
+// We don't yet have the concept of having an expiring callback, but 
we will likely want that eventually.
+Timer timer = time.timer(Long.MAX_VALUE);
+CompletableBackgroundEvent<Void> event = new 
ConsumerRebalanceListenerCallbackNeededEvent(
+methodName,
+sortedPartitions,
+timer
+);

Review Comment:
   I was able to refactor the code and this is now sans `Timer` again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579914824


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -769,7 +730,8 @@ public void commitAsync(OffsetCommitCallback callback) {
 public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
 acquireAndEnsureOpen();
 try {
-AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
+Timer timer = time.timer(Long.MAX_VALUE);
+AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets, 
timer);

Review Comment:
   I was able to refactor the code to eliminate the need for passing in a 
`Timer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579897029


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##
@@ -36,13 +36,14 @@ protected CompletableApplicationEvent(final Type type, 
final Timer timer) {
 super(type);
 this.future = new CompletableFuture<>();
 Objects.requireNonNull(timer);
-this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
-}
 
-protected CompletableApplicationEvent(final Type type, final long 
deadlineMs) {
-super(type);
-this.future = new CompletableFuture<>();
-this.deadlineMs = deadlineMs;
+long currentTimeMs = timer.currentTimeMs();
+long remainingMs = timer.remainingMs();
+
+if (currentTimeMs > Long.MAX_VALUE - remainingMs)
+this.deadlineMs = Long.MAX_VALUE;
+else
+this.deadlineMs = currentTimeMs + remainingMs;

Review Comment:
   For point 1, I created a new method named `calculateDeadlineMs` that moves 
this code into one place.
   
   For point 2, there is no `Timer` in the event because `Timer` is not thread 
safe, and events cross the application/background thread boundary. I did not 
want to expose the `Timer` in the event to avoid its possible usage from the 
background thread.
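
   For reference, an overflow-safe deadline helper along the lines of the `calculateDeadlineMs` 
method mentioned above could look like the sketch below; the exact code in the PR may differ:

   import org.apache.kafka.common.utils.Timer;

   final class DeadlineUtils {
       // currentTimeMs + remainingMs can overflow when remainingMs is huge
       // (e.g. Long.MAX_VALUE), so clamp the deadline instead of wrapping.
       static long calculateDeadlineMs(Timer timer) {
           long currentTimeMs = timer.currentTimeMs();
           long remainingMs = timer.remainingMs();
           if (currentTimeMs > Long.MAX_VALUE - remainingMs)
               return Long.MAX_VALUE;
           return currentTimeMs + remainingMs;
       }
   }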



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579899408


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java:
##
@@ -16,120 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.IdempotentCloser;
-import org.apache.kafka.common.utils.LogContext;
-import org.slf4j.Logger;
-
-import java.io.Closeable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 
 /**
- * An {@link EventProcessor} is the means by which events produced by 
thread A are
- * processed by thread B. By definition, threads A 
and B run in parallel to
- * each other, so a mechanism is needed with which to receive and process the 
events from the other thread. That
- * communication channel is formed around {@link BlockingQueue a shared queue} 
into which thread A
- * enqueues events and thread B reads and processes those events.
+ * An {@code EventProcessor} is the means by which events are 
processed, the meaning of which is left
+ * intentionally loose. This is in large part to keep the {@code 
EventProcessor} focused on what it means to process
+ * the events, and not linking itself too closely with the rest of 
the surrounding application.
+ *
+ * 
+ *
+ * The {@code EventProcessor} is envisaged as a stateless service that acts as 
a conduit, receiving an event and
+ * dispatching to another block of code to process. The semantic meaning of 
each event is different, so the
+ * {@code EventProcessor} will need to interact with other parts of the system 
that maintain state. The
+ * implementation should not be concerned with the mechanism by which an event 
arrived for processing. While the
+ * events are shuffled around the consumer subsystem by means of {@link 
BlockingQueue shared queues}, it should
+ * be considered an anti-pattern to need to know how it arrived or what 
happens after it is processed.
  */
-public abstract class EventProcessor<T> implements Closeable {
-
-private final Logger log;
-private final BlockingQueue<T> eventQueue;
-private final IdempotentCloser closer;
-
-protected EventProcessor(final LogContext logContext, final 
BlockingQueue<T> eventQueue) {
-this.log = logContext.logger(EventProcessor.class);
-this.eventQueue = eventQueue;
-this.closer = new IdempotentCloser();
-}
-
-public abstract boolean process();
-
-protected abstract void process(T event);
-
-@Override
-public void close() {
-closer.close(this::closeInternal, () -> log.warn("The event processor 
was already closed"));
-}
-
-protected interface ProcessHandler<T> {
-
-void onProcess(T event, Optional<KafkaException> error);
-}
+public interface EventProcessor<T> extends AutoCloseable {
 
 /**
- * Drains all available events from the queue, and then processes them in 
order. If any errors are thrown while
- * processing the individual events, these are submitted to the given 
{@link ProcessHandler}.
+ * Process an event that is received.
  */
-protected boolean process(ProcessHandler<T> processHandler) {
-closer.assertOpen("The processor was previously closed, so no further 
processing can occur");
-
-List<T> events = drain();
-
-if (events.isEmpty()) {
-log.trace("No events to process");
-return false;
-}
+void process(T event);
 
-try {
-log.trace("Starting processing of {} event{}", events.size(), 
events.size() == 1 ? "" : "s");
-
-for (T event : events) {
-try {
-Objects.requireNonNull(event, "Attempted to process a null 
event");
-log.trace("Processing event: {}", event);
-process(event);
-processHandler.onProcess(event, Optional.empty());
-} catch (Throwable t) {
-KafkaException error = 
ConsumerUtils.maybeWrapAsKafkaException(t);
-processHandler.onProcess(event, Optional.of(error));
-}
-}
-} finally {
-log.trace("Completed processing");
-}
-
-return true;
-}
-
-/**
- * It is possible for the consumer to close before complete processing all 
the events in the queue. In
- * this case, we need to throw an exception to notify the user the 
consumer is closed.
- */
-private void closeInternal() {
-log.trace("Closing event processor");
-List<T> incompleteEvents = drain();
-
-if (incompleteEvents.isEmpty())
-return;
-
-KafkaException exception = new KafkaException("The consumer is 
closed");
-
-// Check 



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579894472


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -66,6 +69,7 @@ public class ConfigurationControlManager {
 private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
 private final Map staticConfig;
 private final ConfigResource currentController;
+private final MinIsrConfigUpdatePartitionHandler 
minIsrConfigUpdatePartitionHandler;

Review Comment:
   maybe more of a question for someone with more code ownership of the quorum 
controller code, but I wonder if it would be preferable to handle generating 
the replication control manager records in the 
`QuorumController.incrementalAlterConfigs`. That would also make it a bit 
easier to handle `validateOnly` which we are not currently handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579888146


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> records) {
+List<ConfigRecord> minIsrRecords = new ArrayList<>();
+Map<String, String> topicMap = new HashMap<>();
+Map<String, String> configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicMap.size() == minIsrRecords.size()) {
+// If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+// updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+new ArrayList<>(topicMap.keySet()),
+topicName -> topicMap.get(topicName))
+);
+return;
+}
+
+// Because it may require a multi-level lookup for the min ISR config value, build a copy of
+// the config data and apply the config updates to it. Use this config copy for the min ISR lookup.
+Map<ConfigResource, TimelineHashMap<String, String>> configDataCopy = new HashMap<>(configData);
+SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new 
LogContext("dummy-config-update"));
+for (ConfigRecord record : minIsrRecords) {
+replayInternal(record, configDataCopy, localSnapshotRegistry);
+}

Review Comment:
   why are we calling replay here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579886194


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> records) {
+List<ConfigRecord> minIsrRecords = new ArrayList<>();
+Map<String, String> topicMap = new HashMap<>();
+Map<String, String> configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});

Review Comment:
   what is the behavior if the default broker config for `min.insync.replicas` 
is changed? 
   I am not actually sure how that impacts the `min.insync.replicas` for 
existing topics.
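
   For context, `min.insync.replicas` is resolved with the usual config precedence: a 
topic-level override wins, otherwise a dynamically set cluster-wide default applies, 
otherwise the static broker config. A self-contained sketch of that fallback chain, where 
the maps stand in for the real config stores:

   import java.util.Map;

   final class MinIsrResolution {
       static int effectiveMinIsr(Map<String, String> topicConfig,
                                  Map<String, String> clusterDefaults,
                                  int staticBrokerMinIsr) {
           // Topic override takes precedence over the dynamic cluster default,
           // which in turn takes precedence over the static broker config.
           String value = topicConfig.get("min.insync.replicas");
           if (value == null)
               value = clusterDefaults.get("min.insync.replicas");
           return value != null ? Integer.parseInt(value) : staticBrokerMinIsr;
       }
   }

   Under that precedence, changing the broker-level default changes the effective value for 
every topic without an explicit override, which is why the question matters for this PR.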



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579885684


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -144,6 +166,12 @@ void runOnce() {
 .map(Optional::get)
 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
 .reduce(Long.MAX_VALUE, Math::min);
+
+// "Complete" any events that have expired. This cleanup step should 
only be called after the network I/O
+// thread has made at least one call to poll. This is done to emulate 
the behavior of the legacy consumer's
+// handling of timeouts. The legacy consumer makes at least one 
attempt to satisfy any network requests
+// before checking if a timeout has expired.

Review Comment:
   Split into a separate method to accommodate the reworded comment.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,9 +301,20 @@ void cleanup() {
 log.error("Unexpected error during shutdown.  Proceed with 
closing.", e);
 } finally {
 sendUnsentRequests(timer);
+
+// Copy over the completable events to a separate list, then reap 
any incomplete
+// events on that list.

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579884608


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1270,6 +1230,20 @@ private void close(Duration timeout, boolean 
swallowException) {
 if (applicationEventHandler != null)
 closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
 closeTimer.update();
+
+if (backgroundEventReaper != null && backgroundEventQueue != null) {

Review Comment:
   Added a brief comment. PTAL.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -128,7 +138,19 @@ void runOnce() {
 // Process the events—if any—that were produced by the application 
thread. It is possible that when processing
 // an event generates an error. In such cases, the processor will log 
an exception, but we do not want those
 // errors to be propagated to the caller.
-applicationEventProcessor.process();
+LinkedList<ApplicationEvent> events = new LinkedList<>();
+applicationEventQueue.drainTo(events);
+
+for (ApplicationEvent event : events) {
+try {
+if (event instanceof CompletableApplicationEvent)
+applicationEventReaper.add((CompletableApplicationEvent<?>) event);
+
+applicationEventProcessor.process(event);
+} catch (Throwable t) {
+log.warn("Error processing event {}", t.getMessage(), t);
+}
+}

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579880592


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource(
 if (error.isFailure()) {
 return error;
 }
+maybeTriggerPartitionUpdateOnMinIsrChange(newRecords);

Review Comment:
   we need to support `legacyAlterConfigResource` also.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579872828


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator 
partitionsWithBrokerInElr(int brokerId
 }
 return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
 }
+
+BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() {
+Map topicMap = new HashMap<>();
+for (Map map : elrMembers.values()) {
+if (map != null) {

Review Comment:
   when would this be null? is there a particular reason we chose to use a null 
array instead of an empty array?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1579868091


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   Sorry for the delay!
   
   Yes, I'm in agreement with the perspectives you and @lianetm stated. No 
qualms from me 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579865601


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java:
##
@@ -16,120 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.IdempotentCloser;
-import org.apache.kafka.common.utils.LogContext;
-import org.slf4j.Logger;
-
-import java.io.Closeable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 
 /**
- * An {@link EventProcessor} is the means by which events produced by 
thread A are
- * processed by thread B. By definition, threads A 
and B run in parallel to
- * each other, so a mechanism is needed with which to receive and process the 
events from the other thread. That
- * communication channel is formed around {@link BlockingQueue a shared queue} 
into which thread A
- * enqueues events and thread B reads and processes those events.
+ * An {@code EventProcessor} is the means by which events are 
processed, the meaning of which is left
+ * intentionally loose. This is in large part to keep the {@code 
EventProcessor} focused on what it means to process
+ * the events, and not linking itself too closely with the rest of 
the surrounding application.
+ *
+ * 
+ *
+ * The {@code EventProcessor} is envisaged as a stateless service that acts as 
a conduit, receiving an event and
+ * dispatching to another block of code to process. The semantic meaning of 
each event is different, so the
+ * {@code EventProcessor} will need to interact with other parts of the system 
that maintain state. The
+ * implementation should not be concerned with the mechanism by which an event 
arrived for processing. While the
+ * events are shuffled around the consumer subsystem by means of {@link 
BlockingQueue shared queues}, it should
+ * be considered an anti-pattern to need to know how it arrived or what 
happens after it is processed.
  */
-public abstract class EventProcessor<T> implements Closeable {
-
-private final Logger log;
-private final BlockingQueue<T> eventQueue;
-private final IdempotentCloser closer;
-
-protected EventProcessor(final LogContext logContext, final 
BlockingQueue<T> eventQueue) {
-this.log = logContext.logger(EventProcessor.class);
-this.eventQueue = eventQueue;
-this.closer = new IdempotentCloser();
-}
-
-public abstract boolean process();
-
-protected abstract void process(T event);
-
-@Override
-public void close() {
-closer.close(this::closeInternal, () -> log.warn("The event processor 
was already closed"));
-}
-
-protected interface ProcessHandler<T> {
-
-void onProcess(T event, Optional<KafkaException> error);
-}
+public interface EventProcessor<T> extends AutoCloseable {
 
 /**
- * Drains all available events from the queue, and then processes them in 
order. If any errors are thrown while
- * processing the individual events, these are submitted to the given 
{@link ProcessHandler}.
+ * Process an event that is received.
  */
-protected boolean process(ProcessHandler<T> processHandler) {
-closer.assertOpen("The processor was previously closed, so no further 
processing can occur");
-
-List<T> events = drain();
-
-if (events.isEmpty()) {
-log.trace("No events to process");
-return false;
-}
+void process(T event);
 
-try {
-log.trace("Starting processing of {} event{}", events.size(), 
events.size() == 1 ? "" : "s");
-
-for (T event : events) {
-try {
-Objects.requireNonNull(event, "Attempted to process a null 
event");
-log.trace("Processing event: {}", event);
-process(event);
-processHandler.onProcess(event, Optional.empty());
-} catch (Throwable t) {
-KafkaException error = 
ConsumerUtils.maybeWrapAsKafkaException(t);
-processHandler.onProcess(event, Optional.of(error));
-}
-}
-} finally {
-log.trace("Completed processing");
-}
-
-return true;
-}
-
-/**
- * It is possible for the consumer to close before complete processing all 
the events in the queue. In
- * this case, we need to throw an exception to notify the user the 
consumer is closed.
- */
-private void closeInternal() {
-log.trace("Closing event processor");
-List<T> incompleteEvents = drain();
-
-if (incompleteEvents.isEmpty())
-return;
-
-KafkaException exception = new KafkaException("The consumer is 
closed");
-
-// Check 

Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579864626


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##
@@ -36,13 +36,14 @@ protected CompletableApplicationEvent(final Type type, 
final Timer timer) {
 super(type);
 this.future = new CompletableFuture<>();
 Objects.requireNonNull(timer);
-this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
-}
 
-protected CompletableApplicationEvent(final Type type, final long 
deadlineMs) {
-super(type);
-this.future = new CompletableFuture<>();
-this.deadlineMs = deadlineMs;
+long currentTimeMs = timer.currentTimeMs();
+long remainingMs = timer.remainingMs();
+
+if (currentTimeMs > Long.MAX_VALUE - remainingMs)
+this.deadlineMs = Long.MAX_VALUE;
+else
+this.deadlineMs = currentTimeMs + remainingMs;

Review Comment:
   `CompletableEvent` is an interface, but I could see if I can put a static method in there 
to keep the logic in one place.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]

2024-04-25 Thread via GitHub


jolshan merged PR #15755:
URL: https://github.com/apache/kafka/pull/15755


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16386) NETWORK_EXCEPTIONs from transaction verification are not translated

2024-04-25 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-16386:
---
Fix Version/s: 3.7.1

> NETWORK_EXCEPTIONs from transaction verification are not translated
> ---
>
> Key: KAFKA-16386
> URL: https://issues.apache.org/jira/browse/KAFKA-16386
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0, 3.7.0
>Reporter: Sean Quah
>Priority: Minor
> Fix For: 3.8.0, 3.7.1
>
>
> KAFKA-14402 
> ([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense])
>  adds verification with the transaction coordinator on Produce and 
> TxnOffsetCommit paths as a defense against hanging transactions. For 
> compatibility with older clients, retriable errors from the verification step 
> are translated to ones already expected and handled by existing clients. When 
> verification was added, we forgot to translate {{NETWORK_EXCEPTION}} s.
> [~dajac] noticed this manifesting as a test failure when 
> tests/kafkatest/tests/core/transactions_test.py was run with an older client 
> (prior to the fix for KAFKA-16122):
> {quote}
> {{NETWORK_EXCEPTION}} is indeed returned as a partition error. The 
> {{TransactionManager.TxnOffsetCommitHandler}} considers it as a fatal error 
> so it transitions to the fatal state.
> It seems that there are two cases where the server could return it: (1) When 
> the verification request times out or its connections is cut; or (2) in 
> {{AddPartitionsToTxnManager.addTxnData}} where we say that we use it because 
> we want a retriable error.
> {quote}
> The first case was triggered as part of the test. The second case happens 
> when there is already a verification request ({{AddPartitionsToTxn}}) in 
> flight with the same epoch and we want clients to try again when we're not 
> busy.
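
The compatibility translation the ticket refers to amounts to mapping retriable verification 
errors onto errors that pre-KIP-890 clients already know how to retry. A rough sketch of the 
idea; the method name and the chosen target error are assumptions, not the actual patch:

import org.apache.kafka.common.protocol.Errors;

final class VerificationErrorTranslation {
    static Errors translateForOlderClients(Errors error) {
        switch (error) {
            case NETWORK_EXCEPTION:            // the case this ticket adds
            case COORDINATOR_LOAD_IN_PROGRESS:
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // An error that existing clients already treat as retriable.
                return Errors.NOT_ENOUGH_REPLICAS;
            default:
                return error;
        }
    }
}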



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1579862658


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java:
##
@@ -48,4 +50,40 @@ public void testRequestStateSimple() {
 state.reset();
 assertTrue(state.canSendRequest(200));
 }
+
+@Test
+public void testTrackInflightOnSuccessfulAttempt() {
+testTrackInflight(RequestState::onSuccessfulAttempt);
+}
+
+@Test
+public void testTrackInflightOnFailedAttempt() {
+testTrackInflight(RequestState::onFailedAttempt);
+}
+
+private void testTrackInflight(BiConsumer<RequestState, Long> onCompletedAttempt) {
+RequestState state = new RequestState(
+new LogContext(),
+this.getClass().getSimpleName(),
+100,
+2,
+1000,
+0);
+
+// This is just being paranoid...
+assertFalse(state.requestInFlight());
+
+// When we've sent a request, the flag should update from false to 
true.
+state.onSendAttempt();
+assertTrue(state.requestInFlight());
+
+// Now we've received the response.
+onCompletedAttempt.accept(state, 236);
+
+// When we've sent a second request with THE SAME TIMESTAMP as the 
previous response,

Review Comment:
   I added back the timestamp so we could use it in the `lastSentMs` value for 
debugging. So the comment should make sense again.
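
   Per the PR title, the fix is to track in-flight status with an explicit flag rather than 
by comparing timestamps, roughly like the sketch below; field names are inferred from the 
methods the test exercises:

   final class RequestStateSketch {
       private boolean requestInFlight = false;

       void onSendAttempt() {
           // A request has been sent; mark it in flight.
           requestInFlight = true;
       }

       void onSuccessfulAttempt(long responseTimeMs) {
           // A response arrived; the flag flips regardless of the timestamp,
           // which is what the "same timestamp" case in the test verifies.
           requestInFlight = false;
       }

       void onFailedAttempt(long responseTimeMs) {
           requestInFlight = false;
       }

       boolean requestInFlight() {
           return requestInFlight;
       }
   }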



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-25 Thread via GitHub


johnnychhsu commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-208202

   thanks for the prompt reply @OmniaGM !
   just updated to resolve the conflict.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-04-25 Thread Kirk True (Jira)
Kirk True created KAFKA-16623:
-

 Summary: KafkaAsyncConsumer system tests warn about revoking 
partitions that weren't previously assigned
 Key: KAFKA-16623
 URL: https://issues.apache.org/jira/browse/KAFKA-16623
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


When running system tests for the KafkaAsyncConsumer, we occasionally see this 
warning:

{noformat}
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 38, in _protected_worker
self._worker(idx, node)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 304, in _worker
handler.handle_partitions_revoked(event, node, self.logger)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 163, in handle_partitions_revoked
(tp, node.account.hostname)
AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) 
cannot be revoked from worker20 as it was not previously assigned to that 
consumer
{noformat}

It is unclear what is causing this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-25 Thread via GitHub


m1a2st commented on PR #15808:
URL: https://github.com/apache/kafka/pull/15808#issuecomment-2077728033

   @chia7712 please review this PR, thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Move StorageTool to tools [kafka]

2024-04-25 Thread via GitHub


mimaison commented on code in PR #14847:
URL: https://github.com/apache/kafka/pull/14847#discussion_r1579785296


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java:
##
@@ -47,7 +47,7 @@ public final class MetaProperties {
 /**
  * The property that specifies the node id. Replaces broker.id in V1.
  */
-static final String NODE_ID_PROP = "node.id";
+public static final String NODE_ID_PROP = "node.id";

Review Comment:
   It's really not nice that we have to do this. I wonder if we need to wait for the 
`NodeIdProp` from KafkaConfig to move instead.



##
tools/src/test/java/org/apache/kafka/tools/StorageToolTest.java:
##
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.metadata.properties.PropertiesUtils;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+@Timeout(value = 40)
+public class StorageToolTest {
+private Properties newSelfManagedProperties() {
+Properties properties = new Properties();
+properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, 
"/tmp/foo,/tmp/bar");
+properties.setProperty(StorageTool.PROCESS_ROLES_CONFIG, "controller");
+properties.setProperty(MetaProperties.NODE_ID_PROP, "2");
+properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, 
"2@localhost:9092");
+return properties;
+}
+
+@Test
+public void testConfigToLogDirectories() {
+LogConfig config = new LogConfig(newSelfManagedProperties());
+assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/foo")), 
StorageTool.configToLogDirectories(config));
+}
+
+@Test
+public void testConfigToLogDirectoriesWithMetaLogDir() {
+Properties properties = newSelfManagedProperties();
+properties.setProperty(StorageTool.METADATA_LOG_DIR_CONFIG, 
"/tmp/baz");
+LogConfig config = new LogConfig(properties);
+assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/baz", 
"/tmp/foo")), StorageTool.configToLogDirectories(config));
+}
+
+@Test
+public void 

Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077701719

   What about `toStringDetails()`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-25 Thread via GitHub


m1a2st opened a new pull request, #15808:
URL: https://github.com/apache/kafka/pull/15808

   TestUtils.scala has some unused methods, and some methods are only used in a single class, 
so I deleted the unused methods and moved the single-use methods into the classes that use 
them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Various cleanups in generator [kafka]

2024-04-25 Thread via GitHub


mimaison opened a new pull request, #15807:
URL: https://github.com/apache/kafka/pull/15807

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Various cleanups in metadata [kafka]

2024-04-25 Thread via GitHub


mimaison opened a new pull request, #15806:
URL: https://github.com/apache/kafka/pull/15806

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579760563


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1338,7 +1339,14 @@ private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName,
 Set<TopicPartition> partitions) {
 SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
 sortedPartitions.addAll(partitions);
-CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+
+// We don't yet have the concept of having an expiring callback, but 
we will likely want that eventually.
+Timer timer = time.timer(Long.MAX_VALUE);
+CompletableBackgroundEvent event = new 
ConsumerRebalanceListenerCallbackNeededEvent(
+methodName,
+sortedPartitions,
+timer
+);

Review Comment:
   I'll look into how to do this in a way that I don't find too ugly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
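
For readers following along, here is a minimal sketch of the pattern shown in 
the diff: the event carries a `Timer`, and a `Long.MAX_VALUE` timeout makes it 
effectively non-expiring until true expiring callbacks exist. Only `Timer`, 
`Time`, and `time.timer(Long.MAX_VALUE)` come from the diff; the class and 
method names below are invented for illustration.

import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

abstract class TimedEventSketch<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();
    private final Timer timer;

    protected TimedEventSketch(Timer timer) {
        this.timer = timer;
    }

    CompletableFuture<T> future() {
        return future;
    }

    boolean isExpired(long currentTimeMs) {
        timer.update(currentTimeMs); // refresh the timer's notion of "now"
        return timer.isExpired();    // never true for a Long.MAX_VALUE timeout
    }
}

// Call-site pattern from the diff: an effectively non-expiring callback event.
// Timer timer = Time.SYSTEM.timer(Long.MAX_VALUE);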



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579756071


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -144,6 +166,12 @@ void runOnce() {
 .map(Optional::get)
 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
 .reduce(Long.MAX_VALUE, Math::min);
+
+// "Complete" any events that have expired. This cleanup step should 
only be called after the network I/O
+// thread has made at least one call to poll. This is done to emulate 
the behavior of the legacy consumer's
+// handling of timeouts. The legacy consumer makes at least one 
attempt to satisfy any network requests
+// before checking if a timeout has expired.

Review Comment:
   I'm happy to reword the comment and clean it up, but the lines that follow 
that comment are the raison d'être of this change. It's very subtle and easy to 
miss, hence the call-out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
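
A sketch of the ordering that comment describes, written as code for clarity 
(illustrative, not the PR's implementation): the network poll runs first, and 
only then are still-incomplete events with an elapsed deadline failed.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.TimeoutException;

final class EventReaperSketch {

    // One tracked entry: a pending future plus its deadline in milliseconds.
    static final class Tracked {
        final CompletableFuture<?> future;
        final long deadlineMs;

        Tracked(CompletableFuture<?> future, long deadlineMs) {
            this.future = future;
            this.deadlineMs = deadlineMs;
        }
    }

    void runOnce(List<Tracked> tracked, Runnable networkPoll, long currentTimeMs) {
        // At least one poll attempt happens before any timeout is enforced,
        // mirroring the legacy consumer behavior the comment calls out.
        networkPoll.run();

        Iterator<Tracked> it = tracked.iterator();
        while (it.hasNext()) {
            Tracked t = it.next();
            if (t.future.isDone()) {
                it.remove(); // completed normally; stop tracking it
            } else if (currentTimeMs >= t.deadlineMs) {
                t.future.completeExceptionally(
                        new TimeoutException("Event expired before completion"));
                it.remove();
            }
        }
    }
}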



Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077634416

   Thanks for the work on this @phooq!
   
   > I plan to, on top of the current changes I have, rename the `toStringBase` 
method to `getDetails`
   
   Renaming `toStringBase()` to `getDetails()` makes its purpose more vague, in 
my opinion.
   
   Keep in mind that the naming convention `toStringBase()` is used in 
`ApplicationEvent`, `BackgroundEvent`, and maybe elsewhere, too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
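
For context, the `toStringBase()` convention works roughly like this (a sketch 
under my own naming; only the method name `toStringBase()` comes from the 
thread): the base class owns the enclosing `toString()` and subclasses extend 
the field list by calling `super.toStringBase()`.

abstract class EventSketch {
    protected String toStringBase() {
        return "id=" + System.identityHashCode(this);
    }

    @Override
    public final String toString() {
        // Subclasses never override toString() itself; they extend toStringBase().
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

class CommitEventSketch extends EventSketch {
    private final long deadlineMs;

    CommitEventSketch(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }

    @Override
    protected String toStringBase() {
        // Keep inherited state in the output, then append subclass fields.
        return super.toStringBase() + ", deadlineMs=" + deadlineMs;
    }
}

With this shape, new CommitEventSketch(42).toString() yields something like 
CommitEventSketch{id=..., deadlineMs=42}, which is why the name describes the 
method's role in building toString() output rather than "details" in general.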



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579735209


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1270,6 +1230,20 @@ private void close(Duration timeout, boolean 
swallowException) {
 if (applicationEventHandler != null)
 closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
 closeTimer.update();
+
+if (backgroundEventReaper != null && backgroundEventQueue != null) {

Review Comment:
   They're only `null` if there was an error in the constructor. The 
constructor's `finally` block calls `close()`, so we need to handle the case 
where the consumer wasn't fully constructed before it's closed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
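
A minimal sketch of the constructor-failure case being handled here 
(illustrative names, not the consumer's actual fields): if construction throws 
partway through, cleanup runs against fields that may never have been assigned, 
so close() must tolerate null.

final class PartialConstructionSketch implements AutoCloseable {
    private AutoCloseable eventReaper;
    private AutoCloseable eventQueue;

    PartialConstructionSketch() {
        try {
            eventReaper = openResource("reaper");
            eventQueue = openResource("queue"); // if this throws, the field stays null
        } catch (RuntimeException e) {
            close(); // mirrors the constructor's cleanup path described above
            throw e;
        }
    }

    @Override
    public void close() {
        // Each field is null exactly when the constructor failed before assigning it.
        if (eventQueue != null)
            closeQuietly(eventQueue);
        if (eventReaper != null)
            closeQuietly(eventReaper);
    }

    private static void closeQuietly(AutoCloseable c) {
        try {
            c.close();
        } catch (Exception ignored) {
            // swallow, since close() must not throw during cleanup
        }
    }

    private static AutoCloseable openResource(String name) {
        return () -> System.out.println("closed " + name);
    }
}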



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579733831


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -769,7 +730,8 @@ public void commitAsync(OffsetCommitCallback callback) {
 public void commitAsync(Map offsets, 
OffsetCommitCallback callback) {
 acquireAndEnsureOpen();
 try {
-AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
+Timer timer = time.timer(Long.MAX_VALUE);
+AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets, 
timer);

Review Comment:
   Yeah, I went back and forth on this a few times.
   
   Ultimately I wanted to force the caller to be explicit about its timeout 
intention, vs. having it implicitly "hidden" away in the event hierarchy.
   
   Also, to create a `Timer` in the event constructor, we'd have to pass in a 
`Time` object (`time.timer(Long.MAX_VALUE)`), which seemed a bit obtuse, so 
🤷‍♂️



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
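
The design choice being defended, sketched with invented names (the mapping of 
sync commits to the user's deadline is my reading of the PR's goal, not a quote 
of its code): the caller constructs the Timer, so its timeout intention is 
visible at the call site rather than buried in an event constructor.

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

final class CommitTimerSketch {

    static final class CommitEvent {
        final Timer timer;

        CommitEvent(Timer timer) {
            this.timer = timer;
        }
    }

    private final Time time = Time.SYSTEM;

    CommitEvent asyncCommit() {
        // Async commits expose no user-facing deadline, so the timer never expires.
        return new CommitEvent(time.timer(Long.MAX_VALUE));
    }

    CommitEvent syncCommit(long userTimeoutMs) {
        // Sync commits carry the user-provided timeout through to event processing.
        return new CommitEvent(time.timer(userTimeoutMs));
    }
}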



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1579725251


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) {
  * is a request in-flight.
  */
 public boolean requestInFlight() {
-return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
+return requestInFlight;

Review Comment:
   @philipnee—I didn't make any changes to the name as `requestInFlight` was 
the existing method name. Are you OK to leave this for now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
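
One way the old timestamp check can go wrong, which the boolean flag avoids (a 
sketch of the failure mode as I read it, not the PR's exact code): with 
millisecond clocks, a request sent in the same millisecond an earlier response 
arrived leaves lastReceivedMs equal to lastSentMs, so `lastReceivedMs < 
lastSentMs` is false and the state reports "not in flight" while a request is 
actually outstanding.

final class RequestStateSketch {
    private boolean requestInFlight = false;

    void onSendAttempt() {
        requestInFlight = true; // set unconditionally at send time
    }

    void onResponseReceived() {
        requestInFlight = false; // cleared on success and failure alike
    }

    boolean requestInFlight() {
        return requestInFlight; // no timestamp arithmetic left to get wrong
    }
}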



Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15737:
URL: https://github.com/apache/kafka/pull/15737#discussion_r1579722758


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -140,22 +150,31 @@ class 
IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
 def __init__(self, node, verify_offsets, idx):
 super().__init__(node, verify_offsets, idx)
 
-def handle_partitions_revoked(self, event):
+def handle_partitions_revoked(self, event, node, logger):
 self.revoked_count += 1
 self.state = ConsumerState.Rebalancing
 self.position = {}
+revoked = []
+
 for topic_partition in event["partitions"]:
-topic = topic_partition["topic"]
-partition = topic_partition["partition"]
-self.assignment.remove(TopicPartition(topic, partition))
+tp = _create_partition_from_dict(topic_partition)
+assert tp in self.assignment, \
+"Topic partition %s cannot be revoked from %s as it was not 
previously assigned to that consumer" % \
+(tp, node.account.hostname)

Review Comment:
   @lucasbru—this is the main functional change: ensure that an attempt to 
remove a partition from the local state verifies that it was previously 
assigned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-25 Thread via GitHub


phooq commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2077592945

   Thanks so much @lianetm! Hey @kirktrue, I plan to, on top of the current 
changes I have, rename the `toStringBase` method to `getDetails` for 
`RequestState`. Does this look okay to you?
   
   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15802:
URL: https://github.com/apache/kafka/pull/15802#discussion_r1579708062


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -169,20 +201,22 @@ private static ConsumerGroupCommand.ConsumerGroupService 
consumerGroupService(St
 );
 }
 
-private void testWithConsumerGroup(String inputTopic,
+private void testWithConsumerGroup(String inputTopicWithData,
+   String inputTopicForTest,
+   String inputGroup,
int inputPartition,
int expectedPartition,
Errors expectedError,
boolean isStable,
Map consumerConfig) {
-produceRecord();
+produceRecord(inputTopicWithData);

Review Comment:
   We can move `produceRecord();` out of `testWithConsumerGroup` to simplify 
the arguments of `testWithConsumerGroup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-25 Thread Edoardo Comar (Jira)
Edoardo Comar created KAFKA-16622:
-

 Summary: MirrorMaker2 first Checkpoint not emitted until consumer 
group fully catches up once
 Key: KAFKA-16622
 URL: https://issues.apache.org/jira/browse/KAFKA-16622
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.6.2, 3.7.0, 3.8.0
Reporter: Edoardo Comar
 Attachments: edo-connect-mirror-maker-sourcetarget.properties

We observed an excessively delayed emission of the MM2 Checkpoint record.
It only gets created when the source consumer reaches the end of a topic. This 
does not seem reasonable.

In a very simple setup:

Tested with a standalone, single-process MirrorMaker2 mirroring between two 
single-node Kafka clusters (MirrorMaker config attached) with quick refresh 
intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10)

create a single topic in the source cluster
produce data to it (e.g. 1 records)
start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec between 
polls, committing after each poll

watch the Checkpoint topic in the target cluster

bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
  --topic source.checkpoints.internal \
  --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
   --from-beginning

-> no record appears in the checkpoint topic until the consumer reaches the end 
of the topic (i.e. its consumer group lag gets down to 0).







--
This message was sent by Atlassian Jira
(v8.20.10#820010)
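
A sketch of the slow consumer described in the reproduction steps, assuming 
plain kafka-clients (topic name, group id, and bootstrap address are 
placeholders): 50 records per poll, a one-second pause between polls, and a 
commit after each poll.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowConsumerRepro {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "slow-group");
        props.put("max.poll.records", "50");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("source-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.printf("fetched %d records%n", records.count());
                consumer.commitSync();  // commit after every poll
                Thread.sleep(1000);     // deliberate 1s pause keeps the group lagging
            }
        }
    }
}

Run this against the source cluster while watching the checkpoint topic with 
the console consumer command above; on affected versions, no checkpoint record 
appears until the group's lag reaches 0.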


Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15802:
URL: https://github.com/apache/kafka/pull/15802#discussion_r1579702459


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -91,65 +91,97 @@ public void testDeleteOffsetsNonExistingGroup() {
 
 @ClusterTest
 public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
-for (Map consumerConfig : consumerConfigs) {
-createTopic(TOPIC);
-testWithConsumerGroup(TOPIC, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig);
-removeTopic(TOPIC);
+int idx = 0;
+for (Iterator> it = consumerConfigs.iterator(); 
it.hasNext(); idx++) {
+Map consumerConfig = it.next();
+String topic = TOPIC_PREFIX + idx;
+String group = GROUP_PREFIX + idx;
+createTopic(topic);
+testWithConsumerGroup(topic, topic, group, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig);
+removeTopic(topic);
 }
 }
 
 @ClusterTest
 public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
-for (Map consumerConfig : consumerConfigs) {
-createTopic(TOPIC);
-testWithConsumerGroup(TOPIC, -1, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig);
-removeTopic(TOPIC);
+int idx = 0;
+for (Iterator> it = consumerConfigs.iterator(); 
it.hasNext(); idx++) {
+Map consumerConfig = it.next();
+String topic = TOPIC_PREFIX + idx;

Review Comment:
   Maybe we can use the test case name plus the protocol name to build a unique 
name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
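
A sketch of the naming scheme suggested above (an illustrative helper, not 
existing test code): derive topic and group names from the test case and the 
group protocol so the cases never collide, replacing the loop index.

final class UniqueNamesSketch {
    static String topicName(String testCase, String groupProtocol) {
        return testCase + "-" + groupProtocol + "-topic";
    }

    static String groupName(String testCase, String groupProtocol) {
        return testCase + "-" + groupProtocol + "-group";
    }
}

// e.g. topicName("testDeleteOffsetsOfStableConsumerGroupWithTopicOnly", "consumer")
// yields a name that is unique per test case and protocol.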


