Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-19 Thread via GitHub


omkreddy commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1573153281


##
server/src/main/java/org/apache/kafka/server/config/KafkaSecurityConfigs.java:
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
+
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Common home for broker-side security configs which need to be accessible from the libraries
+ * shared between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaSecurityConfigs {
+
+    /** ********* SSL Configuration *********/
+    public final static String SSL_PROTOCOL_CONFIG = SslConfigs.SSL_PROTOCOL_CONFIG;
+    public final static String SSL_PROTOCOL_DOC = SslConfigs.SSL_PROTOCOL_DOC;
+    public static final String SSL_PROTOCOL_DEFAULT = SslConfigs.DEFAULT_SSL_PROTOCOL;
+
+    public final static String SSL_PROVIDER_CONFIG = SslConfigs.SSL_PROVIDER_CONFIG;
+    public final static String SSL_PROVIDER_DOC = SslConfigs.SSL_PROVIDER_DOC;
+
+    public final static String SSL_CIPHER_SUITES_CONFIG = SslConfigs.SSL_CIPHER_SUITES_CONFIG;
+    public final static String SSL_CIPHER_SUITES_DOC = SslConfigs.SSL_CIPHER_SUITES_DOC;
+
+    public final static String SSL_ENABLED_PROTOCOLS_CONFIG = SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG;
+    public final static String SSL_ENABLED_PROTOCOLS_DOC = SslConfigs.SSL_ENABLED_PROTOCOLS_DOC;
+    public static final String SSL_ENABLED_PROTOCOLS_DEFAULTS = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS;
+
+    public final static String SSL_KEYSTORE_TYPE_CONFIG = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
+    public final static String SSL_KEYSTORE_TYPE_DOC = SslConfigs.SSL_KEYSTORE_TYPE_DOC;
+    public static final String SSL_KEYSTORE_TYPE_DEFAULT = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE;
+
+    public final static String SSL_KEYSTORE_LOCATION_CONFIG = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+    public final static String SSL_KEYSTORE_LOCATION_DOC = SslConfigs.SSL_KEYSTORE_LOCATION_DOC;
+
+    public final static String SSL_KEYSTORE_PASSWORD_CONFIG = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+    public final static String SSL_KEYSTORE_PASSWORD_DOC = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC;
+
+    public final static String SSL_KEY_PASSWORD_CONFIG = SslConfigs.SSL_KEY_PASSWORD_CONFIG;
+    public final static String SSL_KEY_PASSWORD_DOC = SslConfigs.SSL_KEY_PASSWORD_DOC;
+
+    public final static String SSL_KEYSTORE_KEY_CONFIG = SslConfigs.SSL_KEYSTORE_KEY_CONFIG;
+    public final static String SSL_KEYSTORE_KEY_DOC = SslConfigs.SSL_KEYSTORE_KEY_DOC;
+
+    public final static String SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG;
+    public final static String SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC;
+    public final static String SSL_TRUSTSTORE_TYPE_CONFIG = SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
+    public final static String SSL_TRUSTSTORE_TYPE_DOC = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC;
+    public static final String SSL_TRUSTSTORE_TYPE_DEFAULT = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
+
+    public final static String SSL_TRUSTSTORE_LOCATION_CONFIG = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+    public final static String SSL_TRUSTSTORE_PASSWORD_DOC = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC;
+
+    public final static String SSL_TRUSTSTORE_PASSWORD_CONFIG = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+    public final static String SSL_TRUSTSTORE_LOCATION_DOC = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC;
+
+    public final static String SSL_TRUSTSTORE_CERTIFICATES_CONFIG = 

Re: [PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]

2024-04-19 Thread via GitHub


infantlikesprogramming commented on PR #15696:
URL: https://github.com/apache/kafka/pull/15696#issuecomment-2067538939

   @chia7712 Thanks for the reply. I have tried the following code and received 
the results below. Each time I run the code, the `DescribeTopicsResult` gives 
different configurations for the brokers in my cluster. 
   
![image](https://github.com/apache/kafka/assets/60119105/04bb4ae3-98f6-4540-8b18-652ad67f00b7)
   
![image](https://github.com/apache/kafka/assets/60119105/00a7f8c0-ddd2-4098-8a14-03a6173a8cf3)
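   A minimal sketch of this kind of check (the bootstrap address and broker id 
are illustrative, and this is not the exact code from the screenshots): 
describe the broker's configs with the admin client and compare values across runs.

       import org.apache.kafka.clients.admin.Admin;
       import org.apache.kafka.clients.admin.AdminClientConfig;
       import org.apache.kafka.clients.admin.Config;
       import org.apache.kafka.common.config.ConfigResource;

       import java.util.Collections;
       import java.util.Map;

       public class DescribeBrokerConfigs {
           public static void main(String[] args) throws Exception {
               try (Admin admin = Admin.create(Collections.singletonMap(
                       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
                   ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                   Map<ConfigResource, Config> configs =
                           admin.describeConfigs(Collections.singleton(broker)).all().get();
                   // Print every config entry; re-running should yield stable values.
                   configs.get(broker).entries().forEach(e ->
                           System.out.println(e.name() + " = " + e.value()));
               }
           }
       }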
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero

2024-04-19 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839170#comment-17839170
 ] 

PoAn Yang commented on KAFKA-16588:
---

Hi [~chia7712], I'm interested in this. May I assign it to myself? Thank you.

> broker shutdown hangs when `log.segment.delete.delay.ms` is zero 
> -
>
> Key: KAFKA-16588
> URL: https://issues.apache.org/jira/browse/KAFKA-16588
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> see 
> [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154]
> If `log.segment.delete.delay.ms` is zero, we call `take` even though 
> `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` calls `shutdown` 
> rather than `shutdownNow` 
> ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134)]
> Hence, the thread never completes, and it blocks the shutdown of the broker.
> We should replace `take` with `poll`, since we have already checked for an 
> element beforehand.
> BTW, zero is an allowed value 
> ([https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258])
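A minimal, self-contained illustration of the `take` vs `poll` behavior (the queue 
name mirrors the description; the flag and timeout are illustrative, not the actual 
LogManager code):

{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollVsTakeSketch {
    private static final LinkedBlockingQueue<String> logsToBeDeleted = new LinkedBlockingQueue<>();
    private static final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    public static void main(String[] args) throws InterruptedException {
        Thread deleter = new Thread(() -> {
            while (!shuttingDown.get()) {
                try {
                    // take() would block forever on an empty queue; poll() returns null
                    // after the timeout, so the loop can observe the shutdown flag.
                    String log = logsToBeDeleted.poll(100, TimeUnit.MILLISECONDS);
                    if (log != null) {
                        System.out.println("deleting " + log);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        deleter.start();
        logsToBeDeleted.add("segment-0");
        Thread.sleep(300);
        shuttingDown.set(true); // graceful shutdown now completes even though the queue is empty
        deleter.join();
    }
}
{code}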



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15585) DescribeTopic API

2024-04-19 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu resolved KAFKA-15585.

Resolution: Fixed

> DescribeTopic API
> -
>
> Key: KAFKA-15585
> URL: https://issues.apache.org/jira/browse/KAFKA-15585
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
>
> Adding the new DescribeTopic API + the admin client and server-side handling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1573016246


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map<Uuid, TopicMetadata> createTopicMetadata() {
+Map<Uuid, TopicMetadata> topicMetadata = new 
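   (As an aside on running these: the jmh-benchmarks module ships a wrapper 
script, so `./jmh.sh ServerSideAssignorBenchmark` from the `jmh-benchmarks` 
directory should run this class, assuming the script still accepts a 
benchmark-name pattern.)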

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1573016609


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map<Uuid, TopicMetadata> createTopicMetadata() {
+Map<Uuid, TopicMetadata> topicMetadata = new 

Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-19 Thread via GitHub


jeqo commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1572995864


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently,
+ * instead of multiple individual {@link SingleFieldPath single-field paths}.
+ *
+ * If the SMT requires accessing a single field on the same data object,
+ * use {@link SingleFieldPath} instead.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths {
+    final Trie trie = new Trie();
+
+    MultiFieldPaths(Set<SingleFieldPath> paths) {
+        paths.forEach(trie::insert);
+    }
+
+    public static MultiFieldPaths of(List<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+            .map(f -> new SingleFieldPath(f, syntaxVersion))
+            .collect(Collectors.toSet()));
+    }
+
+    /**
+     * Find values at the field paths
+     *
+     * @param struct data value
+     * @return map of field paths and field/values
+     */
+    public Map<SingleFieldPath, Map.Entry<Field, Object>> fieldAndValuesFrom(Struct struct) {
+        if (trie.isEmpty()) return Collections.emptyMap();
+        return findFieldAndValues(struct, trie.root, new HashMap<>());
+    }
+
+    private Map<SingleFieldPath, Map.Entry<Field, Object>> findFieldAndValues(
+        Struct originalValue,
+        TrieNode trieAt,
+        Map<SingleFieldPath, Map.Entry<Field, Object>> fieldAndValueMap
+    ) {
+        for (Map.Entry<String, TrieNode> step : trieAt.steps().entrySet()) {
+            Field field = originalValue.schema().field(step.getKey());
+            if (step.getValue().isLeaf()) {
+                Map.Entry<Field, Object> fieldAndValue =
+                    field != null
+                        ? new AbstractMap.SimpleImmutableEntry<>(field, originalValue.get(field))
+                        : null;
+                fieldAndValueMap.put(step.getValue().path, fieldAndValue);
+            } else {
+                if (field.schema().type() == Type.STRUCT) {
+                    findFieldAndValues(
+                        originalValue.getStruct(field.name()),
+                        step.getValue(),
+                        fieldAndValueMap
+                    );
+                }
+            }
+        }
+        return fieldAndValueMap;
+    }
+
+    /**
+     * Find values at the field paths
+     *
+     * @param value data value
+     * @return map of field paths and field/values
+     */
+    public Map<SingleFieldPath, Map.Entry<String, Object>> fieldAndValuesFrom(Map<String, Object> value) {
+        if (trie.isEmpty()) return Collections.emptyMap();
+        return findFieldAndValues(value, trie.root, new HashMap<>());
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<SingleFieldPath, Map.Entry<String, Object>> findFieldAndValues(
+        Map<String, Object> value,
+        TrieNode trieAt,
+        Map<SingleFieldPath, Map.Entry<String, Object>> fieldAndValueMap
+    ) {
+        for (Map.Entry<String, TrieNode> step : trieAt.steps().entrySet()) {
+            Object fieldValue = value.get(step.getKey());
+            if (step.getValue().isLeaf()) {
+                fieldAndValueMap.put(
+                    step.getValue().path,
+                    new AbstractMap.SimpleImmutableEntry<>(step.getKey(), fieldValue)
+                );
+            } else {
+                if (fieldValue instanceof Map) {
+                    findFieldAndValues(
+                        (Map<String, Object>) fieldValue,
+                        step.getValue(),
+                        fieldAndValueMap
+  
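   The trie walk is easier to see on a stripped-down version. Here is a 
self-contained sketch of the same idea (illustrative only, not the PR's 
`Trie`/`TrieNode` classes), using plain nested maps:

       import java.util.HashMap;
       import java.util.Map;

       class PathTrie {
           static class Node {
               final Map<String, Node> steps = new HashMap<>();
               boolean leaf; // marks the end of one configured field path
           }

           final Node root = new Node();

           void insert(String dottedPath) {
               Node node = root;
               for (String step : dottedPath.split("\\.")) {
                   node = node.steps.computeIfAbsent(step, s -> new Node());
               }
               node.leaf = true;
           }

           // Walk a nested Map once, collecting the values of all inserted paths.
           void find(Map<String, Object> value, Node node, String prefix, Map<String, Object> out) {
               for (Map.Entry<String, Node> step : node.steps.entrySet()) {
                   Object fieldValue = value.get(step.getKey());
                   String path = prefix.isEmpty() ? step.getKey() : prefix + "." + step.getKey();
                   if (step.getValue().leaf) {
                       out.put(path, fieldValue);
                   } else if (fieldValue instanceof Map) {
                       @SuppressWarnings("unchecked")
                       Map<String, Object> nested = (Map<String, Object>) fieldValue;
                       find(nested, step.getValue(), path, out);
                   }
               }
           }
       }

   Inserting `a.b` and `a.c` shares the `a` node, so a single traversal of the 
data object resolves both paths; that is the efficiency win over evaluating 
each `SingleFieldPath` independently.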

Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-19 Thread via GitHub


emitskevich-blp commented on PR #15722:
URL: https://github.com/apache/kafka/pull/15722#issuecomment-2067299812

   > we should verify the non-deprecated metrics should have correct doc which 
is not marked as "deprecated". Also, that is what you try to fix, right?
   
   Correct, this is the goal, yes. I had misunderstood what you meant by "doc". 
I've added a test to validate the metric descriptions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close

2024-04-19 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16589:
--

 Summary: Consider removing `ClusterInstance#createAdminClient` 
since callers are not sure whether they need to call close
 Key: KAFKA-16589
 URL: https://issues.apache.org/jira/browse/KAFKA-16589
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Sometimes we close the admin created by `createAdminClient`, and sometimes we 
don't. That is not a real problem, since the `ClusterInstance` will call `close` 
when stopping.

However, that causes a lot of inconsistent code, and in fact it does not save 
much effort, since creating an Admin is not hard work. We can get 
`bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.

 
{code:java}
// before
try (Admin admin = cluster.createAdminClient()) { }

// after v0
try (Admin admin = Admin.create(Collections.singletonMap(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { }
{code}
Personally, the `after` version is not verbose, but we can have alternatives, 
e.g. a `Map<String, Object> clientConfigs` accessor.

 
{code:java}
// after v1
try (Admin admin = Admin.create(cluster.clientConfigs())) { }
{code}
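For illustration only, the v1 accessor could be a trivial default method on 
`ClusterInstance` (a hypothetical sketch, not an actual interface change):

{code:java}
// Hypothetical ClusterInstance helper backing the "after v1" form:
default Map<String, Object> clientConfigs() {
    return Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
}
{code}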



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: example.com moved [kafka]

2024-04-19 Thread via GitHub


akatona84 commented on code in PR #15758:
URL: https://github.com/apache/kafka/pull/15758#discussion_r1572917082


##
clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java:
##
@@ -61,7 +61,7 @@ public void testParseAndValidateAddressesWithReverseLookup() {
     assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses);
     List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName)
         .collect(Collectors.toList());
-    List<String> expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946");
+    List<String> expectedHostNames = asList("93.184.215.14", "2606:2800:220:1:248:1893:25c8:1946");

Review Comment:
   Yeah, it's changing back and forth. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1572912365


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -16,7 +16,15 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;

Review Comment:
   Could you remove the usage of `TestUtils`? The method 
`subscribeAndWaitForRecords` can be rewritten easily, as it seems to only poll 
records.
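   A `TestUtils`-free replacement might look like this (a sketch; the signature 
and generics are assumed, since the original helper isn't shown here):

       import org.apache.kafka.clients.consumer.Consumer;

       import java.time.Duration;
       import java.util.Collections;

       private static void subscribeAndWaitForRecords(String topic, Consumer<byte[], byte[]> consumer, Duration timeout) {
           consumer.subscribe(Collections.singleton(topic));
           long deadline = System.currentTimeMillis() + timeout.toMillis();
           while (System.currentTimeMillis() < deadline) {
               // Poll in short slices so the loop can give up at the deadline.
               if (!consumer.poll(Duration.ofMillis(100)).isEmpty()) {
                   return;
               }
           }
           throw new AssertionError("No records received from " + topic + " within " + timeout);
       }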



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16082) Broker recreates reassigned partition after logdir failure

2024-04-19 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839107#comment-17839107
 ] 

Chia-Ping Tsai commented on KAFKA-16082:


[~gnarula] Could you file a PR for branch 3.7?

> Broker recreates reassigned partition after logdir failure
> --
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Sub-task
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Critical
> Fix For: 3.8.0, 3.7.1
>
>
> There is a possible data loss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment,
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost,
> then the destination log would not contain the new record.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-19 Thread via GitHub


chia7712 merged PR #15136:
URL: https://github.com/apache/kafka/pull/15136


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-2067228882

   The failed `testParseAndValidateAddressesWithReverseLookup` is tracked by 
#15758. Will merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-19 Thread via GitHub


chia7712 merged PR #15569:
URL: https://github.com/apache/kafka/pull/15569


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15719:
URL: https://github.com/apache/kafka/pull/15719#issuecomment-2067224991

   @brandboat please fix the conflicts, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2067222576

   The failed test `testParseAndValidateAddressesWithReverseLookup` will be 
fixed by #15758. I will merge this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: example.com moved [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on code in PR #15758:
URL: https://github.com/apache/kafka/pull/15758#discussion_r1572897153


##
clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java:
##
@@ -61,7 +61,7 @@ public void testParseAndValidateAddressesWithReverseLookup() {
     assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses);
     List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName)
         .collect(Collectors.toList());
-    List<String> expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946");
+    List<String> expectedHostNames = asList("93.184.215.14", "2606:2800:220:1:248:1893:25c8:1946");

Review Comment:
   That is weird. What I see on my local is `93.184.215.14, 
2606:2800:21f:cb07:6820:80da:af6b:8b2c`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2067204988

   Updated to include a CheckOp on the `/controller` ZNode. We don't bother using 
the controller epoch, since it is not straightforward to consistently read the 
controller and controller epoch from a non-controller broker. We can achieve 
the same fencing by checking that the `/controller` ZNode doesn't change during 
the method.
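   The fencing pattern, stripped to raw ZooKeeper operations (a sketch; Kafka 
routes this through its own request layer, and the config path here is 
illustrative):

       import org.apache.zookeeper.Op;
       import org.apache.zookeeper.ZooKeeper;

       import java.util.Arrays;

       // Bundle a version check on /controller with the config write: the multi-op
       // fails atomically if the controller znode changed while we were working.
       void fencedConfigWrite(ZooKeeper zk, int controllerZkVersion, byte[] configData) throws Exception {
           zk.multi(Arrays.asList(
               Op.check("/controller", controllerZkVersion),
               Op.setData("/config/topics/foo", configData, -1) // -1 = match any version
           ));
       }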
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16507 Add raw record into RecordDeserialisationException [kafka]

2024-04-19 Thread via GitHub


AndrewJSchofield commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1572845716


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##
@@ -311,25 +312,33 @@ ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
                      Optional<Integer> leaderEpoch,
                      TimestampType timestampType,
                      Record record) {
+        long offset = record.offset();
+        long timestamp = record.timestamp();
+        ByteBuffer keyBytes = record.key();
+        ByteBuffer valueBytes = record.value();
+        Headers headers = new RecordHeaders(record.headers());
+        K key;
+        V value;
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-            ByteBuffer valueBytes = record.value();
-            V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                timestamp, timestampType,
-                keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
-                valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
-                key, value, headers, leaderEpoch);
+            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
-            log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
-                "Error deserializing key/value for partition " + partition +
+            throw new KeyDeserializationException(partition, offset, keyBytes, valueBytes, headers, record.timestamp(),

Review Comment:
   I think you need a TimestampType also on the constructor for the 
`RecordDeserializationException`.



##
clients/src/main/java/org/apache/kafka/common/errors/KeyDeserializationException.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
+
+public class KeyDeserializationException extends RecordDeserializationException {
+    private final static long serialVersionUID = 1L;
+    public KeyDeserializationException(TopicPartition partition, long offset, ByteBuffer key, ByteBuffer value, Headers headers, long timestamp, String message, Throwable cause) {

Review Comment:
   Please split this enormously long line :)



##
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##
@@ -17,21 +17,55 @@
 package org.apache.kafka.common.errors;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
 
 /**
  *  This exception is raised for any error that occurs while deserializing records received by the consumer using 
  *  the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final TopicPartition partition;
     private final long offset;
+    private final long timestamp;
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+    private final Headers headers;
 
-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    @Deprecated
+    public 
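   For context, the consumer-side pattern this exception serves is roughly the 
following sketch (the `topicPartition()`/`offset()` accessors already exist; 
the raw key/value accessors are what this PR adds):

       import org.apache.kafka.clients.consumer.Consumer;
       import org.apache.kafka.common.errors.RecordDeserializationException;

       import java.time.Duration;

       // Sketch: skip a record that cannot be deserialized and keep consuming.
       static void pollSkippingPoisonPills(Consumer<String, String> consumer) {
           try {
               consumer.poll(Duration.ofSeconds(1)).forEach(r ->
                       System.out.println(r.key() + " -> " + r.value()));
           } catch (RecordDeserializationException e) {
               consumer.seek(e.topicPartition(), e.offset() + 1);
           }
       }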

[jira] [Commented] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-19 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839100#comment-17839100
 ] 

Phuc Hong Tran commented on KAFKA-16493:


[~lianetm] I’ll come back to this ticket this weekend and will try to get a 
pull request up ASAP. Sorry for the delay.

> Avoid unneeded subscription regex check if metadata version unchanged
> -
>
> Key: KAFKA-16493
> URL: https://issues.apache.org/jira/browse/KAFKA-16493
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When using pattern subscription (java pattern), the new consumer regularly 
> checks if the list of topics that match the regex has changed. This is done 
> as part of the consumer poll loop, and it evaluates the regex using the 
> latest cluster metadata. As an improvement, we should avoid evaluating the 
> regex if the metadata version hasn't changed (similar to what the legacy 
> coordinator does 
> [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])
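The guard being proposed is small; an illustrative sketch (field and method 
names assumed, not the actual consumer internals):

{code:java}
// Illustrative sketch of the proposed check (names assumed):
private int lastEvaluatedMetadataVersion = -1;

void maybeReevaluatePatternSubscription(int currentMetadataVersion) {
    if (currentMetadataVersion == lastEvaluatedMetadataVersion) {
        return; // metadata unchanged, so the matching topic set cannot have changed
    }
    lastEvaluatedMetadataVersion = currentMetadataVersion;
    // ... evaluate the subscription regex against the new cluster metadata ...
}
{code}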



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-19 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839096#comment-17839096
 ] 

Lianet Magrans commented on KAFKA-16493:


Hey [~phuctran], any progress on this one? Even though it's a performance 
improvement I'm afraid it's a very sensitive one given that it would affect the 
poll loop, so we need to make sure it makes it into 3.8 (it had the wrong fix 
version before, I just updated it). Let me know if you have any questions. 
Thanks!

> Avoid unneeded subscription regex check if metadata version unchanged
> -
>
> Key: KAFKA-16493
> URL: https://issues.apache.org/jira/browse/KAFKA-16493
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> When using pattern subscription (java pattern), the new consumer regularly 
> checks if the list of topics that match the regex has changed. This is done 
> as part of the consumer poll loop, and it evaluates the regex using the 
> latest cluster metadata. As an improvement, we should avoid evaluating the 
> regex if the metadata version hasn't changed (similar to what the legacy 
> coordinator does 
> [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16493:
---
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Avoid unneeded subscription regex check if metadata version unchanged
> -
>
> Key: KAFKA-16493
> URL: https://issues.apache.org/jira/browse/KAFKA-16493
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When using pattern subscription (java pattern), the new consumer regularly 
> checks if the list of topics that match the regex has changed. This is done 
> as part of the consumer poll loop, and it evaluates the regex using the 
> latest cluster metadata. As an improvement, we should avoid evaluating the 
> regex if the metadata version hasn't changed (similar to what the legacy 
> coordinator does 
> [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572801229


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerRegistration = getControllerRegistration match {
+      case Some(registration) => registration
+      case None =>
+        // This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode
+        throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.")
+    }
+
+    // If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR
+    // from the new KRaft controller that lets it know about the new controller. It will then forward
+    // IncrementalAlterConfig requests instead of processing directly.
+    if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+      throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.")
+    }
 
     def set(configData: Array[Byte]): SetDataResponse = {
       val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
         configData, ZkVersion.MatchAnyVersion)
-      retryRequestUntilConnected(setDataRequest)
+      if (controllerRegistration.zkVersion > 0) {
+        // Pass the zkVersion previously captured to ensure the controller hasn't changed to KRaft while
+        // this method was processing.
+        retryRequestUntilConnected(setDataRequest, controllerRegistration.zkVersion)
+      } else {
+        retryRequestUntilConnected(setDataRequest)

Review Comment:
   On second thought, this isn't quite right. I need to rework this a bit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16486) Integrate metric measurability changes in metrics collector

2024-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16486.
-
Fix Version/s: 3.8.0
   Resolution: Done

> Integrate metric measurability changes in metrics collector
> ---
>
> Key: KAFKA-16486
> URL: https://issues.apache.org/jira/browse/KAFKA-16486
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16486: Integrate KIP-1019 measurability changes (KIP-714) [kafka]

2024-04-19 Thread via GitHub


mjsax merged PR #15682:
URL: https://github.com/apache/kafka/pull/15682


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572780325


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerRegistration = getControllerRegistration match {
+      case Some(registration) => registration
+      case None =>
+        // This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode
+        throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.")
+    }
+
+    // If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR
+    // from the new KRaft controller that lets it know about the new controller. It will then forward
+    // IncrementalAlterConfig requests instead of processing directly.
+    if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+      throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.")
+    }
 
     def set(configData: Array[Byte]): SetDataResponse = {
       val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
         configData, ZkVersion.MatchAnyVersion)
-      retryRequestUntilConnected(setDataRequest)
+      if (controllerRegistration.zkVersion > 0) {
+        // Pass the zkVersion previously captured to ensure the controller hasn't changed to KRaft while
+        // this method was processing.
+        retryRequestUntilConnected(setDataRequest, controllerRegistration.zkVersion)
+      } else {
+        retryRequestUntilConnected(setDataRequest)

Review Comment:
   This is essentially the same as the match None case above. In our 
integration tests, we can (and apparently do) set configs in ZK before the 
controller gets elected for the first time. This was kept to avoid breaking a 
bunch of tests.
   
   Historically, setting configs would never fail (last writer wins), so neither 
the client nor the broker implements any retries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572777334


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerRegistration = getControllerRegistration match {
+      case Some(registration) => registration
+      case None =>
+        // This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode
+        throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.")
+    }
+
+    // If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR
+    // from the new KRaft controller that lets it know about the new controller. It will then forward
+    // IncrementalAlterConfig requests instead of processing directly.
+    if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+      throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.")
+    }
 
     def set(configData: Array[Byte]): SetDataResponse = {
       val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
         configData, ZkVersion.MatchAnyVersion)
-      retryRequestUntilConnected(setDataRequest)
+      if (controllerRegistration.zkVersion > 0) {

Review Comment:
   Yes. This makes the update here more like the updates made from the ZK 
controller. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572776248


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+    val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "")
+    val defaultBrokerConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), AlterConfigOp.OpType.SET),
+    ).asJavaCollection
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+    val specificBrokerConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), AlterConfigOp.OpType.SET),
+    ).asJavaCollection
+
+    TestUtils.retry(6) {
+      val result = admin.incrementalAlterConfigs(Map(
+        defaultBrokerResource -> defaultBrokerConfigs,
+        broker0Resource -> specificBrokerConfigs,
+        broker1Resource -> specificBrokerConfigs
+      ).asJava)
+      try {
+        result.all().get(10, TimeUnit.SECONDS)
+      } catch {
+        case t: Throwable => fail("Alter Broker Configs had an error", t)
+      }
+    }

Review Comment:
   > Could we verify that a KRaft controller is ready before testing this 
operation to make the outcome more predictable?
   
   I tried this at first, but we don't actually expose this to the client 
(whether the controller is ZK or KRaft). In fact, we lie to the client and tell 
it that a random broker is the controller. I could have "pierced the veil" to 
access the underlying controller classes, but a retry seemed easier. 






Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572735673


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map<Uuid, TopicMetadata> createTopicMetadata() {
+Map<Uuid, TopicMetadata> topicMetadata = new 

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572729047


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+private static final String GROUP_ID = "benchmark-group";
+
+private static final int GROUP_EPOCH = 0;
+
+private PartitionAssignor partitionAssignor;
+
+private Map<String, TopicMetadata> subscriptionMetadata = 
Collections.emptyMap();
+
+private TargetAssignmentBuilder targetAssignmentBuilder;
+
+private AssignmentSpec assignmentSpec;
+
+private final List<String> allTopicNames = new ArrayList<>(topicCount);
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// For this benchmark we will use the Uniform Assignor
+// and a group that has a homogeneous subscription model.
+partitionAssignor = new UniformAssignor();
+
+subscriptionMetadata = generateMockSubscriptionMetadata();
+Map<String, ConsumerGroupMember> members = generateMockMembers();
+Map<String, Assignment> existingTargetAssignment = 
generateMockInitialTargetAssignment();
+
+// Add a new member to trigger a rebalance.
+Set<String> subscribedTopics = new 
HashSet<>(subscriptionMetadata.keySet());
+
+ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder("new-member")
+.setSubscribedTopicNames(new ArrayList<>(subscribedTopics))
+.build();
+
+targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, 
GROUP_EPOCH, partitionAssignor)
+.withMembers(members)
+.withSubscriptionMetadata(subscriptionMetadata)
+.withTargetAssignment(existingTargetAssignment)
+.addOrUpdateMember(newMember.memberId(), newMember);
+}
+
+private Map<String, ConsumerGroupMember> generateMockMembers() {
+Map<String, ConsumerGroupMember> members = new HashMap<>();
+
+for (int i = 0; i < memberCount - 1; i++) 

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572726906


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are 
assigned partitions
+ * for the first time (full), or incrementally when a rebalance is 
triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+

Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


cmccabe commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572722514


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. 
The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new 
controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly 
when there is a KRaft controller.")
+}
 
 def set(configData: Array[Byte]): SetDataResponse = {
   val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
 configData, ZkVersion.MatchAnyVersion)
-  retryRequestUntilConnected(setDataRequest)
+  if (controllerRegistration.zkVersion > 0) {
+// Pass the zkVersion previously captured to ensure the controller 
hasn't changed to KRaft while
+// this method was processing.
+retryRequestUntilConnected(setDataRequest, 
controllerRegistration.zkVersion)
+  } else {
+retryRequestUntilConnected(setDataRequest)

Review Comment:
   can you explain how we'd get to this branch of the "if" statement? Surely 
the controller znode always has a version?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


cmccabe commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572717868


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. 
The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new 
controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly 
when there is a KRaft controller.")
+}
 
 def set(configData: Array[Byte]): SetDataResponse = {
   val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
 configData, ZkVersion.MatchAnyVersion)
-  retryRequestUntilConnected(setDataRequest)
+  if (controllerRegistration.zkVersion > 0) {

Review Comment:
   this also fixes the lack of fencing, right?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


cmccabe commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572716567


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode

Review Comment:
   I was a bit surprised that `ControllerMovedException` was not retriable. 
Should we use a retriable exception here, or does it not matter?






[jira] [Updated] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero

2024-04-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16588:
---
Description: 
see 
[https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154]

We call `take` even when `logsToBeDeleted` is empty, and 
`KafkaScheduler#shutdown` calls `shutdown` rather than `shutdownNow` 
([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134])

Hence, the thread never completes, and it blocks the shutdown of the 
broker.

We should replace `take` with `poll`, since we have already checked for an element.

BTW, zero is an allowed value 
(https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258)

  was:
see 
https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154

We call `take` even when `logsToBeDeleted` is empty, and 
`KafkaScheduler#shutdown` calls `shutdown` rather than `shutdownNow` 
([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134])

Hence, the thread never completes, and it blocks the shutdown of the 
broker.

We should replace `take` with `poll`, since we have already checked for an element.

 

 


> broker shutdown hangs when `log.segment.delete.delay.ms` is zero 
> -
>
> Key: KAFKA-16588
> URL: https://issues.apache.org/jira/browse/KAFKA-16588
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> see 
> [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154]
> We call `take` even when `logsToBeDeleted` is empty, and 
> `KafkaScheduler#shutdown` calls `shutdown` rather than `shutdownNow` 
> ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134])
> Hence, the thread never completes, and it blocks the shutdown of the 
> broker.
> We should replace `take` with `poll`, since we have already checked for an 
> element.
> BTW, zero is an allowed value 
> (https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258)
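
A minimal Java sketch of the take-vs-poll distinction described above (the
queue element type and method names here are illustrative, not LogManager's
actual code):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class DeleteDelaySketch {
        private final BlockingQueue<String> logsToBeDeleted = new LinkedBlockingQueue<>();

        void deleteLogsOnce() {
            // take() blocks indefinitely on an empty queue; with a zero delay the
            // task can reach this point with nothing queued, and a plain shutdown()
            // (unlike shutdownNow()) never interrupts it, so the broker hangs.
            // poll() returns null immediately and lets the task complete.
            String log = logsToBeDeleted.poll();
            if (log != null) {
                // delete the log's segment files here
            }
        }
    }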





[jira] [Updated] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero

2024-04-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16588:
---
Description: 
see 
[https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154]

If `log.segment.delete.delay.ms` is zero, we call `take` even when 
`logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` calls `shutdown` 
rather than `shutdownNow` 
([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134])

Hence, the thread never completes, and it blocks the shutdown of the 
broker.

We should replace `take` with `poll`, since we have already checked for an element.

BTW, zero is an allowed value 
([https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258])

  was:
see 
[https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154]

We call `take` even when `logsToBeDeleted` is empty, and 
`KafkaScheduler#shutdown` calls `shutdown` rather than `shutdownNow` 
([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134])

Hence, the thread never completes, and it blocks the shutdown of the 
broker.

We should replace `take` with `poll`, since we have already checked for an element.

BTW, zero is an allowed value 
(https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258)


> broker shutdown hangs when `log.segment.delete.delay.ms` is zero 
> -
>
> Key: KAFKA-16588
> URL: https://issues.apache.org/jira/browse/KAFKA-16588
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> see 
> [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154]
> If `log.segment.delete.delay.ms` is zero, we call `take` even when 
> `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` calls `shutdown` 
> rather than `shutdownNow` 
> ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134])
> Hence, the thread never completes, and it blocks the shutdown of the 
> broker.
> We should replace `take` with `poll`, since we have already checked for an 
> element.
> BTW, zero is an allowed value 
> ([https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258])





[jira] [Created] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero

2024-04-19 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16588:
--

 Summary: broker shutdown hangs when `log.segment.delete.delay.ms` 
is zero 
 Key: KAFKA-16588
 URL: https://issues.apache.org/jira/browse/KAFKA-16588
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see 
https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154

We call `take` even when `logsToBeDeleted` is empty, and 
`KafkaScheduler#shutdown` calls `shutdown` rather than `shutdownNow` 
([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134])

Hence, the thread never completes, and it blocks the shutdown of the 
broker.

We should replace `take` with `poll`, since we have already checked for an element.

 

 





Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-19 Thread via GitHub


phooq commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2066909600

   Thanks for the feedback @lianetm !
   
   I agree with the point about the back-and-forth jumping between the child 
and the parent that comes with this implementation; however, asking each child 
to override `toString` does not completely eliminate the ping-pong either, right?
   
   Also, the existing code itself has `RequestState` hardcoded in the base, 
which is not helpful when debugging any of its children, so we either call 
`getClass().getSimpleName()` in every child's `toString` override, or just call 
it once in the base. The latter saves some work for us.
   
   Two additional values this implementation brings are:
   
   1. It makes it easier to build the debug string in the proper format. You 
can see right now that the `toString` of `OffsetFetchRequestState` has to build 
the "{}" framing correctly by itself, and to enforce uniformity, each of 
`RequestState`'s children has to do the same. This is trivial from a programming 
perspective, but if any of the format building is wrong, it makes reading the 
messages significantly harder.
   
   2. It improves the readability of the debugging message. Imagine more 
subclasses coming into the `RequestState` tree. Having each child override 
`toString` gives us something like 
`childName1={childName2={childName2={}...base={}}}`. With this implementation, 
we will have a much simpler version like `childName={properties...}`.
   
   I do agree that `toStringBase()` as a name looks a little confusing. It 
essentially builds the details of the object into a string, so maybe 
naming it `getDetails()` would be better?
   
   Thanks
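   
   For reference, here is a minimal sketch of the pattern under discussion 
(class and field names are illustrative only, not the actual client internals):
   
       abstract class RequestState {
           private long attempts;
       
           @Override
           public final String toString() {
               // getSimpleName() prints the concrete subclass's name once, in the base.
               return getClass().getSimpleName() + "{" + toStringBase() + "}";
           }
       
           // Subclasses append their own fields to the base's details.
           protected String toStringBase() {
               return "attempts=" + attempts;
           }
       }
       
       class OffsetFetchRequestState extends RequestState {
           private String memberId;
       
           @Override
           protected String toStringBase() {
               return super.toStringBase() + ", memberId=" + memberId;
           }
       }
   
   With this, `new OffsetFetchRequestState().toString()` yields something like 
`OffsetFetchRequestState{attempts=0, memberId=null}` without the subclass ever 
touching the "{}" framing.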





[jira] [Resolved] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer

2024-04-19 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16579.

Resolution: Fixed

> Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
> -
>
> Key: KAFKA-16579
> URL: https://issues.apache.org/jira/browse/KAFKA-16579
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated 
> a slew of system tests to run both the "old" and "new" implementations. 
> KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} 
> so it could test the new consumer. However, the test is tailored specifically 
> to the "old" Consumer's protocol and assignment strategy upgrade.
> Unsurprisingly, when we run those system tests with the new 
> {{AsyncKafkaConsumer}}, we get errors like the following:
> {code}
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   29.634 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1)})}")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 41, in _verify_range_assignment
> "Mismatched assignment: %s" % assignment
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1)})}
> {code}
> The task here is to revert the changes made in KAFKA-16271.





Re: [PR] MINOR: example.com moved [kafka]

2024-04-19 Thread via GitHub


akatona84 commented on PR #15758:
URL: https://github.com/apache/kafka/pull/15758#issuecomment-2066839773

   IPv6 alternates between 2606:2800:21f:cb07:6820:80da:af6b:8b2c and the old 
one; it's hard to fix the test like this. 
   





Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]

2024-04-19 Thread via GitHub


lianetm commented on code in PR #15753:
URL: https://github.com/apache/kafka/pull/15753#discussion_r1572569170


##
tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py:
##
@@ -56,12 +56,7 @@ def _verify_roundrobin_assignment(self, consumer):
 metadata_quorum=[quorum.isolated_kraft],
 use_new_coordinator=[True, False]
 )
-@matrix(
-metadata_quorum=quorum.all_kraft,
-use_new_coordinator=[True],
-group_protocol=consumer_group.all_group_protocols

Review Comment:
   You're right, I was only concerned about the new coordinator + classic 
combination, missed that it's coming from the default and other combination, 
all good then. Thanks! 









Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-19 Thread via GitHub


soarez commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572557807


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -786,12 +773,29 @@ public void run() throws Exception {
 }
 }
 
+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+@Override
+public void run() throws Exception {
+applyMigrationOperation("Recovering migration state from ZK", 
zkMigrationClient::getOrCreateMigrationRecoveryState);
+String maybeDone = 
migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+// Once we've recovered the migration state from ZK, install this 
class as a metadata publisher
+// by calling the initialZkLoadHandler.
+initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+// Transition to INACTIVE state and wait for leadership events.
+transitionTo(MigrationDriverState.INACTIVE);
+}
+}
+
 class PollEvent extends MigrationEvent {
+
 @Override
 public void run() throws Exception {
 switch (migrationState) {
 case UNINITIALIZED:
-recoverMigrationStateFromZK();
+eventQueue.append(new RecoverMigrationStateFromZKEvent());

Review Comment:
   On second thought, I don't think my question makes sense. The following 
`PollEvent` can only run after `RecoverMigrationStateFromZKEvent` finishes.






Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-19 Thread via GitHub


soarez commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572550218


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -786,12 +773,29 @@ public void run() throws Exception {
 }
 }
 
+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+@Override
+public void run() throws Exception {
+applyMigrationOperation("Recovering migration state from ZK", 
zkMigrationClient::getOrCreateMigrationRecoveryState);
+String maybeDone = 
migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+// Once we've recovered the migration state from ZK, install this 
class as a metadata publisher
+// by calling the initialZkLoadHandler.
+initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+// Transition to INACTIVE state and wait for leadership events.
+transitionTo(MigrationDriverState.INACTIVE);
+}
+}
+
 class PollEvent extends MigrationEvent {
+
 @Override
 public void run() throws Exception {
 switch (migrationState) {
 case UNINITIALIZED:
-recoverMigrationStateFromZK();
+eventQueue.append(new RecoverMigrationStateFromZKEvent());

Review Comment:
   Are we allowing a race between the `RecoverMigrationStateFromZKEvent` and 
the next `PollEvent` scheduled after the switch? Maybe this could be more 
straightforward if we only schedule the next poll once 
`RecoverMigrationStateFromZKEvent` finishes, either normally or exceptionally? 
WDYT?
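   
   A generic sketch of the sequencing suggested above (the names here are 
hypothetical, not the actual KRaftMigrationDriver API): enqueue the next poll 
only once recovery has finished, normally or exceptionally, so the two events 
cannot race.
   
       import java.util.concurrent.CompletableFuture;
       import java.util.concurrent.Executor;
       
       class SequencingSketch {
           private final Executor eventQueue;
       
           SequencingSketch(Executor eventQueue) {
               this.eventQueue = eventQueue;
           }
       
           void recoverThenPoll(Runnable recoverFromZk, Runnable scheduleNextPoll) {
               CompletableFuture.runAsync(recoverFromZk, eventQueue)
                   // whenComplete fires on both normal and exceptional completion,
                   // so the next poll is scheduled strictly after recovery.
                   .whenComplete((ignored, error) -> scheduleNextPoll.run());
           }
       }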






Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-19 Thread via GitHub


lucasbru merged PR #15594:
URL: https://github.com/apache/kafka/pull/15594





Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on code in PR #15753:
URL: https://github.com/apache/kafka/pull/15753#discussion_r1572534438


##
tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py:
##
@@ -56,12 +56,7 @@ def _verify_roundrobin_assignment(self, consumer):
 metadata_quorum=[quorum.isolated_kraft],
 use_new_coordinator=[True, False]
 )
-@matrix(
-metadata_quorum=quorum.all_kraft,
-use_new_coordinator=[True],
-group_protocol=consumer_group.all_group_protocols

Review Comment:
   Hey @lianetm. My understanding was that this does test with the new 
coordinator also after the update, and the default is group_protocol=classic, 
so this should be fine, right? Unless you want to add `combined_kraft`, but I'm 
not sure if that's an interesting case.






Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-19 Thread via GitHub


jeqo commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1572509302


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class FieldPathNotationTest {
+final static String[] EMPTY_PATH = new String[] {};
+
+@Test
+void shouldBuildV1WithDotsAndBacktickPair() {
+// Given v1
+// When path contains dots, then single step path
+assertArrayEquals(
+new String[] {"foo.bar.baz"},
+new SingleFieldPath("foo.bar.baz", 
FieldSyntaxVersion.V1).path());
+// When path contains backticks, then single step path
+assertArrayEquals(
+new String[] {"foo`bar`"},
+new SingleFieldPath("foo`bar`", FieldSyntaxVersion.V1).path());
+// When path contains dots and backticks, then single step path
+assertArrayEquals(
+new String[] {"foo.`bar.baz`"},
+new SingleFieldPath("foo.`bar.baz`", 
FieldSyntaxVersion.V1).path());
+}
+
+@Test
+void shouldBuildV2WithEmptyPath() {
+// Given v2
+// When path is empty
+// Then build a path with no steps
+assertArrayEquals(EMPTY_PATH, new SingleFieldPath("", 
FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2WithoutDots() {
+// Given v2
+// When path without dots
+// Then build a single step path
+assertArrayEquals(new String[] {"foobarbaz"}, new 
SingleFieldPath("foobarbaz", FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2WhenIncludesDots() {
+// Given v2 and fields without dots
+// When path includes dots
+// Then build a path with steps separated by dots
+assertArrayEquals(new String[] {"foo", "bar", "baz"}, new 
SingleFieldPath("foo.bar.baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2WithoutWrappingBackticks() {
+// Given v2 and fields without dots
+// When backticks are not wrapping a field name
+// Then build a single step path including backticks
+assertArrayEquals(new String[] {"foo`bar`baz"}, new 
SingleFieldPath("foo`bar`baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
+// Given v2 and fields including dots
+// When backticks are wrapping a field name (i.e. within edges or 
between dots)
+// Then build a path with steps separated by dots and not including 
backticks
+assertArrayEquals(new String[] {"foo.bar.baz"}, new 
SingleFieldPath("`foo.bar.baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "bar.baz"}, new 
SingleFieldPath("foo.`bar.baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo.bar", "baz"}, new 
SingleFieldPath("`foo.bar`.baz", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "bar", "baz"}, new 
SingleFieldPath("foo.`bar`.baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() {
+// Given v2 and fields including dots and backticks
+// When backticks are wrapping a field name (i.e. within edges or 
between dots)
+// Then build a path with steps separated by dots and including 
non-wrapping backticks
+assertArrayEquals(new String[] {"foo", "`bar.baz"}, new 
SingleFieldPath("foo.``bar.baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "bar.baz`"}, new 
SingleFieldPath("foo.`bar.baz``", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "ba`r.baz"}, new 

Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]

2024-04-19 Thread via GitHub


lianetm commented on code in PR #15753:
URL: https://github.com/apache/kafka/pull/15753#discussion_r1572508072


##
tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py:
##
@@ -56,12 +56,7 @@ def _verify_roundrobin_assignment(self, consumer):
 metadata_quorum=[quorum.isolated_kraft],
 use_new_coordinator=[True, False]
 )
-@matrix(
-metadata_quorum=quorum.all_kraft,
-use_new_coordinator=[True],
-group_protocol=consumer_group.all_group_protocols

Review Comment:
   Hey, sorry I'm late to this party, but I have an important concern. Don't we want to 
keep testing this with the new coordinator and the classic protocol? I would 
say yes. I would expect we only want to exclude the consumer protocol, so we 
would want to keep the test with the params it had, but changing 
`group_protocol=consumer_group.classic`






Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-19 Thread via GitHub


lianetm commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2066727111

   Thanks for the info @kirktrue. I see you mentioned it was a "suggested 
pattern", and I won't hold a strong position here, but I'll still share how I 
see it (I'm struggling to find value in it that compensates for the drawbacks).
   
   Drawbacks:
   - overcomplicated flow to understand where a class `toString` comes from 
(back-and-forth between a child class and its parent): child has no `toString`, 
jump to parent that has it but uses a `toStringBase`, defined in the parent but 
redefined in the child, so jump back to child where `toStringBase` is 
redefined. I find it more intuitive/simpler to just have child classes with 
`toString` that use a `toStringBase` defined in the parent (as its name clearly 
indicates). 
   
   Value it brings:
   > It allows the parent to keep its state private, but still "expose" it via 
toString()
   We get exactly the same in both alternatives. Parent keeps state private, 
exposed via toStringBase/toString
   
   > It helps to ensure the correct class name appears in the output
   This was actually not happening when I reviewed this PR the first time, and 
it was indeed confusing. Still with the fix to get the child class name in the 
parent, I wonder if it's worth the ping-pong? A child class using a wrong class 
name in the `toString` seems like an unlikely silly bug, easy to catch/fix 
during the initial development phase. 
   
> It keeps the output uniform and reminds the developer to keep the parent 
state
   A little bit more uniform, yes, but only as long as the developer does it 
right when overriding `toStringBase`, right? So we're just shifting 
the responsibility from overriding `toString` properly to overriding 
`toStringBase` properly (at the expense of the ping-pong). And we're not truly 
reminding the developer unless we have an abstract method, I would say, which is 
not the case. 
   
   Just sharing my thoughts, I may be bothered by the ping-pong flow more than 
others in this case ;)





Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1572360676


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first

Review Comment:
   Okay got it. Yeah, the state "async commit pending because I do not know the 
coordinator yet" and the state "async commit is already sent to the 
coordinator" may cover two different cases. In fact, in the legacy consumer, 
these are two independent states the async commit can be in. In the async 
consumer as well, although waiting for the async commit to complete works 
independently of that state (since the different states are handled by the 
background thread, and we simplify wait for the future in the foreground 
thread). 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +634,85 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+final TopicPartition tp = new TopicPartition("foo", 0);
+testIncompleteAsyncCommit(tp);
+
+final CompletableFuture<Void> asyncCommitFuture = 
getLastEnqueuedEventFuture();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete exceptionally async commit event
+asyncCommitFuture.completeExceptionally(new KafkaException("Test 
exception"));
+
+// Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), 
Duration.ofMillis(100)));
+}
+
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() 
{
+final TopicPartition tp = new TopicPartition("foo", 0);
+testIncompleteAsyncCommit(tp);
+
+final CompletableFuture<Void> asyncCommitFuture = 
getLastEnqueuedEventFuture();
+
+// Mock to complete sync event
+completeCommitSyncApplicationEventSuccessfully();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 

Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-19 Thread via GitHub


jeqo commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1572436483


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java:
##
@@ -60,6 +62,30 @@ public void schemaless() {
 assertEquals(expectedKey, transformedRecord.key());
 }
 
+@Test
+public void schemalessAndNestedFields() {
+Map<String, String> configs = new HashMap<>();
+configs.put("fields", "a,b.c");
+configs.put(FieldSyntaxVersion.FIELD_SYNTAX_VERSION_CONFIG, 
FieldSyntaxVersion.V2.name());
+xform.configure(configs);
+
+final HashMap<String, Object> value = new HashMap<>();
+value.put("a", 1);
+final HashMap<String, Object> nested = new HashMap<>();
+nested.put("c", 3);
+value.put("b", nested);
+
+final SinkRecord record = new SinkRecord("", 0, null, null, null, 
value, 0);
+final SinkRecord transformedRecord = xform.apply(record);
+
+final HashMap<String, Object> expectedKey = new HashMap<>();
+expectedKey.put("a", 1);
+expectedKey.put("b.c", 3);

Review Comment:
   Good catch. I don't remember this scenario being discussed in the KIP 
thread. I was using the original keys as they are meant to be unique, but they 
could also contain all kinds of escaped fields, making them hard to use. 
   Happy to switch to deriving the key schema from the nesting structure, as 
sketched below. I will update the KIP to make this explicit as well. 
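
   For illustration, a rough sketch of what deriving the key from the nesting 
structure could look like for the test above (the helper is hypothetical, not 
the actual transform code): for fields `a,b.c` the key becomes `{a=1, b={c=3}}` 
instead of the flat `{a=1, b.c=3}`.

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical helper, not part of the ValueToKey transform.
   final class NestedKeyExample {
       @SuppressWarnings("unchecked")
       static Map<String, Object> nestedKey(Map<String, Object> value, List<String> paths) {
           Map<String, Object> key = new HashMap<>();
           for (String path : paths) {
               String[] steps = path.split("\\.");
               Map<String, Object> src = value;
               Map<String, Object> dst = key;
               // Walk the path, mirroring the nesting structure into the key.
               for (int i = 0; i < steps.length - 1; i++) {
                   src = (Map<String, Object>) src.get(steps[i]);
                   dst = (Map<String, Object>) dst.computeIfAbsent(steps[i], s -> new HashMap<String, Object>());
               }
               dst.put(steps[steps.length - 1], src.get(steps[steps.length - 1]));
           }
           return key;
       }

       public static void main(String[] args) {
           Map<String, Object> value = new HashMap<>();
           value.put("a", 1);
           Map<String, Object> nested = new HashMap<>();
           nested.put("c", 3);
           value.put("b", nested);
           // Prints {a=1, b={c=3}} rather than the flat {a=1, b.c=3}.
           System.out.println(nestedKey(value, Arrays.asList("a", "b.c")));
       }
   }
   ```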



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572314707


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are 
assigned partitions
+ * for the first time (full), or incrementally when a rebalance is 
triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+

Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-04-19 Thread via GitHub


mdedetrich commented on PR #13375:
URL: https://github.com/apache/kafka/pull/13375#issuecomment-2066602056

   @yashmayya Are you still working on this to get it over the finish line or 
is it okay for me to take over?





Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-19 Thread via GitHub


lianetm commented on PR #15738:
URL: https://github.com/apache/kafka/pull/15738#issuecomment-2066577956

   Hey @lucasbru , answering your questions: the new behaviour of static 
membership regarding a member that joins with a duplicate group instance id is 
documented in 
[this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-StaticMembership(KIP-345))
 section of the KIP. We've also discussed it with @dajac and other client teams 
(librd), given the improvement the new protocol brings in this area (mainly in 
cases of conflicting members, which continuously kick each other out with the 
classic protocol).
  
   Your question does make me notice that, even though the KIP describes how 
conflicting static members behave in the new protocol, it would probably be 
helpful to extend that explanation to point out the fundamental difference from 
the legacy protocol and how it is an improved approach. (I can't find that 
explained in the KIP. If I'm not missing it, it would probably be a good update 
to the KIP [static membership 
section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-StaticMembership(KIP-345))
 @dajac ?)
   
   Thanks @lucasbru ! 





Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572307205


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map<Uuid, TopicMetadata> createTopicMetadata() {
+Map<Uuid, TopicMetadata> topicMetadata = new 

Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


cadonna commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2066478507

   > > @lucasbru Thanks for the PR!
   > > The unit tests you added fail in the build and also for me locally.
   > > Plus, I have a question regarding the integration tests.
   > 
   > @cadonna Thanks for the review, I hadn't noticed the test failures. Seems 
the `ArgumentCaptor`'s type-matching introduced with Mockito 5 does not work on 
Java 8. I committed a workaround. Also, I'm both shocked and impressed that you 
are using Java 8 locally.
   
   It has to run at least on Java 8, right?





Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


cadonna commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1572290394


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first

Review Comment:
   Fair enough, but why is that important? Is the intention that the async 
commit needs to lookup the group coordinator?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1005,6 +1102,43 @@ public void testNoWakeupInCloseCommit() {
 assertFalse(capturedEvent.get().future().isCompletedExceptionally());
 }
 
+@Test
+public void testCloseAwaitPendingAsyncCommitIncomplete() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));

Review Comment:
   Do we need this stub?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).

Review Comment:
   ```suggestion
   // disabled, or simply because there are no consumed offsets).
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +634,85 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+final TopicPartition tp = new TopicPartition("foo", 0);
+testIncompleteAsyncCommit(tp);
+
+final CompletableFuture<Void> asyncCommitFuture 

[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-19 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838979#comment-17838979
 ] 

Stanislav Spiridonov commented on KAFKA-16585:
--

The case is relatively simple. I have a KTable with entities that have to be 
enriched with an icon attribute from a side service. So, the processor maintains 
an internal store with the entity keys and periodically asks the service for 
updates for the registered ids. If an icon has changed, it forwards a message 
with the new icon. The key of the record is the entity key (String), the value 
is an icon (String).

BTW, I ran into strange behaviour: if I forward a new record from another 
thread, it arrives at an incorrect processor. So for now I just update the store 
from the icon KTable instead of forwarding the record.
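
As a minimal sketch of the limitation (the enrichment logic and names below are 
hypothetical; the Streams API types are real): forwarding works in process() 
because an input record exists to call withValue() on, but a punctuator has no 
incoming FixedKeyRecord, and the class has no public constructor.

```java
import java.time.Duration;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// Hypothetical enrichment processor illustrating the issue.
public class IconEnricher implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(FixedKeyProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // Only keys from the state store are available here. There is no
            // incoming FixedKeyRecord to call withValue() on, and
            // FixedKeyRecord has no public constructor, so nothing can be
            // forwarded from this punctuator.
        });
    }

    @Override
    public void process(FixedKeyRecord<String, String> record) {
        // Forwarding works here because an input record exists.
        context.forward(record.withValue(lookupIcon(record.key())));
    }

    private String lookupIcon(String key) {
        return "icon-for-" + key; // hypothetical side-service lookup
    }
}
```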

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can only be created based on existing 
> records. But such a record is usually absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


soarez commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572295590


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+val defaultBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+val specificBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+
+TestUtils.retry(6) {
+  val result = admin.incrementalAlterConfigs(Map(
+defaultBrokerResource -> defaultBrokerConfigs,
+broker0Resource -> specificBrokerConfigs,
+broker1Resource -> specificBrokerConfigs
+  ).asJava)
+  try {
+result.all().get(10, TimeUnit.SECONDS)
+  } catch {
+case t: Throwable => fail("Alter Broker Configs had an error", t)
+  }
+}

Review Comment:
   Do we really need the retry logic here? Could we verify that a KRaft 
controller is ready before testing this operation, to make the outcome more 
predictable? Or is it expected that we may have to repeat the operation a few 
times regardless? And if that's the case, should we update the docs too, to let 
operators know they might have to try a few times?






Re: [PR] KAFKA-16463 System test for reverting migration to ZK [kafka]

2024-04-19 Thread via GitHub


mumrah commented on PR #15754:
URL: https://github.com/apache/kafka/pull/15754#issuecomment-2066454711

   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.4
   session_id:   2024-04-18--017
   run time: 26 minutes 43.661 seconds
   tests run:8
   passed:   8
   flaky:0
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False.downgrade_to_zk=False
   status: PASS
   run time:   2 minutes 54.546 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False.downgrade_to_zk=True
   status: PASS
   run time:   3 minutes 32.044 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True.downgrade_to_zk=False
   status: PASS
   run time:   2 minutes 56.290 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True.downgrade_to_zk=True
   status: PASS
   run time:   3 minutes 38.101 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False.downgrade_to_zk=False
   status: PASS
   run time:   2 minutes 52.691 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False.downgrade_to_zk=True
   status: PASS
   run time:   3 minutes 56.394 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True.downgrade_to_zk=False
   status: PASS
   run time:   3 minutes 8.253 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True.downgrade_to_zk=True
   status: PASS
   run time:   3 minutes 44.203 seconds
   

   ```





Re: [PR] MINOR: example.com moved [kafka]

2024-04-19 Thread via GitHub


akatona84 commented on PR #15758:
URL: https://github.com/apache/kafka/pull/15758#issuecomment-2066418336

   now the ipv6 address is changed to 2606:2800:220:1:248:1893:25c8:1946...
   





[PR] MINOR: example.com moved [kafka]

2024-04-19 Thread via GitHub


akatona84 opened a new pull request, #15758:
URL: https://github.com/apache/kafka/pull/15758

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2066392315

   > @lucasbru Thanks for the PR!
   > 
   > The unit tests you added fail in the build and also for me locally.
   > 
   > Plus, I have a question regarding the integration tests.
   
   @cadonna Thanks for the review, I hadn't noticed the test failures. Seems 
the `ArgumentCaptor`'s type-matching introduced with Mockito 5 does not work on 
Java 8. I committed a workaround. Also, I'm both shocked and impressed that you 
are using Java 8 locally.
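
   For reference, one Java-8-friendly shape for the captor (a hypothetical 
sketch; the handler and event types are stand-ins, not the actual test code) is 
to construct it explicitly with `ArgumentCaptor.forClass` rather than relying 
on the inferred generic matching:

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;

   import org.mockito.ArgumentCaptor;

   // All names below are stand-ins for the mocks in the real test.
   class CaptorWorkaroundSketch {
       interface ApplicationEvent { }
       interface EventHandler { void add(ApplicationEvent event); }

       void example() {
           EventHandler handler = mock(EventHandler.class);
           handler.add(new ApplicationEvent() { });

           // Explicit forClass construction works on Java 8, where the
           // inferred generic matching can fail.
           ArgumentCaptor<ApplicationEvent> captor = ArgumentCaptor.forClass(ApplicationEvent.class);
           verify(handler).add(captor.capture());
           ApplicationEvent captured = captor.getValue();
           // ... assertions on the captured event ...
       }
   }
   ```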





Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-19 Thread via GitHub


FrankYang0529 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1572189143


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -179,8 +186,8 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 throw new IllegalStateException();
 }
 
-ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, 
brokers, controllers, autoStart,
-annot.securityProtocol(), annot.metadataVersion());
+ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, 
brokers, controllers, disksPerBroker,

Review Comment:
   Thanks for the suggestion. I updated it.






Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15722:
URL: https://github.com/apache/kafka/pull/15722#issuecomment-2066283854

   > Effectively, such test would verify the behavior of deprecated method. 
What do you think?
   
   we should verify that the non-deprecated metrics have a correct doc which is 
not marked as "deprecated". Also, that is what you are trying to fix, right?
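
   A rough sketch of such a check (hypothetical; the metric name, group, and 
assertion are assumptions, not the actual PR code):

   ```java
   import static org.junit.jupiter.api.Assertions.assertFalse;

   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.Metrics;

   // Hypothetical shape of such a check, not the actual test.
   class IoRatioDocCheckSketch {
       void assertDocNotDeprecated(Metrics metrics) {
           // "io-ratio" / "consumer-metrics" are assumed names for illustration.
           MetricName name = metrics.metricName("io-ratio", "consumer-metrics");
           assertFalse(metrics.metric(name).metricName().description()
                   .toLowerCase().contains("deprecated"));
       }
   }
   ```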





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2066277923

   This PR is good, but it seems to me `LogSegment` should NOT guess the 
directory structure managed by an upper class (i.e. `LogManager`).
   
   It seems the root cause comes from the following steps:
   1. the segments to be deleted are removed from `LocalLog`
   2. `LocalLog#renameDir` moves the whole folder
   3. `LocalLog#renameDir` updates the parent folder for all segments. However, 
the segments to be deleted were already removed from the inner collection. 
Hence, some `Segment#log` has a stale file.
   
   If I understand correctly, another solution is that we pass a function to 
get the latest dir when calling `deleteSegmentFiles` 
(https://github.com/apache/kafka/blob/2d4abb85bf4a3afb1e3359a05786ab8f3fda127e/core/src/main/scala/kafka/log/LocalLog.scala#L904).
 If deleting a segment gets a not-found error, we call `updateParentDir` and 
delete it again, as sketched below.
   
   WDYT?
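
   A minimal sketch of that alternative, in Java for illustration (the 
`Segment` interface is a stand-in for `kafka.log.LogSegment`, and the retry 
shape is an assumption, not the actual patch):

   ```java
   import java.io.File;
   import java.nio.file.NoSuchFileException;
   import java.util.List;
   import java.util.function.Supplier;

   // Hypothetical sketch; Segment stands in for kafka.log.LogSegment.
   final class SegmentDeletionSketch {
       interface Segment {
           void deleteIfExists() throws NoSuchFileException;
           void updateParentDir(File dir);
       }

       static void deleteSegmentFiles(List<Segment> segments, Supplier<File> latestDir) {
           for (Segment segment : segments) {
               try {
                   segment.deleteIfExists();
               } catch (NoSuchFileException e) {
                   // The log folder was renamed while this segment still
                   // pointed at the old location; refresh the parent dir and
                   // retry once.
                   segment.updateParentDir(latestDir.get());
                   try {
                       segment.deleteIfExists();
                   } catch (NoSuchFileException ignored) {
                       // Already gone after the rename; nothing left to delete.
                   }
               }
           }
       }
   }
   ```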





Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]

2024-04-19 Thread via GitHub


OmniaGM commented on PR #15728:
URL: https://github.com/apache/kafka/pull/15728#issuecomment-2066275194

   Thanks for getting the KIP out there for discussion and for fixing the 
tests. Should this PR be a draft until we have the KIP voted in by the 
community?





Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]

2024-04-19 Thread via GitHub


aaron-ai commented on PR #15728:
URL: https://github.com/apache/kafka/pull/15728#issuecomment-2066276867

   > Thanks for getting the KIP out there for discussion and for fixing the 
tests. Should this PR be a draft until we have the KIP voted in by the 
community?
   
   OK





Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]

2024-04-19 Thread via GitHub


OmniaGM commented on code in PR #15728:
URL: https://github.com/apache/kafka/pull/15728#discussion_r1572155918


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java:
##
@@ -169,6 +170,7 @@ static Map<String, Object> sourceConsumerConfig(Map<String, String> props) {
 result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX));
 result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + 
CONSUMER_CLIENT_PREFIX));
 result.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
+result.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

Review Comment:
   @aaron-ai thanks for drafting a KIP that quickly.  I left a couple of 
comments there.






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


showuon commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572140166


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {

Review Comment:
   Could we add some tests for these changes?






Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1572122943


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -179,8 +186,8 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 throw new IllegalStateException();
 }
 
-ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, 
brokers, controllers, autoStart,
-annot.securityProtocol(), annot.metadataVersion());
+ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, 
brokers, controllers, disksPerBroker,

Review Comment:
   We can do that in this PR, as most of the changes involved in the refactor 
are in this method. Let's do it to polish our test infrastructure :)






Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-19 Thread via GitHub


FrankYang0529 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1572114481


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -179,8 +186,8 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 throw new IllegalStateException();
 }
 
-ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, 
brokers, controllers, autoStart,
-annot.securityProtocol(), annot.metadataVersion());
+ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, 
brokers, controllers, disksPerBroker,

Review Comment:
   Do we want to use another JIRA to track this? Or should I remove it in this 
PR? Thanks.






Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]

2024-04-19 Thread via GitHub


aaron-ai commented on PR #15728:
URL: https://github.com/apache/kafka/pull/15728#issuecomment-2066221145

   KIP has been created here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1039%3A+Disable+automatic+topic+creation+for+MirrorMaker2+consumers





Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1572109503


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(1, cb.successCount)
+
+// Try with coordinator known

Review Comment:
   We are guaranteed to have looked up the coordinator here, because we did 
request data from it. 






Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1572109078


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first

Review Comment:
   We are not guaranteed to have looked up the coordinator here, because we did 
not request any data from it. 






Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]

2024-04-19 Thread via GitHub


lucasbru merged PR #15753:
URL: https://github.com/apache/kafka/pull/15753





Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-19 Thread via GitHub


lucasbru commented on PR #15738:
URL: https://github.com/apache/kafka/pull/15738#issuecomment-2066195894

   LGTM, thanks! 
   
   Merging this, however:
* Have we discussed the behavioral difference with broker team / David?
* Have we documented the behavioral difference anywhere?





Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-19 Thread via GitHub


lucasbru merged PR #15738:
URL: https://github.com/apache/kafka/pull/15738





[PR] MINOR:fix hint in fetchOffsetForTimestamp [kafka]

2024-04-19 Thread via GitHub


hudeqi opened a new pull request, #15757:
URL: https://github.com/apache/kafka/pull/15757

   Fixes a hint whose meaning is wrong: the actual logic throws an error only 
when the high watermark lags behind the epoch start offset, but the hint states 
the opposite.
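
   For context, the check in question has roughly this shape (a hedged 
reconstruction; names and message text are illustrative, not the actual 
`Partition` code):

   ```java
   import org.apache.kafka.common.errors.OffsetNotAvailableException;

   // Hedged reconstruction of the corrected check and hint.
   class FetchOffsetHintSketch {
       void validate(long highWatermark, long epochStartOffset) {
           // The error fires only when the high watermark lags behind the
           // epoch start offset; the hint should describe exactly this case.
           if (highWatermark < epochStartOffset) {
               throw new OffsetNotAvailableException(
                   "High watermark (" + highWatermark + ") is lagging behind the "
                       + "start offset of the current epoch (" + epochStartOffset + ")");
           }
       }
   }
   ```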





[jira] [Commented] (KAFKA-15709) KRaft support in ServerStartupTest

2024-04-19 Thread Zihao Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838923#comment-17838923
 ] 

Zihao Lin commented on KAFKA-15709:
---

[~mdedetrich] feel free to take over

> KRaft support in ServerStartupTest
> --
>
> Key: KAFKA-15709
> URL: https://issues.apache.org/jira/browse/KAFKA-15709
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Zihao Lin
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in ServerStartupTest in 
> core/src/test/scala/unit/kafka/server/ServerStartupTest.scala need to be 
> updated to support KRaft
> 38 : def testBrokerCreatesZKChroot(): Unit = {
> 51 : def testConflictBrokerStartupWithSamePort(): Unit = {
> 65 : def testConflictBrokerRegistration(): Unit = {
> 82 : def testBrokerSelfAware(): Unit = {
> 93 : def testBrokerStateRunningAfterZK(): Unit = {
> Scanned 107 lines. Found 0 KRaft tests out of 5 tests





Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-19 Thread via GitHub


funky-eyes commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1572073595


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##


Review Comment:
   I have a question: in fact, what this PR does is provide a standard 
configuration and generate the corresponding rate limiters and related 
monitoring metrics, right? Then they need to be used in the corresponding 
RemoteStorageManager, correct?






Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1572064864


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -179,8 +186,8 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 throw new IllegalStateException();
 }
 
-ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, 
brokers, controllers, autoStart,
-annot.securityProtocol(), annot.metadataVersion());
+ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, 
brokers, controllers, disksPerBroker,

Review Comment:
   It seems to me `clusterBuilder` should be removed. Also, the temporary local 
variables (`type`, `brokers`, etc.) can be removed. We have a builder already, 
and hence we should leverage it to collect all variables. For example:
   ```java
   ClusterConfig.Builder builder = ClusterConfig.builder();
   if (annot.clusterType() == Type.DEFAULT) {
   builder.type(defaults.clusterType());
   } else {
   builder.type(annot.clusterType());
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15719:
URL: https://github.com/apache/kafka/pull/15719#issuecomment-2066133702

   @brandboat this PR is great. However, I'd like to merge it after #15569. 
#15569 is a huge PR which refactors the `KafkaConfig` and `LogConfig`, and I'd 
like to alleviate the pain of continually fixing conflicts.





Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-19 Thread via GitHub


brandboat commented on PR #15719:
URL: https://github.com/apache/kafka/pull/15719#issuecomment-2066136873

   > @brandboat this PR is great. However, I'd like to merge it after 
https://github.com/apache/kafka/pull/15569. 
https://github.com/apache/kafka/pull/15569 is a huge PR which refactors the 
KafkaConfig and LogConfig, and I'd like to alleviate the pain of continually 
fixing conflicts.
   
   No problem ! Thanks for the notification !





Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-19 Thread via GitHub


FrankYang0529 commented on PR #15745:
URL: https://github.com/apache/kafka/pull/15745#issuecomment-2066132584

   Hi @gaurav-narula and @chia7712, I have addressed all comments. Thanks for 
your review.





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-19 Thread via GitHub


showuon commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2066130114

   @johnnychhsu , do you have any other comments? I'll merge this at the 
weekend if there are none. Thanks.





Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2066120621

   ```
   [2024-04-18T17:26:16.149Z] [ant:checkstyle] [ERROR] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-15569/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java:22:1:
 Disallowed import - 
org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1. [ImportControl]
   
   [2024-04-18T17:26:16.149Z] [ant:checkstyle] [ERROR] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-15569/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java:23:1:
 Disallowed import - 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX. 
[ImportControl]
   
   ```
   there are import errors :(





Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-19 Thread via GitHub


showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572018203


##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##
@@ -881,10 +937,18 @@ public List<List<ApiMessageAndVersion>> recordBatches() {
 new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
 
 TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION 
state");
+"Waiting for KRaftMigrationDriver to enter DUAL_WRITE 
state");
 
 assertEquals(expectedBatchCount, batchesPassedToController.size());
 assertEquals(expectedRecordCount, 
batchesPassedToController.stream().mapToInt(List::size).sum());
 }
 }
+
+// Wait until the driver has recovered MigrationState From ZK. This is to 
simulate the driver needs to be installed as the metadata publisher
+// so that it can receive onControllerChange (KRaftLeaderEvent) and 
onMetadataUpdate (MetadataChangeEvent) events.
+private void 
startAndWaitForRecoveringMigrationStateFromZK(KRaftMigrationDriver driver) 
throws InterruptedException {
+driver.start();
+TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.INACTIVE),
+"Waiting for KRaftMigrationDriver to enter INACTIVE state");

Review Comment:
   This is necessary now because in the test suite, we might invoke 
`onControllerChange` to append a `KRaftLeaderEvent` before the 
`RecoverMigrationStateFromZKEvent` is appended. This won't happen in practice, 
because the driver needs to wait until `RecoverMigrationStateFromZKEvent` has 
completed to register the metadata publisher that receives `KRaftLeaderEvent` 
and `MetadataChangeEvent`.






Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-19 Thread via GitHub


showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572018955


##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##
@@ -881,10 +937,18 @@ public List<List<ApiMessageAndVersion>> recordBatches() {
 new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
 
 TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

Review Comment:
   Side fix.






Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]

2024-04-19 Thread via GitHub


mfvitale commented on PR #15756:
URL: https://github.com/apache/kafka/pull/15756#issuecomment-2066045097

   > Hi @mfvitale, thanks for the PR! This is adding new configurations to transformations so this will require a [KIP](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
   
   @mimaison actually `key.converter.replace.null.with.default` and `value.converter.replace.null.with.default` are not new configurations; they are the ones used by `JsonConverter`, added in https://github.com/apache/kafka/pull/13419
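   
   For reference, this is roughly how those existing `JsonConverter` properties are set on a connector (a hedged sketch; the converter choice and surrounding config are illustrative):
   
   ```properties
   # Use JsonConverter for record keys and values.
   key.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter=org.apache.kafka.connect.json.JsonConverter
   # Keep explicit NULL field values as NULL instead of substituting the
   # schema default value during conversion.
   key.converter.replace.null.with.default=false
   value.converter.replace.null.with.default=false
   ```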





Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]

2024-04-19 Thread via GitHub


mimaison commented on PR #15756:
URL: https://github.com/apache/kafka/pull/15756#issuecomment-2066023794

   Hi @mfvitale, thanks for the PR!
   This is adding new configurations to transformations so this will require a 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).





Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-19 Thread via GitHub


showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1571981454


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -786,12 +773,29 @@ public void run() throws Exception {
 }
 }
 
+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+@Override
+public void run() throws Exception {
+applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);

Review Comment:
   Good suggestion. Updated.






Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-19 Thread via GitHub


showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1571963909


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -786,12 +773,29 @@ public void run() throws Exception {
 }
 }
 
+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+@Override
+public void run() throws Exception {
+applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
+String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+// Once we've recovered the migration state from ZK, install this class as a metadata publisher
+// by calling the initialZkLoadHandler.
+initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+// Transition to INACTIVE state and wait for leadership events.
+transitionTo(MigrationDriverState.INACTIVE);
+}
+}
+
 class PollEvent extends MigrationEvent {
+
 @Override
 public void run() throws Exception {
 switch (migrationState) {
 case UNINITIALIZED:
-recoverMigrationStateFromZK();
+eventQueue.append(new RecoverMigrationStateFromZKEvent());

Review Comment:
   No need. Like I said in this [comment](https://github.com/apache/kafka/pull/15732#discussion_r1570596642), in the `UNINITIALIZED` state, the only event we will receive is the `pollEvent`. We'll receive additional `onControllerChange` (`KRaftLeaderEvent`) and `onMetadataUpdate` (`MetadataChangeEvent`) events only after `RecoverMigrationStateFromZKEvent` completes. So we don't have to worry about the order at this moment.
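   
   Put differently, the ordering is safe because the driver's event queue is single-threaded and FIFO: `RecoverMigrationStateFromZKEvent`, appended by the first `PollEvent`, always runs before anything appended afterwards. A minimal sketch of that property (plain Java, not the actual `KafkaEventQueue`):
   
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   
   final class FifoEventQueue {
       private final LinkedBlockingQueue<Runnable> events = new LinkedBlockingQueue<>();
       // A single worker thread drains the queue, so events run strictly in append order.
       private final Thread worker = new Thread(() -> {
           try {
               while (true) {
                   events.take().run();
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt(); // stop draining on interrupt
           }
       });
   
       void start() { worker.start(); }
   
       void append(Runnable event) { events.add(event); }
   }
   ```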






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2065947030

   > Do you think that we should revert unstable.api.versions.enable change and try again? Thanks.
   
   Yep 





Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]

2024-04-19 Thread via GitHub


jlprat merged PR #13824:
URL: https://github.com/apache/kafka/pull/13824





[jira] [Commented] (KAFKA-15963) Flaky test: testBrokerHeartbeatDuringMigration [3] 3.6-IV0 – org.apache.kafka.controller.QuorumControllerTest

2024-04-19 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838883#comment-17838883
 ] 

Josep Prat commented on KAFKA-15963:


It failed again, but with 3.6-IV1 this time: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/2/testReport/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_21_and_Scala_2_13testBrokerHeartbeatDuringMigration_MetadataVersion__metadataVersion_3_6_IV1_/

> Flaky test: testBrokerHeartbeatDuringMigration [3] 3.6-IV0 – 
> org.apache.kafka.controller.QuorumControllerTest
> -
>
> Key: KAFKA-15963
> URL: https://issues.apache.org/jira/browse/KAFKA-15963
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Assignee: Ashwin Pankaj
>Priority: Major
>
> PR build: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14767/15/tests/]
>  
> {code:java}
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotControllerException: No controller appears to be active.
> Stacktrace:
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotControllerException: No controller appears to be active.
> 	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
> 	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
> 	at org.apache.kafka.controller.QuorumControllerTest.testBrokerHeartbeatDuringMigration(QuorumControllerTest.java:1725)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> 	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
> 	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> 	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> 	at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
> 	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> 	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
> 	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
> 	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> 	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
> {code}





Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]

2024-04-19 Thread via GitHub


jlprat commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2065886831

   All tests failing for this build were known flaky tests. Merging




