Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
omkreddy commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1573153281 ## server/src/main/java/org/apache/kafka/server/config/KafkaSecurityConfigs.java: ## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; + +import java.util.List; +import java.util.Locale; + +/** + * Common home for broker-side security configs which need to be accessible from the libraries shared + * between the broker and the multiple modules in Kafka. + * + * Note this is an internal API and subject to change without notice. 
+ */ +public class KafkaSecurityConfigs { + +/** * SSL Configuration / +public final static String SSL_PROTOCOL_CONFIG = SslConfigs.SSL_PROTOCOL_CONFIG; +public final static String SSL_PROTOCOL_DOC = SslConfigs.SSL_PROTOCOL_DOC; +public static final String SSL_PROTOCOL_DEFAULT = SslConfigs.DEFAULT_SSL_PROTOCOL; + +public final static String SSL_PROVIDER_CONFIG = SslConfigs.SSL_PROVIDER_CONFIG; +public final static String SSL_PROVIDER_DOC = SslConfigs.SSL_PROVIDER_DOC; + +public final static String SSL_CIPHER_SUITES_CONFIG = SslConfigs.SSL_CIPHER_SUITES_CONFIG; +public final static String SSL_CIPHER_SUITES_DOC = SslConfigs.SSL_CIPHER_SUITES_DOC; + +public final static String SSL_ENABLED_PROTOCOLS_CONFIG = SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG; +public final static String SSL_ENABLED_PROTOCOLS_DOC = SslConfigs.SSL_ENABLED_PROTOCOLS_DOC; +public static final String SSL_ENABLED_PROTOCOLS_DEFAULTS = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS; + +public final static String SSL_KEYSTORE_TYPE_CONFIG = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG; +public final static String SSL_KEYSTORE_TYPE_DOC = SslConfigs.SSL_KEYSTORE_TYPE_DOC; +public static final String SSL_KEYSTORE_TYPE_DEFAULT = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE; + +public final static String SSL_KEYSTORE_LOCATION_CONFIG = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; +public final static String SSL_KEYSTORE_LOCATION_DOC = SslConfigs.SSL_KEYSTORE_LOCATION_DOC; + +public final static String SSL_KEYSTORE_PASSWORD_CONFIG = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; +public final static String SSL_KEYSTORE_PASSWORD_DOC = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC; + +public final static String SSL_KEY_PASSWORD_CONFIG = SslConfigs.SSL_KEY_PASSWORD_CONFIG; +public final static String SSL_KEY_PASSWORD_DOC = SslConfigs.SSL_KEY_PASSWORD_DOC; + +public final static String SSL_KEYSTORE_KEY_CONFIG = SslConfigs.SSL_KEYSTORE_KEY_CONFIG; +public final static String SSL_KEYSTORE_KEY_DOC = SslConfigs.SSL_KEYSTORE_KEY_DOC; + +public final static String 
SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG; +public final static String SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC; +public final static String SSL_TRUSTSTORE_TYPE_CONFIG = SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; +public final static String SSL_TRUSTSTORE_TYPE_DOC = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC; +public static final String SSL_TRUSTSTORE_TYPE_DEFAULT = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE; + +public final static String SSL_TRUSTSTORE_LOCATION_CONFIG = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +public final static String SSL_TRUSTSTORE_PASSWORD_DOC = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC; + +public final static String SSL_TRUSTSTORE_PASSWORD_CONFIG = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; +public final static String SSL_TRUSTSTORE_LOCATION_DOC = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC; + +public final static String SSL_TRUSTSTORE_CERTIFICATES_CONFIG =
Re: [PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]
infantlikesprogramming commented on PR #15696: URL: https://github.com/apache/kafka/pull/15696#issuecomment-2067538939 @chia7712 Thanks for the reply. I tried the following code and got the results below. Each time I run the code, the `DescribeTopicsResult` gives a different configuration for the brokers in my cluster. ![image](https://github.com/apache/kafka/assets/60119105/04bb4ae3-98f6-4540-8b18-652ad67f00b7) ![image](https://github.com/apache/kafka/assets/60119105/00a7f8c0-ddd2-4098-8a14-03a6173a8cf3) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero
[ https://issues.apache.org/jira/browse/KAFKA-16588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839170#comment-17839170 ] PoAn Yang commented on KAFKA-16588: --- Hi [~chia7712], I'm interested in this. May I assign it to myself? Thank you. > broker shutdown hangs when `log.segment.delete.delay.ms` is zero > - > > Key: KAFKA-16588 > URL: https://issues.apache.org/jira/browse/KAFKA-16588 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > see > [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154] > If `log.segment.delete.delay.ms` is zero, we call `take` even though > `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` calls `shutdown` > rather than `shutdownNow` > ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134]). > Hence the thread never completes, and it blocks the shutdown of the > broker. > We should replace `take` with `poll`, since we have already checked for an element > beforehand. > BTW, zero is allowed > ([https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
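The hang described in KAFKA-16588 can be reproduced outside Kafka with plain `java.util.concurrent` types. The following is a minimal sketch, not the `LogManager` code: the class name `PollVsTakeSketch`, the method `shutsDownCleanly`, and the 500 ms timeout are assumptions made for illustration. It submits one task that reads from an empty queue and then calls the graceful `shutdown`, which does not interrupt running tasks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names are illustrative and not taken from Kafka.
final class PollVsTakeSketch {

    /**
     * Runs one task against an EMPTY queue, then requests a graceful shutdown.
     * Returns true if the executor terminated within the timeout.
     */
    static boolean shutsDownCleanly(boolean usePoll) {
        BlockingQueue<String> logsToBeDeleted = new LinkedBlockingQueue<>();
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        scheduler.execute(() -> {
            try {
                if (usePoll) {
                    logsToBeDeleted.poll();  // returns null at once when the queue is empty
                } else {
                    logsToBeDeleted.take();  // blocks until an element arrives or the thread is interrupted
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        scheduler.shutdown();  // graceful: running tasks are NOT interrupted
        try {
            boolean terminated = scheduler.awaitTermination(500, TimeUnit.MILLISECONDS);
            if (!terminated) {
                // Only here so the sketch itself can exit: interrupt the blocked task.
                scheduler.shutdownNow();
                scheduler.awaitTermination(5, TimeUnit.SECONDS);
            }
            return terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            scheduler.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("poll terminates: " + shutsDownCleanly(true));
        System.out.println("take terminates: " + shutsDownCleanly(false));
    }
}
```

With `take`, the worker stays parked on the empty queue, so a graceful `shutdown` never completes on its own; with `poll`, the task returns immediately and the executor can terminate. Either switching to `poll` or using `shutdownNow` would unblock it, and the issue proposes the former.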
[jira] [Resolved] (KAFKA-15585) DescribeTopic API
[ https://issues.apache.org/jira/browse/KAFKA-15585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-15585. Resolution: Fixed > DescribeTopic API > - > > Key: KAFKA-15585 > URL: https://issues.apache.org/jira/browse/KAFKA-15585 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > Adding the new DescribeTopic API + the admin client and server-side handling.
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1573016246 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(); + +partitionAssignor = assignorType.assignor(); + +if (simulateRebalanceTrigger) { +simulateIncrementalRebalance(topicMetadata); +} +} + +private Map createTopicMetadata() { +Map topicMetadata = new
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1573016609 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(); + +partitionAssignor = assignorType.assignor(); + +if (simulateRebalanceTrigger) { +simulateIncrementalRebalance(topicMetadata); +} +} + +private Map createTopicMetadata() { +Map topicMetadata = new
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
jeqo commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1572995864 ## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.util.SchemaUtil; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently, + * instead of multiple individual {@link SingleFieldPath single-field paths}. + * + * If the SMT requires accessing a single field on the same data object, + * use {@link SingleFieldPath} instead. 
+ * + * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures;>KIP-821 + * @see SingleFieldPath + * @see FieldSyntaxVersion + */ +public class MultiFieldPaths { +final Trie trie = new Trie(); + +MultiFieldPaths(Set paths) { +paths.forEach(trie::insert); +} + +public static MultiFieldPaths of(List fields, FieldSyntaxVersion syntaxVersion) { +return new MultiFieldPaths(fields.stream() +.map(f -> new SingleFieldPath(f, syntaxVersion)) +.collect(Collectors.toSet())); +} + +/** + * Find values at the field paths + * + * @param struct data value + * @return map of field paths and field/values + */ +public Map> fieldAndValuesFrom(Struct struct) { +if (trie.isEmpty()) return Collections.emptyMap(); +return findFieldAndValues(struct, trie.root, new HashMap<>()); +} + +private Map> findFieldAndValues( +Struct originalValue, +TrieNode trieAt, +Map> fieldAndValueMap +) { +for (Map.Entry step : trieAt.steps().entrySet()) { +Field field = originalValue.schema().field(step.getKey()); +if (step.getValue().isLeaf()) { +Map.Entry fieldAndValue = +field != null +? 
new AbstractMap.SimpleImmutableEntry<>(field, originalValue.get(field)) +: null; +fieldAndValueMap.put(step.getValue().path, fieldAndValue); +} else { +if (field.schema().type() == Type.STRUCT) { +findFieldAndValues( +originalValue.getStruct(field.name()), +step.getValue(), +fieldAndValueMap +); +} +} +} +return fieldAndValueMap; +} + +/** + * Find values at the field paths + * + * @param value data value + * @return map of field paths and field/values + */ +public Map> fieldAndValuesFrom(Map value) { +if (trie.isEmpty()) return Collections.emptyMap(); +return findFieldAndValues(value, trie.root, new HashMap<>()); +} + +@SuppressWarnings("unchecked") +private Map> findFieldAndValues( +Map value, +TrieNode trieAt, +Map> fieldAndValueMap +) { +for (Map.Entry step : trieAt.steps().entrySet()) { +Object fieldValue = value.get(step.getKey()); +if (step.getValue().isLeaf()) { +fieldAndValueMap.put( +step.getValue().path, +new AbstractMap.SimpleImmutableEntry<>(step.getKey(), fieldValue) +); +} else { +if (fieldValue instanceof Map) { +findFieldAndValues( +(Map) fieldValue, +step.getValue(), +fieldAndValueMap +
Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]
emitskevich-blp commented on PR #15722: URL: https://github.com/apache/kafka/pull/15722#issuecomment-2067299812 > we should verify the non-deprecated metrics should have correct doc which is not marked as "deprecated". Also, that is what you try to fix, right? Correct, that is the goal. I misunderstood what you meant by "doc". Added a test to validate the metric descriptions.
[jira] [Created] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close
Chia-Ping Tsai created KAFKA-16589: -- Summary: Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close Key: KAFKA-16589 URL: https://issues.apache.org/jira/browse/KAFKA-16589 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai Sometimes we close the admin created by `createAdminClient`, and sometimes we don't. That is not a real problem, since the `ClusterInstance` will call `close` when stopping. However, it causes a lot of inconsistent code, and in fact it does not save much effort, since creating an Admin is not hard work. We can get `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily. {code:java} // before try (Admin admin = cluster.createAdminClient()) { } // after v0 try (Admin admin = Admin.create(Collections.singletonMap( CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) {} {code} Personally, I don't find the `after` version verbose, but we can have an alternative: `Map clientConfigs`. {code:java} // after v1 try (Admin admin = Admin.create(cluster.clientConfigs())) {}{code}
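The ownership rule behind the proposal (whoever creates the client closes it) can be sketched without the Kafka test framework. Everything below is a stand-in for illustration only: `FakeAdmin` and `FakeCluster` are hypothetical classes, not Kafka's `Admin` or `ClusterInstance`, and `localhost:9092` is a placeholder address. The only real Kafka detail is the `bootstrap.servers` key, which is the value of `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG`.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch; FakeAdmin and FakeCluster are NOT Kafka classes.
final class AdminOwnershipSketch {

    /** Stand-in for an admin client; it only records whether close() ran. */
    static final class FakeAdmin implements AutoCloseable {
        final Map<String, Object> configs;
        boolean closed = false;

        FakeAdmin(Map<String, Object> configs) {
            this.configs = configs;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Stand-in for a cluster handle that exposes connection settings only. */
    static final class FakeCluster {
        String bootstrapServers() {
            return "localhost:9092";  // placeholder address
        }

        Map<String, Object> clientConfigs() {
            return Collections.singletonMap("bootstrap.servers", bootstrapServers());
        }
    }

    /** The caller creates the client, so the caller closes it via try-with-resources. */
    static FakeAdmin useAndClose(FakeCluster cluster) {
        FakeAdmin escaped;
        try (FakeAdmin admin = new FakeAdmin(cluster.clientConfigs())) {
            escaped = admin;  // kept only so the result can be inspected afterwards
        }
        return escaped;
    }

    public static void main(String[] args) {
        FakeAdmin admin = useAndClose(new FakeCluster());
        System.out.println("closed by caller: " + admin.closed);
    }
}
```

Because the cluster handle only hands out configuration, there is no longer a question of who closes the client: it is scoped to the `try` block that created it.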
Re: [PR] MINOR: example.com moved [kafka]
akatona84 commented on code in PR #15758: URL: https://github.com/apache/kafka/pull/15758#discussion_r1572917082 ## clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java: ## @@ -61,7 +61,7 @@ public void testParseAndValidateAddressesWithReverseLookup() { assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses); List validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName) .collect(Collectors.toList()); -List expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946"); +List expectedHostNames = asList("93.184.215.14", "2606:2800:220:1:248:1893:25c8:1946"); Review Comment: Yeah, it's changing back and forth.
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1572912365 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -16,7 +16,15 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; import kafka.utils.TestUtils; Review Comment: Could you remove the usage of `TestUtils`? The method `subscribeAndWaitForRecords` can be rewritten easily, as it seems to only poll records.
[jira] [Commented] (KAFKA-16082) Broker recreates reassigned partition after logdir failure
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839107#comment-17839107 ] Chia-Ping Tsai commented on KAFKA-16082: [~gnarula] Could you file a PR for branch 3.7? > Broker recreates reassigned partition after logdir failure > -- > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Critical > Fix For: 3.8.0, 3.7.1 > > > There is a possible data loss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 merged PR #15136: URL: https://github.com/apache/kafka/pull/15136
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on PR #15136: URL: https://github.com/apache/kafka/pull/15136#issuecomment-2067228882 The failed `testParseAndValidateAddressesWithReverseLookup` is tracked by #15758. Will merge it.
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
chia7712 merged PR #15569: URL: https://github.com/apache/kafka/pull/15569
Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
chia7712 commented on PR #15719: URL: https://github.com/apache/kafka/pull/15719#issuecomment-2067224991 @brandboat please fix the conflicts, thanks!
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
chia7712 commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2067222576 The failed test `testParseAndValidateAddressesWithReverseLookup` will get fixed by #15758. I will merge this PR
Re: [PR] MINOR: example.com moved [kafka]
chia7712 commented on code in PR #15758: URL: https://github.com/apache/kafka/pull/15758#discussion_r1572897153 ## clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java: ## @@ -61,7 +61,7 @@ public void testParseAndValidateAddressesWithReverseLookup() { assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses); List validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName) .collect(Collectors.toList()); -List expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946"); +List expectedHostNames = asList("93.184.215.14", "2606:2800:220:1:248:1893:25c8:1946"); Review Comment: That is weird. What I see on my local is `93.184.215.14, 2606:2800:21f:cb07:6820:80da:af6b:8b2c`.
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on PR #15744: URL: https://github.com/apache/kafka/pull/15744#issuecomment-2067204988 Updated to include a CheckOp on the `/controller` ZNode. We don't bother using the controller epoch since it is not straightforward to consistently read the controller and controller epoch from a non-controller broker. We can achieve the same fencing by checking that the `/controller` ZNode doesn't change during the method.
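The fencing idea in the comment above can be sketched with a toy in-memory store. This is only an illustration under stated assumptions, not the ZooKeeper or `KafkaZkClient` API: `VersionFencingSketch`, `checkAndSet`, and the paths used with it are hypothetical names. The point it shows is that a write guarded by "the `/controller` node still has the version I read earlier" is rejected as soon as the controller node changes, without reading any controller epoch.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of a check-then-write transaction; NOT the ZooKeeper API. */
final class VersionFencingSketch {

    /** A node's payload plus a version that increases on every write. */
    static final class Node {
        final String data;
        final int version;

        Node(String data, int version) {
            this.data = data;
            this.version = version;
        }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    /** Unconditional write: creates the node at version 0 or bumps its version. */
    synchronized int set(String path, String data) {
        Node old = nodes.get(path);
        int next = (old == null) ? 0 : old.version + 1;
        nodes.put(path, new Node(data, next));
        return next;
    }

    synchronized Node get(String path) {
        return nodes.get(path);
    }

    /**
     * Writes data to writePath only if checkPath still has expectedVersion.
     * The check and the write run under one lock, so no other write can
     * slip in between them.
     */
    synchronized boolean checkAndSet(String checkPath, int expectedVersion,
                                     String writePath, String data) {
        Node guard = nodes.get(checkPath);
        if (guard == null || guard.version != expectedVersion) {
            return false; // the guard node changed or vanished: fence the write
        }
        set(writePath, data); // synchronized is reentrant, so this is safe
        return true;
    }
}
```

A caller would read the guard node's version once at the start of the method and pass it to every guarded write; a `false` result plays the role of the `ControllerMovedException` discussed in this PR.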
Re: [PR] KAFKA-16507 Add raw record into RecordDeserialisationException [kafka]
AndrewJSchofield commented on code in PR #15691: URL: https://github.com/apache/kafka/pull/15691#discussion_r1572845716 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ## @@ -311,25 +312,33 @@ ConsumerRecord parseRecord(Deserializers deserializers, Optional leaderEpoch, TimestampType timestampType, Record record) { +long offset = record.offset(); +long timestamp = record.timestamp(); +ByteBuffer keyBytes = record.key(); +ByteBuffer valueBytes = record.value(); +Headers headers = new RecordHeaders(record.headers()); +K key; +V value; try { -long offset = record.offset(); -long timestamp = record.timestamp(); -Headers headers = new RecordHeaders(record.headers()); -ByteBuffer keyBytes = record.key(); -K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes); -ByteBuffer valueBytes = record.value(); -V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes); -return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, -timestamp, timestampType, -keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(), -valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(), -key, value, headers, leaderEpoch); +key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes); } catch (RuntimeException e) { -log.error("Deserializers with error: {}", deserializers); -throw new RecordDeserializationException(partition, record.offset(), -"Error deserializing key/value for partition " + partition + +throw new KeyDeserializationException(partition, offset, keyBytes, valueBytes, headers, record.timestamp(), Review Comment: I think you need a TimestampType also on the constructor for the `RecordDeserializationException`. 
## clients/src/main/java/org/apache/kafka/common/errors/KeyDeserializationException.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; + +import java.nio.ByteBuffer; + +public class KeyDeserializationException extends RecordDeserializationException { +private final static long serialVersionUID = 1L; +public KeyDeserializationException(TopicPartition partition, long offset, ByteBuffer key, ByteBuffer value, Headers headers, long timestamp, String message, Throwable cause) { Review Comment: Please split this enormously long line :) ## clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java: ## @@ -17,21 +17,55 @@ package org.apache.kafka.common.errors; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; /** * This exception is raised for any error that occurs while deserializing records received by the consumer using * the configured {@link org.apache.kafka.common.serialization.Deserializer}. 
*/ public class RecordDeserializationException extends SerializationException { -private static final long serialVersionUID = 1L; +private static final long serialVersionUID = 2L; private final TopicPartition partition; private final long offset; +private final long timestamp; +private final ByteBuffer key; +private final ByteBuffer value; +private final Headers headers; -public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) { +@Deprecated +public
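The restructuring under review (hoist the raw fields out of the `try`, then deserialize the key and the value in separate `try` blocks so the exception can say which part failed and carry the raw bytes) can be sketched without any Kafka types. Everything named here is hypothetical and chosen for illustration: `SplitDeserializationSketch`, `RawRecordException`, and `Parsed` are not the classes in the PR, and `byte[]` stands in for `ByteBuffer`.

```java
import java.util.function.Function;

/** Stdlib-only sketch of "deserialize key and value separately"; not Kafka code. */
final class SplitDeserializationSketch {

    /** Hypothetical exception that keeps the raw bytes and records which part failed. */
    static final class RawRecordException extends RuntimeException {
        final boolean keyFailed;
        final byte[] rawKey;
        final byte[] rawValue;

        RawRecordException(boolean keyFailed, byte[] rawKey, byte[] rawValue, Throwable cause) {
            super("Error deserializing " + (keyFailed ? "key" : "value"), cause);
            this.keyFailed = keyFailed;
            this.rawKey = rawKey;
            this.rawValue = rawValue;
        }
    }

    /** Simple holder for a successfully deserialized key/value pair. */
    static final class Parsed<K, V> {
        final K key;
        final V value;

        Parsed(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    static <K, V> Parsed<K, V> parse(byte[] rawKey, byte[] rawValue,
                                     Function<byte[], K> keyDeserializer,
                                     Function<byte[], V> valueDeserializer) {
        K key;
        V value;
        // The raw bytes are read before either try block, so both catch
        // clauses can attach them to the exception.
        try {
            key = rawKey == null ? null : keyDeserializer.apply(rawKey);
        } catch (RuntimeException e) {
            throw new RawRecordException(true, rawKey, rawValue, e);
        }
        try {
            value = rawValue == null ? null : valueDeserializer.apply(rawValue);
        } catch (RuntimeException e) {
            throw new RawRecordException(false, rawKey, rawValue, e);
        }
        return new Parsed<>(key, value);
    }
}
```

With this shape a caller that catches the exception can log or forward the offending bytes (for example to a dead-letter destination) and tell a bad key from a bad value, which a single combined `try` cannot do.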
[jira] [Commented] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged
[ https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839100#comment-17839100 ] Phuc Hong Tran commented on KAFKA-16493: [~lianetm] I’ll come back to this ticket this weekend and will try to get a pull request asap. Sorry for the delay > Avoid unneeded subscription regex check if metadata version unchanged > - > > Key: KAFKA-16493 > URL: https://issues.apache.org/jira/browse/KAFKA-16493 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > When using pattern subscription (java pattern), the new consumer regularly > checks if the list of topics that match the regex has changed. This is done > as part of the consumer poll loop, and it evaluates the regex using the > latest cluster metadata. As an improvement, we should avoid evaluating the > regex if the metadata version hasn't changed (similar to what the legacy > coordinator does > [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])
[jira] [Commented] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged
[ https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839096#comment-17839096 ] Lianet Magrans commented on KAFKA-16493: Hey [~phuctran], any progress on this one? Even though it's a performance improvement I'm afraid it's a very sensitive one given that it would affect the poll loop, so we need to make sure it makes it into 3.8 (it had the wrong fix version before, I just updated it). Let me know if you have any questions. Thanks! > Avoid unneeded subscription regex check if metadata version unchanged > - > > Key: KAFKA-16493 > URL: https://issues.apache.org/jira/browse/KAFKA-16493 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support > Fix For: 4.0.0 > > > When using pattern subscription (java pattern), the new consumer regularly > checks if the list of topics that match the regex has changed. This is done > as part of the consumer poll loop, and it evaluates the regex using the > latest cluster metadata. As an improvement, we should avoid evaluating the > regex if the metadata version hasn't changed (similar to what the legacy > coordinator does > [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])
[jira] [Updated] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged
[ https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16493: --- Fix Version/s: 3.8.0 (was: 4.0.0) > Avoid unneeded subscription regex check if metadata version unchanged > - > > Key: KAFKA-16493 > URL: https://issues.apache.org/jira/browse/KAFKA-16493 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > When using pattern subscription (java pattern), the new consumer regularly > checks if the list of topics that match the regex has changed. This is done > as part of the consumer poll loop, and it evaluates the regex using the > latest cluster metadata. As an improvement, we should avoid evaluating the > regex if the metadata version hasn't changed (similar to what the legacy > coordinator does > [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])
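The improvement described in KAFKA-16493 amounts to caching the regex result per metadata version. A minimal sketch, assuming the metadata version is a non-negative integer that changes whenever the topic list may have changed; `PatternMatchCacheSketch`, `matchingTopics`, and `evaluations` are hypothetical names for illustration and do not correspond to the consumer's internal classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

/** Sketch: re-evaluate a subscription regex only when the metadata version changes. */
final class PatternMatchCacheSketch {
    private final Pattern pattern;
    private int lastVersion = -1;            // assumption: real versions are >= 0
    private List<String> lastMatch = new ArrayList<>();
    private int evaluations = 0;             // counts full passes, for illustration only

    PatternMatchCacheSketch(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    /** Returns the topics matching the pattern, reusing the cached result if possible. */
    List<String> matchingTopics(int metadataVersion, Collection<String> topics) {
        if (metadataVersion == lastVersion) {
            return lastMatch;                // metadata unchanged: skip the regex pass
        }
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        evaluations++;
        lastVersion = metadataVersion;
        lastMatch = matched;
        return matched;
    }

    int evaluations() {
        return evaluations;
    }
}
```

Because this check would sit on the poll loop, the saving is one full pass over the topic list per poll whenever the metadata has not been refreshed.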
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572801229 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode +throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.") +} + +// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR +// from the new KRaft controller that lets it know about the new controller. It will then forward +// IncrementalAlterConfig requests instead of processing directly. +if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) { + throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.") +} def set(configData: Array[Byte]): SetDataResponse = { val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName), configData, ZkVersion.MatchAnyVersion) - retryRequestUntilConnected(setDataRequest) + if (controllerRegistration.zkVersion > 0) { +// Pass the zkVersion previously captured to ensure the controller hasn't changed to KRaft while +// this method was processing. 
+retryRequestUntilConnected(setDataRequest, controllerRegistration.zkVersion) + } else { +retryRequestUntilConnected(setDataRequest) Review Comment: On second thought, this isn't quite right. I need to rework this a bit
[jira] [Resolved] (KAFKA-16486) Integrate metric measurability changes in metrics collector
[ https://issues.apache.org/jira/browse/KAFKA-16486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-16486. - Fix Version/s: 3.8.0 Resolution: Done > Integrate metric measurability changes in metrics collector > --- > > Key: KAFKA-16486 > URL: https://issues.apache.org/jira/browse/KAFKA-16486 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Fix For: 3.8.0 > >
Re: [PR] KAFKA-16486: Integrate KIP-1019 measurability changes (KIP-714) [kafka]
mjsax merged PR #15682: URL: https://github.com/apache/kafka/pull/15682
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572780325 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode +throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.") +} + +// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR +// from the new KRaft controller that lets it know about the new controller. It will then forward +// IncrementalAlterConfig requests instead of processing directly. +if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) { + throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.") +} def set(configData: Array[Byte]): SetDataResponse = { val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName), configData, ZkVersion.MatchAnyVersion) - retryRequestUntilConnected(setDataRequest) + if (controllerRegistration.zkVersion > 0) { +// Pass the zkVersion previously captured to ensure the controller hasn't changed to KRaft while +// this method was processing. 
+retryRequestUntilConnected(setDataRequest, controllerRegistration.zkVersion) + } else { +retryRequestUntilConnected(setDataRequest) Review Comment: This is essentially the same as the match None case above. In our integration tests, we can (and apparently do) set configs in ZK before the controller gets elected for the first time. This was kept to avoid breaking a bunch of tests. Historically, setting configs will never fail (last writer wins) so neither the client nor broker implement any retries.
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572777334 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode +throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.") +} + +// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR +// from the new KRaft controller that lets it know about the new controller. It will then forward +// IncrementalAlterConfig requests instead of processing directly. +if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) { + throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.") +} def set(configData: Array[Byte]): SetDataResponse = { val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName), configData, ZkVersion.MatchAnyVersion) - retryRequestUntilConnected(setDataRequest) + if (controllerRegistration.zkVersion > 0) { Review Comment: Yes. This makes the update here more like the updates made from the ZK controller.
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572776248 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest { dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get } - def alterTopicConfig(admin: Admin): AlterConfigsResult = { + def alterBrokerConfigs(admin: Admin): Unit = { +val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") +val defaultBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), AlterConfigOp.OpType.SET), +).asJavaCollection +val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") +val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1") +val specificBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), AlterConfigOp.OpType.SET), +).asJavaCollection + +TestUtils.retry(6) { + val result = admin.incrementalAlterConfigs(Map( +defaultBrokerResource -> defaultBrokerConfigs, +broker0Resource -> specificBrokerConfigs, +broker1Resource -> specificBrokerConfigs + ).asJava) + try { +result.all().get(10, TimeUnit.SECONDS) + } catch { +case t: Throwable => fail("Alter Broker Configs had an error", t) + } +} Review Comment: > Could we verify that a KRaft controller is ready before testing this operation to make the outcome more predictable? I tried this at first, but we don't actually expose this to the client (whether the controller is ZK or KRaft). In fact, we lie to the client and tell it that a random broker is the controller. I could have "pierced the veil" to access the underlying controller classes, but a retry seemed easier.
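The trade-off in the reply above, retrying the operation rather than first detecting which controller is active, can be sketched as a small bounded-retry helper. This is an illustrative stand-in under stated assumptions, not Kafka's `TestUtils.retry`, whose exact contract is not shown in this thread; `RetrySketch`, `maxAttempts`, and `backoffMs` are hypothetical names.

```java
import java.util.function.Supplier;

/** Sketch of a bounded retry for test code; not Kafka's TestUtils.retry. */
final class RetrySketch {

    /**
     * Runs the action until it returns without throwing, for at most maxAttempts
     * tries, sleeping backoffMs between tries. Rethrows the last failure if every
     * attempt fails.
     */
    static <T> T retry(int maxAttempts, long backoffMs, Supplier<T> action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again if attempts remain
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
            }
        }
        throw last;
    }
}
```

The cost of this approach is that a test can pass after several hidden failures, so bounding the attempts and surfacing the final exception keeps a genuinely broken operation from being masked.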
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1572735673 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(); + +partitionAssignor = assignorType.assignor(); + +if (simulateRebalanceTrigger) { +simulateIncrementalRebalance(topicMetadata); +} +} + +private Map createTopicMetadata() { +Map topicMetadata = new
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1572729047 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + 
+@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +private static final String GROUP_ID = "benchmark-group"; + +private static final int GROUP_EPOCH = 0; + +private PartitionAssignor partitionAssignor; + +private Map subscriptionMetadata = Collections.emptyMap(); + +private TargetAssignmentBuilder targetAssignmentBuilder; + +private AssignmentSpec assignmentSpec; + +private final List allTopicNames = new ArrayList<>(topicCount); + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +// For this benchmark we will use the Uniform Assignor +// and a group that has a homogeneous subscription model. +partitionAssignor = new UniformAssignor(); + +subscriptionMetadata = generateMockSubscriptionMetadata(); +Map members = generateMockMembers(); +Map existingTargetAssignment = generateMockInitialTargetAssignment(); + +// Add a new member to trigger a rebalance. +Set subscribedTopics = new HashSet<>(subscriptionMetadata.keySet()); + +ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("new-member") +.setSubscribedTopicNames(new ArrayList<>(subscribedTopics)) +.build(); + +targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor) +.withMembers(members) +.withSubscriptionMetadata(subscriptionMetadata) +.withTargetAssignment(existingTargetAssignment) +.addOrUpdateMember(newMember.memberId(), newMember); +} + +private Map generateMockMembers() { +Map members = new HashMap<>(); + +for (int i = 0; i < memberCount - 1; i++)
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1572726906 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final 
PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(); + +
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
cmccabe commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572722514 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode +throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.") +} + +// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR +// from the new KRaft controller that lets it know about the new controller. It will then forward +// IncrementalAlterConfig requests instead of processing directly. +if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) { + throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.") +} def set(configData: Array[Byte]): SetDataResponse = { val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName), configData, ZkVersion.MatchAnyVersion) - retryRequestUntilConnected(setDataRequest) + if (controllerRegistration.zkVersion > 0) { +// Pass the zkVersion previously captured to ensure the controller hasn't changed to KRaft while +// this method was processing. 
+retryRequestUntilConnected(setDataRequest, controllerRegistration.zkVersion) + } else { +retryRequestUntilConnected(setDataRequest) Review Comment: can you explain how we'd get to this branch of the "if" statement? Surely the controller znode always has a version? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
cmccabe commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572717868 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode +throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.") +} + +// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR +// from the new KRaft controller that lets it know about the new controller. It will then forward +// IncrementalAlterConfig requests instead of processing directly. +if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) { + throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.") +} def set(configData: Array[Byte]): SetDataResponse = { val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName), configData, ZkVersion.MatchAnyVersion) - retryRequestUntilConnected(setDataRequest) + if (controllerRegistration.zkVersion > 0) { Review Comment: this also fixes the lack of fencing, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
cmccabe commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572716567 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode Review Comment: I was a bit surprised that `ControllerMovedException` was not retriable. Should we use a retriable exception here, or does it not matter?
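The fencing idea raised in the review comments on `setOrCreateEntityConfigs` can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `ControllerNode`, `ConfigNode`, and `setIfControllerUnchanged` are hypothetical names invented for illustration, not part of `KafkaZkClient`, and the model assumes the intended semantics are "apply the write only if the controller znode still has the version captured earlier".

```java
class VersionFencedWriteSketch {

    // Hypothetical stand-in for the /controller znode: every change bumps its version.
    static final class ControllerNode {
        private int version = 1;

        synchronized int version() {
            return version;
        }

        synchronized void changeController() {
            version++;
        }
    }

    // Hypothetical stand-in for an entity-config znode.
    static final class ConfigNode {
        private final ControllerNode controller;
        private String data = "";

        ConfigNode(ControllerNode controller) {
            this.controller = controller;
        }

        // Applies the write only if the controller node still has the version
        // the caller captured before it started processing the request.
        boolean setIfControllerUnchanged(String newData, int expectedControllerVersion) {
            synchronized (controller) {
                if (controller.version != expectedControllerVersion) {
                    return false; // controller changed in the meantime: fence the write
                }
                data = newData;
                return true;
            }
        }

        String data() {
            synchronized (controller) {
                return data;
            }
        }
    }

    public static void main(String[] args) {
        ControllerNode controller = new ControllerNode();
        ConfigNode config = new ConfigNode(controller);

        int captured = controller.version();
        System.out.println(config.setIfControllerUnchanged("a=1", captured)); // true
        controller.changeController();
        System.out.println(config.setIfControllerUnchanged("a=2", captured)); // false
        System.out.println(config.data()); // a=1
    }
}
```

The check and the write happen under one lock, so a controller change cannot slip in between them. In the real client the equivalent atomicity would have to come from ZooKeeper itself; how the PR achieves that is not shown in the quoted hunk.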
[jira] [Updated] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero
[ https://issues.apache.org/jira/browse/KAFKA-16588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16588: --- Description: see [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154] We call `take` even though the `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` call `shutdown` rather than `shudownNow` ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134)] Hence, the thread won't be completed forever, and it blocks the shutdown of broker. We should replace the `take` by `poll` since we have checked the element before. BTW, the zero is allowed (https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258) was: see https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154 We call `take` even though the `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` call `shutdown` rather than `shudownNow` ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134)] Hence, the thread won't be completed forever, and it blocks the shutdown of broker. We should replace the `take` by `poll` since we have checked the element before. 
> broker shutdown hangs when `log.segment.delete.delay.ms` is zero > - > > Key: KAFKA-16588 > URL: https://issues.apache.org/jira/browse/KAFKA-16588 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > see > [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154] > We call `take` even though the `logsToBeDeleted` is empty, and > `KafkaScheduler#shutdown` call `shutdown` rather than `shudownNow` > ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134)] > Hence, the thread won't be completed forever, and it blocks the shutdown of > broker. > We should replace the `take` by `poll` since we have checked the element > before. > BTW, the zero is allowed > (https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero
[ https://issues.apache.org/jira/browse/KAFKA-16588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16588: --- Description: see [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154] If `log.segment.delete.delay.ms` is zero, We call `take` even though the `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` call `shutdown` rather than `shudownNow` ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134)] Hence, the thread won't be completed forever, and it blocks the shutdown of broker. We should replace the `take` by `poll` since we have checked the element before. BTW, the zero is allowed ([https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258]) was: see [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154] We call `take` even though the `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` call `shutdown` rather than `shudownNow` ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134)] Hence, the thread won't be completed forever, and it blocks the shutdown of broker. We should replace the `take` by `poll` since we have checked the element before. 
BTW, the zero is allowed (https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258) > broker shutdown hangs when `log.segment.delete.delay.ms` is zero > - > > Key: KAFKA-16588 > URL: https://issues.apache.org/jira/browse/KAFKA-16588 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > see > [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154] > If `log.segment.delete.delay.ms` is zero, We call `take` even though the > `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` call `shutdown` > rather than `shudownNow` > ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134)] > Hence, the thread won't be completed forever, and it blocks the shutdown of > broker. > We should replace the `take` by `poll` since we have checked the element > before. > BTW, the zero is allowed > ([https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero
Chia-Ping Tsai created KAFKA-16588: -- Summary: broker shutdown hangs when `log.segment.delete.delay.ms` is zero Key: KAFKA-16588 URL: https://issues.apache.org/jira/browse/KAFKA-16588 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154 We call `take` even though `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` calls `shutdown` rather than `shutdownNow` (https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134). Hence the thread never completes, and it blocks the shutdown of the broker. We should replace `take` with `poll`, since we have already checked for an element beforehand.
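The hang described in KAFKA-16588 can be reproduced in miniature with plain `java.util.concurrent` classes. The sketch below is not the `LogManager` or `KafkaScheduler` code; the class and method names are made up for illustration. It only shows why a task blocked in `take()` on an empty queue survives `ExecutorService.shutdown()`, which does not interrupt running tasks, while a task using the non-blocking `poll()` lets the executor terminate.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TakeVersusPollSketch {

    // Returns true if the executor terminated shortly after shutdown() was called.
    static boolean shutsDownCleanly(boolean usePoll) {
        LinkedBlockingQueue<String> logsToBeDeleted = new LinkedBlockingQueue<>(); // stays empty
        ExecutorService scheduler = Executors.newSingleThreadExecutor();

        scheduler.execute(() -> {
            try {
                // poll() returns null immediately on an empty queue; take() blocks until an element arrives.
                String next = usePoll ? logsToBeDeleted.poll() : logsToBeDeleted.take();
                if (next != null) {
                    System.out.println("deleting " + next);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        scheduler.shutdown(); // lets already-submitted tasks finish; does not interrupt them
        boolean terminated = false;
        try {
            terminated = scheduler.awaitTermination(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (!terminated) {
                scheduler.shutdownNow(); // cleanup for the demo: interrupts the blocked take()
            }
        }
        return terminated;
    }

    public static void main(String[] args) {
        System.out.println("poll: terminated = " + shutsDownCleanly(true));
        System.out.println("take: terminated = " + shutsDownCleanly(false));
    }
}
```

With `poll` the executor terminates within the timeout. With `take` it does not, and only the `shutdownNow()` in the cleanup path releases the thread. That matches the two fixes suggested in the ticket: avoid the blocking call, or interrupt the worker on shutdown.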
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
phooq commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2066909600
Thanks for the feedback @lianetm! I agree with the point about the back-and-forth jumping between the child and the parent that comes with this implementation. However, asking each child to override `toString` does not completely eliminate the ping-pong, right? Also, the existing code has `RequestState` hardcoded in the base class, which is not helpful when debugging any of its children, so we either call `getClass().getSimpleName()` in every child's `toString` override, or call it once in the base class. The latter saves us some work.
This implementation brings two additional benefits:
1. It makes it easier to build the debug string in the proper format. Right now the `toString` of `OffsetFetchRequestState` has to build the "{}" correctly by itself, and to enforce uniformity, each child of `RequestState` has to do the same. This is trivial from a programming perspective, but if any of the formatting is wrong, the messages become significantly harder to read.
2. It improves the readability of the debug message. Imagine more levels of inheritance being added to the `RequestState` tree. Having each child override `toString` gives us something like `childName1={childName2={childName2={}...base={}}}`. With this implementation, we get a much simpler version like `childName={properties...}`.
I agree that `toStringBase()` as a name looks a little confusing. It essentially builds the details of the object into a string, so maybe `getDetails()` would be a better name? Thanks
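A minimal sketch of the pattern argued for above, using made-up classes rather than the real `RequestState` hierarchy. The class names and fields are assumptions chosen for illustration, and the output uses `Name{...}` rather than committing to the PR's exact format. The base class wraps the details with `getClass().getSimpleName()` once, and children only extend the details string.

```java
class RequestStateSketch {
    private final long backoffMs;
    private final int attempts;

    RequestStateSketch(long backoffMs, int attempts) {
        this.backoffMs = backoffMs;
        this.attempts = attempts;
    }

    // Details only: no class name and no braces. Children append their own fields.
    protected String toStringBase() {
        return "backoffMs=" + backoffMs + ", attempts=" + attempts;
    }

    // The class name and braces are added in exactly one place.
    @Override
    public String toString() {
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

class OffsetFetchStateSketch extends RequestStateSketch {
    private final int partitions;

    OffsetFetchStateSketch(long backoffMs, int attempts, int partitions) {
        super(backoffMs, attempts);
        this.partitions = partitions;
    }

    @Override
    protected String toStringBase() {
        return super.toStringBase() + ", partitions=" + partitions;
    }

    public static void main(String[] args) {
        // prints: OffsetFetchStateSketch{backoffMs=100, attempts=2, partitions=3}
        System.out.println(new OffsetFetchStateSketch(100, 2, 3));
    }
}
```

However deep the hierarchy gets, the result stays flat, at the cost of the child-to-parent jump through `toStringBase()` that the earlier review comment pointed out.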
[jira] [Resolved] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16579. Resolution: Fixed > Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer > - > > Key: KAFKA-16579 > URL: https://issues.apache.org/jira/browse/KAFKA-16579 > Project: Kafka > Issue Type: Task > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated > a slew of system tests to run both the "old" and "new" implementations. > KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} > so it could test the new consumer. However, the test is tailored specifically > to the "old" Consumer's protocol and assignment strategy upgrade. > Unsurprisingly, when we run those system tests with the new > {{AsyncKafkaConsumer}}, we get errors like the following: > {code} > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 29.634 seconds > AssertionError("Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1)})}") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 77, in rolling_update_test > self._verify_range_assignment(consumer) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 41, in _verify_range_assignment > "Mismatched assignment: %s" % assignment > AssertionError: Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1)})} > {code} > The task here is to revert the changes made in KAFKA-16271. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: example.com moved [kafka]
akatona84 commented on PR #15758: URL: https://github.com/apache/kafka/pull/15758#issuecomment-2066839773 The IPv6 address alternates between 2606:2800:21f:cb07:6820:80da:af6b:8b2c and the old one, so it's hard to fix the test like this.
Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]
lianetm commented on code in PR #15753: URL: https://github.com/apache/kafka/pull/15753#discussion_r1572569170 ## tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py: ## @@ -56,12 +56,7 @@ def _verify_roundrobin_assignment(self, consumer): metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False] ) -@matrix( -metadata_quorum=quorum.all_kraft, -use_new_coordinator=[True], -group_protocol=consumer_group.all_group_protocols Review Comment: You're right, I was only concerned about the new coordinator + classic combination, missed that it's coming from the default and other combination, all good then. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]
lianetm commented on code in PR #15753: URL: https://github.com/apache/kafka/pull/15753#discussion_r1572569170 ## tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py: ## @@ -56,12 +56,7 @@ def _verify_roundrobin_assignment(self, consumer): metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False] ) -@matrix( -metadata_quorum=quorum.all_kraft, -use_new_coordinator=[True], -group_protocol=consumer_group.all_group_protocols Review Comment: You're right, I was only concerned about the new coordinator + classic combination, missed that it's coming from the default, all good then. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
soarez commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1572557807 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -786,12 +773,29 @@ public void run() throws Exception { } } +class RecoverMigrationStateFromZKEvent extends MigrationEvent { +@Override +public void run() throws Exception { +applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState); +String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done"; +log.info("Initial migration of ZK metadata is {}.", maybeDone); + +// Once we've recovered the migration state from ZK, install this class as a metadata publisher +// by calling the initialZkLoadHandler. +initialZkLoadHandler.accept(KRaftMigrationDriver.this); + +// Transition to INACTIVE state and wait for leadership events. +transitionTo(MigrationDriverState.INACTIVE); +} +} + class PollEvent extends MigrationEvent { + @Override public void run() throws Exception { switch (migrationState) { case UNINITIALIZED: -recoverMigrationStateFromZK(); +eventQueue.append(new RecoverMigrationStateFromZKEvent()); Review Comment: On second thought, I don't think my question makes sense. The following `PollEvent` can only run after `RecoverMigrationStateFromZKEvent` finishes.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
soarez commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1572550218 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -786,12 +773,29 @@ public void run() throws Exception { } } +class RecoverMigrationStateFromZKEvent extends MigrationEvent { +@Override +public void run() throws Exception { +applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState); +String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done"; +log.info("Initial migration of ZK metadata is {}.", maybeDone); + +// Once we've recovered the migration state from ZK, install this class as a metadata publisher +// by calling the initialZkLoadHandler. +initialZkLoadHandler.accept(KRaftMigrationDriver.this); + +// Transition to INACTIVE state and wait for leadership events. +transitionTo(MigrationDriverState.INACTIVE); +} +} + class PollEvent extends MigrationEvent { + @Override public void run() throws Exception { switch (migrationState) { case UNINITIALIZED: -recoverMigrationStateFromZK(); +eventQueue.append(new RecoverMigrationStateFromZKEvent()); Review Comment: Are we allowing a race between the `RecoverMigrationStateFromZKEvent` and the next `PollEvent` scheduled after the switch? Maybe this could be more straightforward if we only schedule the next poll once `RecoverMigrationStateFromZKEvent` finishes, either normally or exceptionally? WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
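The ordering argument in the two comments above can be sketched with a single-threaded executor standing in for the driver's event queue. This is an assumption-laden toy: it is not Kafka's event queue implementation, and it assumes only that events are processed one at a time in the order they were appended. The names `EventOrderingSketch` and `runOnce` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventOrderingSketch {

    // Runs a "poll" event that appends a "recover" event and then the next "poll" event,
    // and returns the order in which the steps actually executed.
    static List<String> runOnce() {
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService eventQueue = Executors.newSingleThreadExecutor(); // one thread, FIFO order

        eventQueue.execute(() -> {
            trace.add("poll-1 start");
            // Appending from inside a running event only enqueues; nothing runs until poll-1 returns.
            eventQueue.execute(() -> trace.add("recover"));
            eventQueue.execute(() -> {
                trace.add("poll-2");
                done.countDown();
            });
            trace.add("poll-1 end");
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            eventQueue.shutdown();
        }
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        // prints: [poll-1 start, poll-1 end, recover, poll-2]
        System.out.println(runOnce());
    }
}
```

Because there is a single consumer thread, the second poll cannot overlap with the recovery step. That is the reason the race raised in the earlier comment does not arise under this assumption.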
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
lucasbru merged PR #15594: URL: https://github.com/apache/kafka/pull/15594 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]
lucasbru commented on code in PR #15753: URL: https://github.com/apache/kafka/pull/15753#discussion_r1572534438 ## tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py: ## @@ -56,12 +56,7 @@ def _verify_roundrobin_assignment(self, consumer): metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False] ) -@matrix( -metadata_quorum=quorum.all_kraft, -use_new_coordinator=[True], -group_protocol=consumer_group.all_group_protocols Review Comment: Hey @lianetm. My understanding was that this does test with the new coordinator also after the update, and the default is group_protocol=classic, so this should be fine right? Unless you want to add `combined_kraft`, but not sure if that's an interesting case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
jeqo commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1572509302 ## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class FieldPathNotationTest { +final static String[] EMPTY_PATH = new String[] {}; + +@Test +void shouldBuildV1WithDotsAndBacktickPair() { +// Given v1 +// When path contains dots, then single step path +assertArrayEquals( +new String[] {"foo.bar.baz"}, +new SingleFieldPath("foo.bar.baz", FieldSyntaxVersion.V1).path()); +// When path contains backticks, then single step path +assertArrayEquals( +new String[] {"foo`bar`"}, +new SingleFieldPath("foo`bar`", FieldSyntaxVersion.V1).path()); +// When path contains dots and backticks, then single step path +assertArrayEquals( +new String[] {"foo.`bar.baz`"}, +new SingleFieldPath("foo.`bar.baz`", FieldSyntaxVersion.V1).path()); +} + +@Test +void shouldBuildV2WithEmptyPath() { +// Given v2 +// When path is empty +// Then build a path with no steps +assertArrayEquals(EMPTY_PATH, new SingleFieldPath("", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2WithoutDots() { +// Given v2 +// When path without dots +// Then build a single step path +assertArrayEquals(new String[] {"foobarbaz"}, new SingleFieldPath("foobarbaz", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2WhenIncludesDots() { +// Given v2 and fields without dots +// When path includes dots +// Then build a path with steps separated by dots +assertArrayEquals(new String[] {"foo", "bar", "baz"}, new SingleFieldPath("foo.bar.baz", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2WithoutWrappingBackticks() { +// Given v2 and fields without dots +// When backticks are not wrapping a field name +// Then build a single step path including backticks 
+assertArrayEquals(new String[] {"foo`bar`baz"}, new SingleFieldPath("foo`bar`baz", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2WhenIncludesDotsAndBacktickPair() { +// Given v2 and fields including dots +// When backticks are wrapping a field name (i.e. withing edges or between dots) +// Then build a path with steps separated by dots and not including backticks +assertArrayEquals(new String[] {"foo.bar.baz"}, new SingleFieldPath("`foo.bar.baz`", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo", "bar.baz"}, new SingleFieldPath("foo.`bar.baz`", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo.bar", "baz"}, new SingleFieldPath("`foo.bar`.baz", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo", "bar", "baz"}, new SingleFieldPath("foo.`bar`.baz", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() { +// Given v2 and fields including dots and backticks +// When backticks are wrapping a field name (i.e. withing edges or between dots) +// Then build a path with steps separated by dots and including non-wrapping backticks +assertArrayEquals(new String[] {"foo", "`bar.baz"}, new SingleFieldPath("foo.``bar.baz`", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo", "bar.baz`"}, new SingleFieldPath("foo.`bar.baz``", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo", "ba`r.baz"}, new
Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]
lianetm commented on code in PR #15753: URL: https://github.com/apache/kafka/pull/15753#discussion_r1572508072 ## tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py: ## @@ -56,12 +56,7 @@ def _verify_roundrobin_assignment(self, consumer): metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False] ) -@matrix( -metadata_quorum=quorum.all_kraft, -use_new_coordinator=[True], -group_protocol=consumer_group.all_group_protocols Review Comment: Hey, sorry I'm late to this party, but I have an important concern. Don't we want to keep testing this with the new coordinator and the classic protocol? I would say yes. I would expect we only want to exclude the consumer protocol, so we would want to keep the test with the params it had, but change it to `group_protocol=consumer_group.classic`.
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
lianetm commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2066727111

Thanks for the info @kirktrue, I see you mentioned it was a "suggested pattern", and I won't hold a strong position here, but I'm still sharing how I see it (I'm struggling to find a value in it that compensates for the drawbacks).

Drawbacks:
- overcomplicated flow to understand where a class `toString` comes from (back-and-forth between a child class and its parent): the child has no `toString`, so jump to the parent that has it but uses a `toStringBase`, defined in the parent but redefined in the child, so jump back to the child where `toStringBase` is redefined. I find it more intuitive/simpler to just have child classes with a `toString` that uses a `toStringBase` defined in the parent (like its name clearly indicates).

Value it brings:

> It allows the parent to keep its state private, but still "expose" it via toString()

We get exactly the same in both alternatives. The parent keeps its state private, exposed via toStringBase/toString.

> It helps to ensure the correct class name appears in the output

This was actually not happening when I reviewed this PR the first time, and it was indeed confusing. Still, with the fix to get the child class name in the parent, I wonder if it's worth the ping-pong? A child class using a wrong class name in the `toString` seems like an unlikely silly bug, easy to catch/fix during the initial development phase.

> It keeps the output uniform and reminds the developer to keep the parent state

A little bit more uniform, yes, but only as long as the developer does it right when overriding the `toStringBase`, right? So we're just kind of shifting the responsibility from overriding the toString properly to overriding the toStringBase properly (at the expense of the ping-pong). And we're not truly reminding the developer unless we have an abstract method, I would say, which is not the case.

Just sharing my thoughts, I may be bothered by the ping-pong flow more than others in this case ;)
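To make the two alternatives discussed above concrete, here is a minimal sketch. Only the `toString`/`toStringBase` naming comes from the discussion; the class names (`ParentA`, `ChildA`, `ParentB`, `ChildB`) and the fields are hypothetical and are not taken from the PR.

```java
public class ToStringPatterns {

    // Alternative A (the "ping-pong" flow): the parent owns toString() and
    // delegates to an overridable toStringBase(); the child overrides only
    // toStringBase(), so the reader has to jump child -> parent -> child.
    public static class ParentA {
        private final long retryBackoffMs = 100; // hypothetical private parent state

        protected String toStringBase() {
            return "retryBackoffMs=" + retryBackoffMs;
        }

        @Override
        public String toString() {
            // getSimpleName() is evaluated on the runtime class, so a child
            // instance prints the child's name.
            return getClass().getSimpleName() + "{" + toStringBase() + "}";
        }
    }

    public static class ChildA extends ParentA {
        private final String memberId = "m1"; // hypothetical child state

        @Override
        protected String toStringBase() {
            return super.toStringBase() + ", memberId=" + memberId;
        }
    }

    // Alternative B: the parent only exposes toStringBase(); every child
    // writes its own toString() and calls the parent's helper.
    public static class ParentB {
        private final long retryBackoffMs = 100;

        protected String toStringBase() {
            return "retryBackoffMs=" + retryBackoffMs;
        }
    }

    public static class ChildB extends ParentB {
        private final String memberId = "m1";

        @Override
        public String toString() {
            return "ChildB{" + toStringBase() + ", memberId=" + memberId + "}";
        }
    }

    public static void main(String[] args) {
        System.out.println(new ChildA());
        System.out.println(new ChildB());
    }
}
```

In both variants the parent state stays private and is only reachable through `toStringBase()`; the difference is which class a reader has to open first to see how the final string is assembled.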
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1572360676 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first Review Comment: Okay, got it. Yeah, the state "async commit pending because I do not know the coordinator yet" and the state "async commit is already sent to the coordinator" may cover two different cases. In fact, in the legacy consumer, these are two independent states the async commit can be in. The same holds in the async consumer, although there waiting for the async commit to complete works independently of that state (since the different states are handled by the background thread, and we simply wait for the future in the foreground thread).
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +634,85 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +final TopicPartition tp = new TopicPartition("foo", 0); +testIncompleteAsyncCommit(tp); + +final CompletableFuture asyncCommitFuture = getLastEnqueuedEventFuture(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete exceptionally async commit event +asyncCommitFuture.completeExceptionally(new KafkaException("Test exception")); + +// Commit async is completed, so commit sync completes immediately (since offsets are empty) +assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); +} + +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() { +final TopicPartition tp = new TopicPartition("foo", 0); +testIncompleteAsyncCommit(tp); + +final CompletableFuture asyncCommitFuture = getLastEnqueuedEventFuture(); + +// Mock to complete sync event +completeCommitSyncApplicationEventSuccessfully(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)),
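The contract under review in this thread (a sync commit must first wait for earlier async commits to complete, whether they succeed or fail) can be sketched with plain `CompletableFuture`s. This is only an illustration under assumptions: `PendingCommitTracker` and its methods are hypothetical names, not the consumer's internals, the sketch is single-threaded, and the JDK's `java.util.concurrent.TimeoutException` stands in for Kafka's own timeout exception.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingCommitTracker {
    // Futures of async commits that were issued and may still be in flight.
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // Registers a new async commit; the caller (or a test) completes the future.
    public CompletableFuture<Void> commitAsync() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Blocks until every previously issued async commit has completed,
    // successfully or exceptionally, or throws TimeoutException.
    public void awaitPendingAsyncCommits(Duration timeout)
            throws TimeoutException, InterruptedException {
        CompletableFuture<?>[] snapshot = pending.toArray(new CompletableFuture<?>[0]);
        // handle() turns a failed async commit into a normal completion:
        // for this wait, "failed" still counts as "no longer pending".
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(snapshot).handle((ignored, error) -> null);
        try {
            allDone.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Not expected, because handle() swallows failures above.
            throw new IllegalStateException("unexpected failure while waiting", e);
        }
        // Drop the commits that are done so later waits stay cheap.
        pending.removeIf(CompletableFuture::isDone);
    }
}
```

This mirrors the shape of the unit test quoted above: while the async commit future is incomplete the wait times out, and once the future is completed exceptionally a second wait returns without throwing.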
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
jeqo commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1572436483 ## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java: ## @@ -60,6 +62,30 @@ public void schemaless() { assertEquals(expectedKey, transformedRecord.key()); } +@Test +public void schemalessAndNestedFields() { +Map configs = new HashMap<>(); +configs.put("fields", "a,b.c"); +configs.put(FieldSyntaxVersion.FIELD_SYNTAX_VERSION_CONFIG, FieldSyntaxVersion.V2.name()); +xform.configure(configs); + +final HashMap value = new HashMap<>(); +value.put("a", 1); +final HashMap nested = new HashMap<>(); +nested.put("c", 3); +value.put("b", nested); + +final SinkRecord record = new SinkRecord("", 0, null, null, null, value, 0); +final SinkRecord transformedRecord = xform.apply(record); + +final HashMap expectedKey = new HashMap<>(); +expectedKey.put("a", 1); +expectedKey.put("b.c", 3); Review Comment: Good catch. I don't remember this scenario being discussed in the KIP thread. I was using the original keys as they are meant to be unique, but they could also contain all kinds of escaped fields, making them hard to use. Happy to switch to deriving the key schema from the nesting structure. I will update the KIP to make this explicit as well.
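The two key shapes being weighed in this comment can be illustrated on a schemaless (Map-based) value. This is a rough sketch under assumptions: `NestedKeySketch` and its methods are hypothetical, paths are split on plain dots only (no backtick escaping), and the exact shape the PR settles on is not stated in this message.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NestedKeySketch {

    // Walks a Map-based value along the path steps; returns null when a step is missing.
    public static Object lookup(Map<String, Object> value, List<String> steps) {
        Object current = value;
        for (String step : steps) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(step);
        }
        return current;
    }

    // Shape 1: flat key that uses the original path string ("b.c") as the field name.
    public static Map<String, Object> flatKey(Map<String, Object> value, List<String> paths) {
        Map<String, Object> key = new HashMap<>();
        for (String path : paths) {
            key.put(path, lookup(value, Arrays.asList(path.split("\\."))));
        }
        return key;
    }

    // Shape 2: key whose structure mirrors the nesting of the value ({b: {c: ...}}).
    @SuppressWarnings("unchecked")
    public static Map<String, Object> nestedKey(Map<String, Object> value, List<String> paths) {
        Map<String, Object> key = new HashMap<>();
        for (String path : paths) {
            String[] steps = path.split("\\.");
            Map<String, Object> target = key;
            // Create (or reuse) one intermediate map per non-leaf step.
            for (int i = 0; i < steps.length - 1; i++) {
                target = (Map<String, Object>) target.computeIfAbsent(
                        steps[i], k -> new HashMap<String, Object>());
            }
            target.put(steps[steps.length - 1], lookup(value, Arrays.asList(steps)));
        }
        return key;
    }
}
```

With the value from the test above (`a = 1`, `b = {c = 3}`) and the paths `a` and `b.c`, `flatKey` yields a map with the entries `a` and `b.c`, while `nestedKey` yields `a` plus a nested map under `b`. The flat form keeps names unique but pushes any escaping into the field name, which is the usability concern raised here.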
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
dajac commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1572314707 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final 
PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(); + +
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
mdedetrich commented on PR #13375: URL: https://github.com/apache/kafka/pull/13375#issuecomment-2066602056 @yashmayya Are you still working on this to get it over the finish line or is it okay for me to take over?
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2066577956 Hey @lucasbru, answering your questions: the new behaviour of static membership regarding a member that joins with a duplicate group instance ID is documented in [this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-StaticMembership(KIP-345)) section of the KIP. We've also discussed it with @dajac and other client teams (librd), seeing the improvement the new protocol brings in this area (mainly in cases of conflicting members that continuously kick each other out with the classic protocol). Your question does make me notice that, even though the KIP describes how conflicting static members behave in the new protocol, it would probably be helpful to extend that explanation to point out the fundamental difference from the legacy protocol and how it is an improved approach. (I can't find that explained in the KIP. If I'm not missing it, it would probably be a good update to the KIP [static membership section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-StaticMembership(KIP-345)), @dajac?) Thanks @lucasbru!
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
dajac commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1572307205 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(); + +partitionAssignor = assignorType.assignor(); + +if (simulateRebalanceTrigger) { +simulateIncrementalRebalance(topicMetadata); +} +} + +private Map createTopicMetadata() { +Map topicMetadata = new
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
cadonna commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2066478507

> > @lucasbru Thanks for the PR!
> > The unit tests you added fail in the build and also for me locally.
> > Plus, I have a question regarding the integration tests.
>
> @cadonna Thanks for the review, I hadn't noticed the test failures. Seems the `ArgumentCaptor`s type-matching introduced with Mockito 5 does not work on Java 8. I committed a workaround. Also, I'm both shocked and impressed that you are using Java 8 locally.

It has to run at least on Java 8, right?
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
cadonna commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1572290394 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first Review Comment: Fair enough, but why is that important? Is the intention that the async commit needs to lookup the group coordinator? 
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1005,6 +1102,43 @@ public void testNoWakeupInCloseCommit() { assertFalse(capturedEvent.get().future().isCompletedExceptionally()); } +@Test +public void testCloseAwaitPendingAsyncCommitIncomplete() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Review Comment: Do we need this stub? ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). Review Comment: ```suggestion // disabled, or simply because there are no consumed offsets). 
``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +634,85 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +final TopicPartition tp = new TopicPartition("foo", 0); +testIncompleteAsyncCommit(tp); + +final CompletableFuture asyncCommitFuture
[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor
[ https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838979#comment-17838979 ] Stanislav Spiridonov commented on KAFKA-16585: -- The case is relatively simple. I have a KTable with entities that have to be enriched with an icon attribute from a side service. So the processor maintains an internal store with entity keys and periodically asks the service for updates for the registered ids. If the icon has changed, it forwards a message with the new icon. The key of the record is the entity key (String) and the value is the icon (String). BTW, I ran into strange behaviour: if I forward a new record from another thread, it arrives at the wrong processor. So now I just update the store from the icon KTable instead of forwarding the record. > No way to forward message from punctuation method in the FixedKeyProcessor > -- > > Key: KAFKA-16585 > URL: https://issues.apache.org/jira/browse/KAFKA-16585 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Stanislav Spiridonov >Priority: Major > > The FixedKeyProcessorContext can forward only FixedKeyRecord. This class > doesn't have a public constructor and can be created based on existing > records. But such record usually is absent in the punctuation method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
soarez commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572295590 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest { dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get } - def alterTopicConfig(admin: Admin): AlterConfigsResult = { + def alterBrokerConfigs(admin: Admin): Unit = { +val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") +val defaultBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), AlterConfigOp.OpType.SET), +).asJavaCollection +val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") +val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1") +val specificBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), AlterConfigOp.OpType.SET), +).asJavaCollection + +TestUtils.retry(6) { + val result = admin.incrementalAlterConfigs(Map( +defaultBrokerResource -> defaultBrokerConfigs, +broker0Resource -> specificBrokerConfigs, +broker1Resource -> specificBrokerConfigs + ).asJava) + try { +result.all().get(10, TimeUnit.SECONDS) + } catch { +case t: Throwable => fail("Alter Broker Configs had an error", t) + } +} Review Comment: Do we really need the retry logic here? Could we verify that a KRaft controller is ready before testing this operation to make the outcome more predictable? Or is it expected that we may have to repeat the operation a few times regardless? And if that's the case should we update docs too to let operators know they might have to try a few times? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
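One way to read the suggestion above is to block on an explicit readiness check before issuing the request, rather than retrying the request itself. The sketch below is a generic polling helper under that assumption; `ReadinessGate`, `awaitReady`, and the `isReady` supplier are hypothetical names, and how a test would actually detect that a KRaft controller is ready is not specified in the thread.

```java
import java.util.function.BooleanSupplier;

public class ReadinessGate {

    /**
     * Polls the (hypothetical) readiness check until it reports true or the
     * timeout elapses. Returns true if the check passed, false on timeout.
     */
    public static boolean awaitReady(BooleanSupplier isReady, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (isReady.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollMs);
        }
    }
}
```

With a gate like this in front of the call, a single `incrementalAlterConfigs` attempt either succeeds or fails the test outright, which would make an unexpected need for retries visible instead of hiding it.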
Re: [PR] KAFKA-16463 System test for reverting migration to ZK [kafka]
mumrah commented on PR #15754: URL: https://github.com/apache/kafka/pull/15754#issuecomment-2066454711 ``` SESSION REPORT (ALL TESTS) ducktape version: 0.11.4 session_id: 2024-04-18--017 run time: 26 minutes 43.661 seconds tests run:8 passed: 8 flaky:0 failed: 0 ignored: 0 test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False.downgrade_to_zk=False status: PASS run time: 2 minutes 54.546 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False.downgrade_to_zk=True status: PASS run time: 3 minutes 32.044 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True.downgrade_to_zk=False status: PASS run time: 2 minutes 56.290 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True.downgrade_to_zk=True status: PASS run time: 3 minutes 38.101 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False.downgrade_to_zk=False status: PASS run time: 2 minutes 52.691 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False.downgrade_to_zk=True status: PASS run time: 3 minutes 56.394 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True.downgrade_to_zk=False status: PASS run time: 3 minutes 8.253 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True.downgrade_to_zk=True status: PASS run time: 3 minutes 44.203 seconds ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] MINOR: example.com moved [kafka]
akatona84 commented on PR #15758: URL: https://github.com/apache/kafka/pull/15758#issuecomment-2066418336 now the ipv6 address is changed to 2606:2800:220:1:248:1893:25c8:1946... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: example.com moved [kafka]
akatona84 opened a new pull request, #15758: URL: https://github.com/apache/kafka/pull/15758 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2066392315 > @lucasbru Thanks for the PR! > > The unit tests you added fail in the build and also for me locally. > > Plus, I have a question regarding the integration tests. @cadonna Thanks for the review, I hadn't noticed the test failures. Seems the `ArgumentCaptor`s type-matching introduced with Mockito 5 does not work on Java 8. I committed a workaround. Also, I'm both shocked and impressed that you are using Java 8 locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
FrankYang0529 commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1572189143 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -179,8 +186,8 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu throw new IllegalStateException(); } -ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, -annot.securityProtocol(), annot.metadataVersion()); +ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, disksPerBroker, Review Comment: Thanks for the suggestion. I updated it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]
chia7712 commented on PR #15722: URL: https://github.com/apache/kafka/pull/15722#issuecomment-2066283854 > Effectively, such test would verify the behavior of deprecated method. What do you think? We should verify that the non-deprecated metrics have the correct doc, which is not marked as "deprecated". Also, that is what you are trying to fix, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
chia7712 commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2066277923 This PR is good, but it seems to me `LogSegment` should NOT guess the directory structure managed by the upper class (i.e. `LogManager`). The root cause seems to be the following steps: 1. the segments to be deleted are removed from `LocalLog` 2. `LocalLog#renameDir` moves the whole folder 3. `LocalLog#renameDir` updates the parent folder for all segments. However, the segments to be deleted have already been removed from the inner collection. Hence, some `Segment#log` has a stale file. If I understand correctly, another solution is to pass a function that returns the latest dir when calling `deleteSegmentFiles` (https://github.com/apache/kafka/blob/2d4abb85bf4a3afb1e3359a05786ab8f3fda127e/core/src/main/scala/kafka/log/LocalLog.scala#L904). If deleting a segment gets a not-found error, we call `updateParentDir` and delete it again. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
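A minimal sketch of the alternative described in the comment above, assuming a hypothetical helper rather than Kafka's actual `LocalLog`/`LogSegment` code: the caller passes a supplier for the log's current directory, and when a delete hits a not-found error the file is re-resolved against that directory before one more attempt. The names `StaleSegmentDelete`, `deleteWithLatestDir`, and `latestDir` are illustrative only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.function.Supplier;

public class StaleSegmentDelete {

    /**
     * Deletes a segment file and returns the path that was actually deleted.
     * If the recorded path is stale (its directory was renamed), the file name
     * is re-resolved against the directory returned by latestDir and the
     * delete is attempted once more.
     */
    public static Path deleteWithLatestDir(Path segmentFile, Supplier<Path> latestDir)
            throws IOException {
        try {
            Files.delete(segmentFile);
            return segmentFile;
        } catch (NoSuchFileException e) {
            // Hypothetical stand-in for "call updateParentDir and delete again".
            Path relocated = latestDir.get().resolve(segmentFile.getFileName());
            Files.delete(relocated);
            return relocated;
        }
    }
}
```

The design point is that the segment never guesses the directory layout itself; the owner of the directory supplies it, and only when the first attempt fails.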
Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]
OmniaGM commented on PR #15728: URL: https://github.com/apache/kafka/pull/15728#issuecomment-2066275194 Thanks for getting the KIP out there for discussion and for fixing the tests. Should this PR be a draft until we have the KIP voted in by the community? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]
aaron-ai commented on PR #15728: URL: https://github.com/apache/kafka/pull/15728#issuecomment-2066276867 > Thanks for getting the KIP out there for discussion and for fixing the tests. Should this PR be a draft until we have the KIP voted in by the community? OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]
OmniaGM commented on code in PR #15728: URL: https://github.com/apache/kafka/pull/15728#discussion_r1572155918 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java: ## @@ -169,6 +170,7 @@ static Map sourceConsumerConfig(Map props) { result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX)); result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX)); result.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); +result.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); Review Comment: @aaron-ai thanks for drafting a KIP that quickly. I left a couple of comments there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
showuon commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1572140166 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { Review Comment: Could we add some tests for these changes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1572122943 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -179,8 +186,8 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu throw new IllegalStateException(); } -ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, -annot.securityProtocol(), annot.metadataVersion()); +ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, disksPerBroker, Review Comment: We can do that in this PR as most changes involved by refactor are in this method. Let's do it to decorate our test infrastructure:) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
FrankYang0529 commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1572114481 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -179,8 +186,8 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu throw new IllegalStateException(); } -ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, -annot.securityProtocol(), annot.metadataVersion()); +ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, disksPerBroker, Review Comment: Do we want to use another jira to track this? Or should I remove it in this PR? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]
aaron-ai commented on PR #15728: URL: https://github.com/apache/kafka/pull/15728#issuecomment-2066221145 KIP has been created here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1039%3A+Disable+automatic+topic+creation+for+MirrorMaker2+consumers -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1572109503 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount) + +// Try with coordinator known Review Comment: We are guaranteed to have looked up the coordinator here, because we did request data from it. -- This is an automated message from the Apache Git Service.
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1572109078 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first Review Comment: We are not guaranteed to have looked up the coordinator here, because we did not request any data from it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
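The contract these two tests exercise can be sketched independently of the consumer: keep track of every in-flight asynchronous commit and have the synchronous paths wait for them first. This is a simplified model, not the `AsyncKafkaConsumer` implementation; `PendingCommitTracker` and its methods are hypothetical, and the in-flight request is represented by a plain `CompletableFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingCommitTracker {
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    private int successCount = 0;

    /** Registers an async commit; the success callback runs when the request completes. */
    public synchronized CompletableFuture<Void> commitAsync(CompletableFuture<Void> inFlight) {
        CompletableFuture<Void> withCallback = inFlight.thenRun(this::recordSuccess);
        pending.add(withCallback);
        return withCallback;
    }

    private synchronized void recordSuccess() {
        successCount++;
    }

    public synchronized int successCount() {
        return successCount;
    }

    /**
     * Intended to run at the start of a commitSync() or close(): blocks until
     * every previously registered async commit, including its callback, is done.
     */
    public void awaitPendingAsyncCommits(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            snapshot = pending.toArray(new CompletableFuture<?>[0]);
        }
        // Wait outside the lock so completing threads can run recordSuccess().
        CompletableFuture.allOf(snapshot).get(timeout, unit);
        synchronized (this) {
            pending.removeIf(CompletableFuture::isDone);
        }
    }
}
```

Because the tracked future is the one returned by `thenRun`, waiting on it covers the callback as well as the request, which is what lets a test assert on the callback count immediately after the synchronous call returns.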
Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]
lucasbru merged PR #15753: URL: https://github.com/apache/kafka/pull/15753 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lucasbru commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2066195894 LGTM, thanks! Merging this, however: * Have we discussed the behavioral difference with broker team / David? * Have we documented the behavioral difference anywhere? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lucasbru merged PR #15738: URL: https://github.com/apache/kafka/pull/15738 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR:fix hint in fetchOffsetForTimestamp [kafka]
hudeqi opened a new pull request, #15757: URL: https://github.com/apache/kafka/pull/15757 The hint is clearly wrong: an error is thrown only when the high watermark lags behind the epoch start offset, but the hint says the opposite. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15709) KRaft support in ServerStartupTest
[ https://issues.apache.org/jira/browse/KAFKA-15709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838923#comment-17838923 ] Zihao Lin commented on KAFKA-15709: --- [~mdedetrich] feel free to take over > KRaft support in ServerStartupTest > -- > > Key: KAFKA-15709 > URL: https://issues.apache.org/jira/browse/KAFKA-15709 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Assignee: Zihao Lin >Priority: Minor > Labels: kraft, kraft-test, newbie > > The following tests in ServerStartupTest in > core/src/test/scala/unit/kafka/server/ServerStartupTest.scala need to be > updated to support KRaft > 38 : def testBrokerCreatesZKChroot(): Unit = { > 51 : def testConflictBrokerStartupWithSamePort(): Unit = { > 65 : def testConflictBrokerRegistration(): Unit = { > 82 : def testBrokerSelfAware(): Unit = { > 93 : def testBrokerStateRunningAfterZK(): Unit = { > Scanned 107 lines. Found 0 KRaft tests out of 5 tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
funky-eyes commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1572073595 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## Review Comment: I have a question, in fact, what this PR does is to provide a standard configuration and generate corresponding rate limiters and related monitoring indicators, right? Then it needs to be used in the corresponding RemoteStorageManager, correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1572064864 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -179,8 +186,8 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu throw new IllegalStateException(); } -ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, -annot.securityProtocol(), annot.metadataVersion()); +ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, disksPerBroker, Review Comment: It seems to me `clusterBuilder` should be removed. Also, the temporary local variables (`type`, `brokers`, etc ) can be removed. We have a builder already and hence we should leverage it to collect all variables. for example: ```java ClusterConfig.Builder builder = ClusterConfig.builder(); if (annot.clusterType() == Type.DEFAULT) { builder.type(defaults.clusterType()); } else { builder.type(annot.clusterType()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
chia7712 commented on PR #15719: URL: https://github.com/apache/kafka/pull/15719#issuecomment-2066133702 @brandboat this PR is great. However, I'd like to merge it after #15569. #15569 is a huge PR which refactors `KafkaConfig` and `LogConfig`, and I'm trying to avoid the pain of continually fixing conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
brandboat commented on PR #15719: URL: https://github.com/apache/kafka/pull/15719#issuecomment-2066136873 > @brandboat this PR is great. However, I'd like to merge it after https://github.com/apache/kafka/pull/15569. https://github.com/apache/kafka/pull/15569 is a huge PR which refactors KafkaConfig and LogConfig, and I'm trying to avoid the pain of continually fixing conflicts. No problem! Thanks for the notification! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
FrankYang0529 commented on PR #15745: URL: https://github.com/apache/kafka/pull/15745#issuecomment-2066132584 Hi @gaurav-narula and @chia7712, I have addressed all comments. Thanks for your review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2066130114 @johnnychhsu , do you have any other comments? I'll merge this at the weekend if no other comments. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
chia7712 commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2066120621 ``` [2024-04-18T17:26:16.149Z] [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-15569/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java:22:1: Disallowed import - org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1. [ImportControl] [2024-04-18T17:26:16.149Z] [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-15569/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java:23:1: Disallowed import - org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX. [ImportControl] ``` There are import errors :_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572018203

## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java: ##
@@ -881,10 +937,18 @@ public List> recordBatches() {
                 new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());

             TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

             assertEquals(expectedBatchCount, batchesPassedToController.size());
             assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
         }
     }
+
+    // Wait until the driver has recovered MigrationState From ZK. This is to simulate the driver needs to be installed as the metadata publisher
+    // so that it can receive onControllerChange (KRaftLeaderEvent) and onMetadataUpdate (MetadataChangeEvent) events.
+    private void startAndWaitForRecoveringMigrationStateFromZK(KRaftMigrationDriver driver) throws InterruptedException {
+        driver.start();
+        TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.INACTIVE),
+            "Waiting for KRaftMigrationDriver to enter INACTIVE state");

Review Comment:
This is necessary now because, in the test suite, we might invoke `onControllerChange` and append a `KRaftLeaderEvent` before the `RecoverMigrationStateFromZKEvent` is appended. This won't happen in practice, because the driver registers itself as a metadata publisher, and so starts receiving `KRaftLeaderEvent` and `MetadataChangeEvent`, only after `RecoverMigrationStateFromZKEvent` has completed.
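The start-then-wait pattern described in the review comment above can be sketched in isolation. This is a minimal illustration only: `WaitForStateSketch`, `DriverState`, and the local `waitForCondition` helper are hypothetical stand-ins written for this example, not Kafka's `TestUtils` or `KRaftMigrationDriver`, and the background thread merely simulates the recovery step.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

public class WaitForStateSketch {
    // Hypothetical stand-in for the driver's state enum.
    enum DriverState { UNINITIALIZED, INACTIVE }

    // Polls the condition until it holds or the deadline passes.
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String details) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + details);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting: " + details, e);
            }
        }
    }

    // Starts a simulated recovery step and blocks until the state is INACTIVE,
    // so that anything the caller does next is ordered after recovery.
    static DriverState startAndWaitForRecovery() {
        AtomicReference<DriverState> state = new AtomicReference<>(DriverState.UNINITIALIZED);
        Thread recovery = new Thread(() -> state.set(DriverState.INACTIVE));
        recovery.start();
        waitForCondition(() -> state.get() == DriverState.INACTIVE, 5000,
            "Waiting for the simulated driver to enter INACTIVE state");
        return state.get();
    }

    public static void main(String[] args) {
        System.out.println(startAndWaitForRecovery()); // prints "INACTIVE"
    }
}
```

Blocking in the test until the state is `INACTIVE` gives the test the same ordering that, per the comment, publisher registration provides in production.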
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572018955

## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java: ##
@@ -881,10 +937,18 @@ public List> recordBatches() {
                 new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());

             TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

Review Comment:
Side fix.
Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]
mfvitale commented on PR #15756:
URL: https://github.com/apache/kafka/pull/15756#issuecomment-2066045097

> Hi @mfvitale, thanks for the PR! This is adding new configurations to transformations so this will require a [KIP](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).

@mimaison actually, `key.converter.replace.null.with.default` and `value.converter.replace.null.with.default` are not new configurations; they are the ones already used by `JsonConverter`, added in https://github.com/apache/kafka/pull/13419
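The behaviour under discussion, whether a null in a nullable field is kept or replaced by the schema default, can be shown with a minimal sketch. `NullDefaultSketch` and `fieldValue` are hypothetical names invented for this example; this is not the Kafka Connect API, and the boolean parameter only mirrors the intent of the `replace.null.with.default` setting mentioned above.

```java
public class NullDefaultSketch {
    // Returns the value a reader would see for a field.
    // rawValue: the value actually stored in the record (may be null).
    // schemaDefault: the default declared in the schema (assumed non-null here).
    // replaceNullWithDefault: hypothetical flag mirroring the intent of replace.null.with.default.
    static Object fieldValue(Object rawValue, Object schemaDefault, boolean replaceNullWithDefault) {
        if (rawValue == null && replaceNullWithDefault) {
            return schemaDefault;
        }
        return rawValue;
    }

    public static void main(String[] args) {
        System.out.println(fieldValue(null, "n/a", true));   // prints "n/a"
        System.out.println(fieldValue(null, "n/a", false));  // prints "null"
        System.out.println(fieldValue("x", "n/a", true));    // prints "x"
    }
}
```

With the flag off, an explicit null in a nullable field survives, which is the outcome the issue title asks for.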
Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]
mimaison commented on PR #15756: URL: https://github.com/apache/kafka/pull/15756#issuecomment-2066023794 Hi @mfvitale, thanks for the PR! This is adding new configurations to transformations so this will require a [KIP](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1571981454

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,29 @@ public void run() throws Exception {
         }
     }

+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);

Review Comment:
Good suggestion. Updated.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1571963909

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,29 @@ public void run() throws Exception {
         }
     }

+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
+            String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+            log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+            // Once we've recovered the migration state from ZK, install this class as a metadata publisher
+            // by calling the initialZkLoadHandler.
+            initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+            // Transition to INACTIVE state and wait for leadership events.
+            transitionTo(MigrationDriverState.INACTIVE);
+        }
+    }
+
     class PollEvent extends MigrationEvent {
+
         @Override
         public void run() throws Exception {
             switch (migrationState) {
                 case UNINITIALIZED:
-                    recoverMigrationStateFromZK();
+                    eventQueue.append(new RecoverMigrationStateFromZKEvent());

Review Comment:
No need. As I said in this [comment](https://github.com/apache/kafka/pull/15732#discussion_r1570596642), in the `UNINITIALIZED` state the only event we will receive is the `pollEvent`. We only start receiving `onControllerChange` (`KRaftLeaderEvent`) and `onMetadataUpdate` (`MetadataChangeEvent`) after `RecoverMigrationStateFromZKEvent` completes, so we don't have to worry about ordering at this point.
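The ordering argument in the comment above can be illustrated with a small sketch. It assumes a single-threaded FIFO event queue, modelled here with a plain `ExecutorService`; `EventOrderSketch` and the event names are hypothetical and do not reproduce Kafka's `KafkaEventQueue` or the driver's real event classes. The latch stands in for "the publisher is installed only once recovery has run".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventOrderSketch {

    // Runs the simulated event sequence and returns the order in which events were handled.
    static List<String> run() {
        List<String> handled = new ArrayList<>();            // only touched by the queue thread
        ExecutorService eventQueue = Executors.newSingleThreadExecutor();
        CountDownLatch publisherInstalled = new CountDownLatch(1);

        // Recovery event: records itself, then "installs the publisher".
        Runnable recoverEvent = () -> {
            handled.add("recover");
            publisherInstalled.countDown();
        };
        // Poll event in the uninitialized state: appends the recovery event to the queue.
        Runnable pollEvent = () -> {
            handled.add("poll");
            eventQueue.execute(recoverEvent);
        };
        eventQueue.execute(pollEvent);

        try {
            // Leadership events can only be produced once the publisher is installed.
            if (!publisherInstalled.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Recovery did not complete in time");
            }
            eventQueue.execute(() -> handled.add("leader"));
            eventQueue.shutdown();
            if (!eventQueue.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Event queue did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for events", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[poll, recover, leader]"
    }
}
```

Because the leadership event cannot be produced before recovery has run, appending the recovery event from inside the poll event does not introduce a reordering risk in this model.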
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2065947030

> Do you think that we should revert unstable.api.versions.enable change and try again? Thanks.

Yep
Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]
jlprat merged PR #13824: URL: https://github.com/apache/kafka/pull/13824
[jira] [Commented] (KAFKA-15963) Flaky test: testBrokerHeartbeatDuringMigration [3] 3.6-IV0 – org.apache.kafka.controller.QuorumControllerTest
[ https://issues.apache.org/jira/browse/KAFKA-15963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838883#comment-17838883 ]

Josep Prat commented on KAFKA-15963:

It failed again, but with 3.6-IV1 this time:
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/2/testReport/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_21_and_Scala_2_13testBrokerHeartbeatDuringMigration_MetadataVersion__metadataVersion_3_6_IV1_/

> Flaky test: testBrokerHeartbeatDuringMigration [3] 3.6-IV0 – org.apache.kafka.controller.QuorumControllerTest
> -
>
> Key: KAFKA-15963
> URL: https://issues.apache.org/jira/browse/KAFKA-15963
> Project: Kafka
> Issue Type: Bug
> Reporter: Apoorv Mittal
> Assignee: Ashwin Pankaj
> Priority: Major
>
> PR build:
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14767/15/tests/]
>
> {code:java}
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotControllerException: No controller appears to be active.
> Stacktrace
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotControllerException: No controller appears to be active.
>   at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>   at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>   at org.apache.kafka.controller.QuorumControllerTest.testBrokerHeartbeatDuringMigration(QuorumControllerTest.java:1725)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>   at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>   at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
> {code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]
jlprat commented on PR #13824: URL: https://github.com/apache/kafka/pull/13824#issuecomment-2065886831 All of the tests that failed in this build are known flaky tests. Merging.