[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
emissionnebula commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1173353923 ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java: ## @@ -68,7 +68,7 @@ public void testUnderlying() { } @Test -public void testDelegationOfAfterAdding() { +public void testDelegationOfAfterUpdated() { Review Comment: ohh. I missed this :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14827) Support for StandardAuthorizer in Benchmark
[ https://issues.apache.org/jira/browse/KAFKA-14827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Purshotam Chauhan resolved KAFKA-14827. --- Fix Version/s: 3.5.0 Reviewer: Manikumar Reddy Resolution: Fixed > Support for StandardAuthorizer in Benchmark > --- > > Key: KAFKA-14827 > URL: https://issues.apache.org/jira/browse/KAFKA-14827 > Project: Kafka > Issue Type: Improvement >Reporter: Purshotam Chauhan >Assignee: Purshotam Chauhan >Priority: Major > Fix For: 3.5.0 > > > Support for StandardAuthorizer in Benchmark -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-14827) Support for StandardAuthorizer in Benchmark
[ https://issues.apache.org/jira/browse/KAFKA-14827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Purshotam Chauhan closed KAFKA-14827. - > Support for StandardAuthorizer in Benchmark > --- > > Key: KAFKA-14827 > URL: https://issues.apache.org/jira/browse/KAFKA-14827 > Project: Kafka > Issue Type: Improvement >Reporter: Purshotam Chauhan >Assignee: Purshotam Chauhan >Priority: Major > Fix For: 3.5.0 > > > Support for StandardAuthorizer in Benchmark -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dengziming commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`
dengziming commented on code in PR #13555: URL: https://github.com/apache/kafka/pull/13555#discussion_r1173247022 ## server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java: ## @@ -26,10 +26,9 @@ /** * The purgatory which holds events that have been started, but not yet completed. Review Comment: Some thoughts: the founder of Kafka had an artistic bent. Kafka (the novelist) mainly represents the modernist school, and "purgatory" is a term used in the "Divine Comedy" by Dante, a representative writer of the Renaissance era. "Purgatory" was first used in the `TimingWheel` algorithm and has since been used several times to represent a `ThresholdFuture` data structure.
[GitHub] [kafka] rondagostino commented on pull request #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing
rondagostino commented on PR #13532: URL: https://github.com/apache/kafka/pull/13532#issuecomment-1517118288 Merged to 3.5, 3.4, 3.3, 3.2, 3.1, 3.0, 2.8, and 2.7
[jira] [Updated] (KAFKA-14887) ZK session timeout can cause broker to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14887: -- Fix Version/s: 2.7.3 3.2.4 3.1.3 3.0.3 3.4.1 3.3.3 2.8.3 > ZK session timeout can cause broker to shutdown > --- > > Key: KAFKA-14887 > URL: https://issues.apache.org/jira/browse/KAFKA-14887 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.7.0, 2.8.0, 2.7.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0, 3.0.1, > 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, > 3.3.2 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Major > Fix For: 2.7.3, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3, 2.8.3 > > > We have the following code in FinalizedFeatureChangeListener.scala which will > exit regardless of the type of exception that is thrown when trying to > process feature changes: > case e: Exception => { > error("Failed to process feature ZK node change event. The broker > will eventually exit.", e) > throw new FatalExitError(1) > The issue here is that this does not distinguish between exceptions caused by > an inability to process a feature change and an exception caused by a > ZooKeeper session timeout. We want to shut the broker down for the former > case, but we do NOT want to shut the broker down in the latter case; the > ZooKeeper session will eventually be reestablished, and we can continue > processing at that time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14887) ZK session timeout can cause broker to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14887: -- Fix Version/s: 3.5.0 > ZK session timeout can cause broker to shutdown > --- > > Key: KAFKA-14887 > URL: https://issues.apache.org/jira/browse/KAFKA-14887 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.7.0, 2.8.0, 2.7.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0, 3.0.1, > 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, > 3.3.2 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Major > Fix For: 3.5.0 > > > We have the following code in FinalizedFeatureChangeListener.scala which will > exit regardless of the type of exception that is thrown when trying to > process feature changes: > case e: Exception => { > error("Failed to process feature ZK node change event. The broker > will eventually exit.", e) > throw new FatalExitError(1) > The issue here is that this does not distinguish between exceptions caused by > an inability to process a feature change and an exception caused by a > ZooKeeper session timeout. We want to shut the broker down for the former > case, but we do NOT want to shut the broker down in the latter case; the > ZooKeeper session will eventually be reestablished, and we can continue > processing at that time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
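The distinction described in KAFKA-14887 above can be sketched as follows. This is a minimal Java illustration under stated assumptions, not the actual change to FinalizedFeatureChangeListener.scala: `SessionExpiredException`, `FatalExit`, and `FeatureChangeHandler.process` are hypothetical stand-ins invented for the example.

```java
// Hypothetical stand-in for a failure caused by an expired ZooKeeper session.
class SessionExpiredException extends RuntimeException {
    SessionExpiredException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for an error that makes the broker exit.
class FatalExit extends Error {
    final int statusCode;

    FatalExit(int statusCode, Throwable cause) {
        super("fatal exit with status " + statusCode, cause);
        this.statusCode = statusCode;
    }
}

class FeatureChangeHandler {
    /**
     * Runs one feature-change update. Returns true if it was applied and false if it
     * was skipped because the session expired, in which case the caller can retry
     * once the session is back. Any other exception is escalated as fatal.
     */
    static boolean process(Runnable update) {
        try {
            update.run();
            return true;
        } catch (SessionExpiredException e) {
            // Transient condition: do not shut down, the session should be reestablished.
            return false;
        } catch (Exception e) {
            // Genuine inability to process the feature change: escalate.
            throw new FatalExit(1, e);
        }
    }
}
```

The only point of the sketch is the ordering of the two `catch` clauses: the session-expiration case is handled before the catch-all that exits.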
[GitHub] [kafka] rondagostino commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
rondagostino commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1173188783 ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java: ## @@ -77,7 +77,7 @@ public void testDelegationOfAfterAdding() { } @Test -public void testDelegationOfAfterRemoving() { +public void testDelegationOfAfterRemoved() { Review Comment: s/AfterRemoved/Removed/ ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java: ## @@ -71,7 +71,7 @@ public void testUnderlying() { } @Test -public void testDelegationOfAfterAdding() { +public void testDelegationOfAfterAdded() { Review Comment: s/AfterAdded/Added/ ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableNavigableSetTest.java: ## @@ -82,7 +82,7 @@ public void testDelegationOfAfterAdding() { } @Test -public void testDelegationOfAfterRemoving() { +public void testDelegationOfAfterRemoved() { Review Comment: s/AfterRemoved/Removed/ ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java: ## @@ -80,7 +80,7 @@ public void testDelegationOfAfterAdding() { } @Test -public void testDelegationOfAfterRemoving() { +public void testDelegationOfAfterRemoved() { Review Comment: s/AfterRemoved/Removed/ ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableNavigableSetTest.java: ## @@ -73,7 +73,7 @@ public void testUnderlying() { } @Test -public void testDelegationOfAfterAdding() { +public void testDelegationOfAfterAdded() { Review Comment: s/AfterAdded/Added/ ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java: ## @@ -68,7 +68,7 @@ public void testUnderlying() { } @Test -public void testDelegationOfAfterAdding() { +public void testDelegationOfAfterUpdated() { Review Comment: s/AfterUpdated/Updated/
[GitHub] [kafka] rondagostino commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
rondagostino commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1173187845 ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java: ## @@ -225,77 +225,59 @@ public void testDelegationOfReplaceAll() { new PCollectionsHashMapWrapperDelegationChecker<>() .defineMockConfigurationForVoidMethodInvocation(mock -> mock.replaceAll(eq(mockBiFunction))) .defineWrapperVoidMethodInvocation(wrapper -> wrapper.replaceAll(mockBiFunction)) -.doVoidMethodDelegationCheck(); +.doUnsupportedVoidFunctionDelegrationCheck(); } @Test public void testDelegationOfPutIfAbsent() { new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.putIfAbsent(eq(this), eq(this)), this) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.putIfAbsent(this, this), identity()) -.doFunctionDelegationCheck(); -} - -@ParameterizedTest -@ValueSource(booleans = {true, false}) -public void testDelegationOfRemoveByKeyAndValue(boolean mockFunctionReturnValue) { -new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this), eq(this)), mockFunctionReturnValue) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this, this), identity()) -.doFunctionDelegationCheck(); -} - -@ParameterizedTest -@ValueSource(booleans = {true, false}) -public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) { -new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this, this), identity()) -.doFunctionDelegationCheck(); Review Comment: Yeah, this is why I said in the comment above: ``` I think on 
the one hand we don't have to test that the real method throws the exception -- that's up to the library itself to test in its test suite. All we have to test is that we delegate correctly. ``` I guess that in these two cases the underlying library implementation doesn't explicitly throw the exception but instead reuses a default implementation from `java.util`, which throws an exception only if the map ends up needing to be mutated. At a minimum we should put the two test cases back in as I wrote them. If you want to do it with an explicit check for the thrown exception by invoking the underlying method, then you may have to do a bit more configuration on the mock so that the underlying method will actually decide it needs to mutate the map. Probably just putting my stuff back in is easier (and also adding "UnsupportedOperation" to the test method names).
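The behaviour described in the comment above (a `java.util` default implementation that throws only when the map would actually have to change) can be reproduced with a small stand-alone example. This is a sketch under assumptions: `ReadOnlyMap` and `DefaultMethodDemo` are hypothetical classes written for illustration and are not the pcollections wrapper from the PR. It relies on the documented default implementations of `Map.remove(key, value)` and `Map.replace(key, oldValue, newValue)`, which check the current mapping before calling `remove(key)` or `put(key, value)`.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical read-only map: the direct mutators throw, while remove(key, value) and
// replace(key, oldValue, newValue) are inherited from java.util.Map's default methods.
class ReadOnlyMap<K, V> extends AbstractMap<K, V> {
    private final Map<K, V> contents;

    ReadOnlyMap(Map<K, V> contents) {
        this.contents = new HashMap<>(contents);
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return Collections.unmodifiableSet(contents.entrySet());
    }

    @Override
    public V put(K key, V value) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V remove(Object key) {
        throw new UnsupportedOperationException();
    }
}

class DefaultMethodDemo {
    // Returns true if running the action throws UnsupportedOperationException.
    static boolean throwsUnsupported(Runnable action) {
        try {
            action.run();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

With a map holding `"a" -> 1`, calling `remove("a", 2)` returns `false` without throwing because the value does not match, while `remove("a", 1)` reaches `remove(key)` and throws. A test that wants to observe the exception therefore has to set up a mapping that forces the mutation path.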
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1173179837 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are as follows: + * + * Each member must get at least one partition for every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + *This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + *Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * Members should retain as much as their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + * + * + * The algorithm includes the following steps: + * + * Generate a map of membersPerTopic using the given member subscriptions. + * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * Add members from the potentiallyUnfilled list to the Unfilled list if they haven't met the total required quota i.e. minimum number of partitions per member + 1 (if member is designated to receive one of the excess partitions) + * Generate a list of unassigned partitions by calculating the difference between total partitions and already assigned (sticky) partitions + * Iterate through unfilled members and assign partitions from the unassigned partitions + * + * + * + */ +public class RangeAssignor implements PartitionAssignor { + +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +static class RemainingAssignmentsForMember { +private final String memberId; +private final Integer remaining; + +public RemainingAssignmentsForMember(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} + +public String memberId() { +return memberId; +} + +public Integer remaining() { +return remaining; +} + +} + +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> 
membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. +if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId); +} else { +log.info(memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +private Map>
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1173177108 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are as follows: + * + * Each member must get at least one partition for every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + *This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + *Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * Members should retain as much as their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + * + * + * The algorithm includes the following steps: + * + * Generate a map of membersPerTopic using the given member subscriptions. + * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * Add members from the potentiallyUnfilled list to the Unfilled list if they haven't met the total required quota i.e. minimum number of partitions per member + 1 (if member is designated to receive one of the excess partitions) + * Generate a list of unassigned partitions by calculating the difference between total partitions and already assigned (sticky) partitions + * Iterate through unfilled members and assign partitions from the unassigned partitions + * + * + * + */ +public class RangeAssignor implements PartitionAssignor { + +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +static class RemainingAssignmentsForMember { +private final String memberId; +private final Integer remaining; + +public RemainingAssignmentsForMember(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} + +public String memberId() { +return memberId; +} + +public Integer remaining() { +return remaining; +} + +} + +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> 
membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. +if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId); +} else { +log.info(memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +private Map>
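The quota arithmetic mentioned in the Javadoc quoted above (a minimum number of partitions per member, plus one for members designated to receive an excess partition) can be sketched for a single topic as follows. This is a simplified illustration, not the code from the PR: `RangeQuotaSketch` and `assignRanges` are hypothetical names, the sticky step that retains previous assignments is left out, and giving the extra partitions to the first members in sorted order is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeQuotaSketch {
    /**
     * Splits partitions 0..numPartitions-1 of one topic into contiguous ranges.
     * Every member gets numPartitions / members.size() partitions, and the first
     * numPartitions % members.size() members (in sorted order) get one extra.
     */
    static Map<String, List<Integer>> assignRanges(int numPartitions, List<String> members) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return assignment;
        }
        int minQuota = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0; // next unassigned partition index
        for (int i = 0; i < sorted.size(); i++) {
            int quota = minQuota + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < quota; p++) {
                partitions.add(next++);
            }
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }
}
```

For a topic with 3 partitions and members A and B, this gives A the partitions 0 and 1 and B the partition 2. When there are more members than partitions, the trailing members receive an empty list, which corresponds to the exception to the "at least one partition" property noted in the Javadoc.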
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1173174594 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +subscribedTopics.add(topic2Uuid); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = 
assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 Partitions +// Topics +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); +// Members +Map members = new HashMap<>(); +// Consumer A +List subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>())); +// Consumer B +List subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1))); +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(2))); +// Topic 3 Partitions Assignment +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(0))); +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1173172297 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +members.put(consumerA, new AssignmentMemberSpec( +Optional.empty(), + Optional.empty(), + Collections.emptyList(), + Collections.emptyMap())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1173171410 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -266,7 +266,7 @@ public synchronized TransactionalRequestResult beginAbort() { return handleCachedTransactionRequestResult(() -> { if (currentState != State.ABORTABLE_ERROR) maybeFailWithError(); -transitionTo(State.ABORTING_TRANSACTION); +transitionTo(State.ABORTING_TRANSACTION, null, true); Review Comment: Good catch. I've created another version of `beginAbort` that takes the flag and am passing in `false` from the `Sender` `run` method.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1173167778

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
     }
     private void transitionTo(State target, RuntimeException error) {
+        transitionTo(target, error, false);

Review Comment:
> I think changing the default behavior to not throw can cause issues in some calls:
>
> 1. TransactionManager.InitProducerIdHandler#handleResponse on line 1303 - lastError is explicitly set to null (which shouldn't be done at all, as transitionTo already does that if the state transition is valid), which will clear the latest error. I think to make this work, that lastError = null should be removed from line 1303

I've removed that line. I am not sure why it's there, but `git blame` says it's been there since 2017 😬

> 2. This is a call chain where we transition on direct user action, shouldn't this be throwing? KafkaProducer.send -> KafkaProducer.doSend -> maybeTransitionToErrorState -> transitionToAbortableError -> transitionTo

Since `maybeTransitionToErrorState` is being called from inside a `catch` block, if we did throw an exception, it would mask the root issue (`ApiException`), right?

> 3. In TransactionManager.TxnOffsetCommitHandler#handleResponse, there are multiple
>
> ```
> abortableError(...);
> break;
> ```
>
> blocks. If abortableError does not throw on invalid state transition anymore, the txn commit will be retried, even when in a failed state, which doesn't seem correct.

After the `abortableError` call, if the state transition was invalid, then `currentState` would be `FATAL_ERROR`. That has the same effect as the last two branches of the `if` statement that call the `fatalError` method, right? Would simply skipping the reenqueue on fatal errors be sufficient?

```
if (result.isCompleted()) {
    pendingTxnOffsetCommits.clear();
} else if (pendingTxnOffsetCommits.isEmpty()) {
    result.done();
} else {
    if (!hasFatalError()) {
        // Retry the commits which failed with a retriable error
        reenqueue();
    }
}
```

I don't yet understand why we'd want to re-enqueue after those calls to `fatalError` anyway 🤔

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
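The behaviour discussed in this comment (an invalid transition escalates to `FATAL_ERROR` instead of throwing, and the response handler skips the re-enqueue once a fatal error is recorded) can be sketched in isolation. This is a minimal, hypothetical model and not the actual `TransactionManager`: the class `TxnStateSketch`, its reduced `State` enum, the transition table, and the `shouldReenqueue` helper are all assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model; not the real TransactionManager.
final class TxnStateSketch {
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    private State currentState = State.READY;
    private RuntimeException lastError;

    // Assumed transition table, reduced for illustration.
    private static Set<State> validSources(State target) {
        switch (target) {
            case READY:
                return EnumSet.of(State.IN_TRANSACTION, State.ABORTABLE_ERROR);
            case IN_TRANSACTION:
                return EnumSet.of(State.READY);
            case ABORTABLE_ERROR:
                return EnumSet.of(State.IN_TRANSACTION, State.ABORTABLE_ERROR);
            default: // FATAL_ERROR is reachable from every state
                return EnumSet.allOf(State.class);
        }
    }

    // Does not throw: an invalid transition is recorded and escalated to FATAL_ERROR.
    void transitionTo(State target, RuntimeException error) {
        if (!validSources(target).contains(currentState)) {
            lastError = new IllegalStateException(
                "Invalid transition attempted from state " + currentState + " to state " + target);
            currentState = State.FATAL_ERROR;
            return;
        }
        lastError = error; // null clears the previous error on a valid transition
        currentState = target;
    }

    boolean hasFatalError() { return currentState == State.FATAL_ERROR; }

    RuntimeException lastError() { return lastError; }

    // Mirrors the branch structure quoted above: retry only if nothing fatal happened.
    boolean shouldReenqueue(boolean resultCompleted, boolean pendingEmpty) {
        return !resultCompleted && !pendingEmpty && !hasFatalError();
    }
}
```

In this sketch, calling `transitionTo(State.ABORTABLE_ERROR, ...)` from `READY` leaves the object in `FATAL_ERROR`, so `shouldReenqueue(false, false)` returns `false` and the retry is skipped without any exception masking the original error.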
[jira] [Commented] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id
[ https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714770#comment-17714770 ]

Matthias J. Sax commented on KAFKA-14922:
-----------------------------------------

{quote}We could add a warning with the list of internal topics found to delete and ask for a confirmation to make it harder to inadvertently delete internal topics of other application IDs.{quote}

There is already a `--dry-run` option, but we could of course also try adding a `--execute` one and flip it around... Would need a KIP of course.

{quote}An _improvement_ would be to only return topics that exactly contains the applicationId provided.{quote}

I don't believe it's possible to implement this.

{quote}Would not cover the case where other applicationIds starts with applicationId provided (foo-v1 would delete foo-v1-2 topics, etc){quote}

Both seem to be the same issue? Note: we already do `topicName.startsWith(options.valueOf(applicationIdOption) + "-")`, i.e., we add the expected `-`. This protects `appV1` vs `appV2`, and if you pass in `app` it won't match either of them. But if `-` is used inside the app.id itself, as in `myApp-v1`, it seems there is nothing we can do about it.

> kafka-streams-application-reset deletes topics not belonging to specified application-id
>
>
> Key: KAFKA-14922
> URL: https://issues.apache.org/jira/browse/KAFKA-14922
> Project: Kafka
> Issue Type: Bug
> Components: streams, tools
> Affects Versions: 3.4.0
> Reporter: Jørgen
> Priority: Major
>
> Slack-thread: [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]
> When running the command _kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP --application-id foo_ all internal topics that _starts with_ foo is deleted. This happens even if there's no application-id named foo.
> Example:
> {code:java}
> Application IDs:
> foo-v1
> foo-v2
> Internal topics:
> foo-v1-repartition-topic-repartition
> foo-v2-repartition-topic-repartition
> Application reset:
> kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP --application-id foo
> > No input or intermediate topics specified. Skipping seek.
> Deleting inferred internal topics [foo-v2-repartition-topic-repartition, foo-v1-repartition-topic-repartition]
> Done.{code}
> Expected behaviour is that the command fails as there are no application-id's with the name foo instead of deleting all foo* topics.
> This is critical on typos or if application-ids starts with the same name as others (for example if we had foo-v21 and wanted to reset foo-v2)
> The bug should be located here: [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]
> Should check that the topics matches the application-id exactly instead of checking that it starts with the application-id.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
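To make the prefix rule in this thread concrete, here is a small sketch of the check quoted in the comment (`topicName.startsWith(applicationId + "-")`). It is not the `StreamsResetter` code itself: the class `PrefixMatchSketch` and the method `inferredInternalTopics` are made-up names, and any additional checks the real tool performs are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the prefix rule only; not the StreamsResetter implementation.
final class PrefixMatchSketch {

    // Returns the topics whose name starts with "<applicationId>-".
    static List<String> inferredInternalTopics(String applicationId, List<String> allTopics) {
        List<String> matches = new ArrayList<>();
        for (String topic : allTopics) {
            if (topic.startsWith(applicationId + "-")) {
                matches.add(topic);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
            "foo-v1-repartition-topic-repartition",
            "foo-v2-repartition-topic-repartition",
            "appV1-store-changelog");

        // "foo" is a dash-delimited prefix of both foo-v1 and foo-v2 topics, so both match.
        System.out.println(inferredInternalTopics("foo", topics));

        // "app" does not match "appV1-..." because the character after "app" is not '-'.
        System.out.println(inferredInternalTopics("app", topics)); // prints []
    }
}
```

The first call reproduces the situation in the report: with application IDs `foo-v1` and `foo-v2`, resetting `foo` selects the internal topics of both, because a dash inside an application ID cannot be told apart from the separator.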
[GitHub] [kafka] mjsax commented on pull request #13614: KAFKA-14586: Adding redirection for StreamsResetter
mjsax commented on PR #13614:
URL: https://github.com/apache/kafka/pull/13614#issuecomment-1517029646

Thanks all of you!
[GitHub] [kafka] vcrfxia opened a new pull request, #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics
vcrfxia opened a new pull request, #13622:
URL: https://github.com/apache/kafka/pull/13622

This PR updates docs for [KIP-914](https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173127962

## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ##
@@ -324,4 +329,29 @@ class ZkMigrationIntegrationTest {
       assertTrue(firstProducerIdBlock.firstProducerId() < producerIdBlock.firstProducerId())
     }
   }
+
+  /**
+   * Start a KRaft cluster with migrations enabled, verify that the controller does not accept metadata changes
+   * through the RPCs
+   */
+  @ClusterTests(Array(
+    new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+      serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))),
+    new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_5_IV0,
+      serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))),
+    new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_5_IV1,
+      serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
+  ))
+  def testPreMigrationMode(clusterInstance: ClusterInstance): Unit = {

Review Comment:
Heh, the test framework is politely adding a broker for me :)

This test does seem to do what I want. I see:

```
[2023-04-20 17:56:08,267] ERROR [QuorumController id=3000] createTopics: unable to start processing because of NotControllerException. Reason: The active controller appears to be node 3000 (org.apache.kafka.controller.QuorumController:445)
```

in the log, and then the create topics call times out after a short time. I've updated the test to have `brokers = 1`.
[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173116859

## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
                 // Prepend the activate event. It is important that this event go at the beginning
                 // of the queue rather than the end (hence prepend rather than append). It's also
                 // important not to use prepend for anything else, to preserve the ordering here.
-                queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
-                    new CompleteActivationEvent()));
+                ControllerWriteEvent activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+                    new CompleteActivationEvent(),
+                    EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION));
+                activationEvent.future.whenComplete((__, t) -> {
+                    if (t != null) {
+                        fatalFaultHandler.handleFault("exception while activating controller", t);

Review Comment:
In `handleEventException`, we renounce the controller leadership when we see a non-ApiException from a ControllerWriteEvent. As far as I can tell, this does not hit the fault handler.

`KafkaEventQueue#run` traps exceptions and calls `Event#handleException`. In `ControllerEvent#handleException`, we call `QuorumController#handleEventException` which just does logging and resignation. `handleEventException` wraps the exception with UnknownServerException, but I don't see where we do anything with that in ControllerEvent.

I think this is by design since we don't want the controller just crashing for any error, but only for specific cases where we call the fault handler.
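The control flow described in this comment can be modelled in a few lines. The sketch below is a hypothetical stand-in, not the real `KafkaEventQueue` or `QuorumController`: the `Event` interface, `runEvent`, `writeEvent`, and the `resigned`/`faults` fields are invented for illustration. It shows an event runner that traps exceptions and hands them to the event's own handler, which only records a resignation, so a fault is reported only when a callback is explicitly attached to the event's future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the exception path; names do not come from the Kafka code base.
final class EventFaultSketch {
    interface Event {
        void run() throws Exception;
        void handleException(Throwable t);
    }

    // Stand-in for the queue's run loop: failures never escape, they go to the event.
    static void runEvent(Event event) {
        try {
            event.run();
        } catch (Throwable t) {
            event.handleException(t);
        }
    }

    final List<String> faults = new ArrayList<>(); // stand-in for a fault handler
    boolean resigned = false;                      // stand-in for renouncing leadership

    Event writeEvent(CompletableFuture<Void> future, boolean fail) {
        return new Event() {
            @Override
            public void run() throws Exception {
                if (fail) {
                    throw new IllegalStateException("simulated activation failure");
                }
                future.complete(null);
            }

            @Override
            public void handleException(Throwable t) {
                resigned = true;                 // log-and-resign, no fault raised here
                future.completeExceptionally(t); // callers observe the failure via the future
            }
        };
    }

    public static void main(String[] args) {
        EventFaultSketch controller = new EventFaultSketch();
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Explicit opt-in: only this callback escalates the failure to a fault.
        future.whenComplete((ignored, t) -> {
            if (t != null) {
                controller.faults.add("exception while activating controller");
            }
        });
        runEvent(controller.writeEvent(future, true));
        System.out.println(controller.resigned + " " + controller.faults.size()); // prints "true 1"
    }
}
```

Without the `whenComplete` callback, the same failure would only set `resigned`, which matches the distinction drawn in the comment between resigning on arbitrary errors and crashing only where the fault handler is called.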
[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173109856

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -222,7 +245,7 @@ private boolean isValidStateChange(MigrationDriverState newState) {
     private void transitionTo(MigrationDriverState newState) {
         if (!isValidStateChange(newState)) {
-            log.error("Error transition in migration driver from {} to {}", migrationState, newState);
+            log.error("Invalid transition in migration driver from {} to {}", migrationState, newState);

Review Comment:
Sure, I'll clean this up.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173092055

## metadata/src/main/resources/common/metadata/ZkMigrationRecord.json: ##
@@ -18,7 +18,9 @@
   "type": "metadata",
   "name": "ZkMigrationStateRecord",
   // Version 0 adds ZkMigrationState which is used by the KRaft controller to mark the beginning and end
-  // of the ZK to KRaft migration. Possible values are 1 (PreMigration), 2 (Migration), 3 (PostMigration).
+  // of the ZK to KRaft migration. Possible values are 0 (None), 1 (PreMigration), 2 (Migration), 3 (PostMigration).

Review Comment:
I think this comment needs to be revised to remove the comment about the tagged field, and to swap the values of PreMigration and Migration.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173091174

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -396,18 +418,36 @@ class WaitForControllerQuorumEvent extends MigrationEvent {
         @Override
         public void run() throws Exception {
-            switch (migrationState) {
-                case WAIT_FOR_CONTROLLER_QUORUM:
-                    if (isControllerQuorumReadyForMigration()) {
-                        log.debug("Controller Quorum is ready for Zk to KRaft migration");
-                        // Note that leadership would not change here. Hence we do not need to
-                        // `apply` any leadership state change.
-                        transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
+            if (migrationState.equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
+                if (!firstPublish) {
+                    log.trace("Waiting until we have received metadata before proceeding with migration");
+                    return;
+                }
+
+                ZkMigrationState zkMigrationState = image.features().zkMigrationState();

Review Comment:
we can have a switch statement here right?
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173089410

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -222,7 +245,7 @@ private boolean isValidStateChange(MigrationDriverState newState) {
     private void transitionTo(MigrationDriverState newState) {
         if (!isValidStateChange(newState)) {
-            log.error("Error transition in migration driver from {} to {}", migrationState, newState);
+            log.error("Invalid transition in migration driver from {} to {}", migrationState, newState);

Review Comment:
(I realize you aren't adding this, but just saw an opportunity for a minor cleanup)
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173089106

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -222,7 +245,7 @@ private boolean isValidStateChange(MigrationDriverState newState) {
     private void transitionTo(MigrationDriverState newState) {
         if (!isValidStateChange(newState)) {
-            log.error("Error transition in migration driver from {} to {}", migrationState, newState);
+            log.error("Invalid transition in migration driver from {} to {}", migrationState, newState);

Review Comment:
I'm confused about this function:

1. Shouldn't the transition to UNINITIALIZED being invalid be covered by isValidStateChange returning false if you pass it newState = UNINITIALIZED?
2. The switch statement below throws an exception on invalid state change, whereas the initial check just logs a message at `error`. The exception seems more reasonable?
3. The switch has a case for INACTIVE that just does nothing?

```
switch (newState) {
    ...
    case INACTIVE:
        // Any state can go to INACTIVE.
        break;
}
```

Just leave out the case for `INACTIVE` since it's a no-op? (And maybe the whole switch, see above.)
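One way to read the three points in this comment is as a request for a single validity check that owns every rule and a `transitionTo` that throws when the check fails. The following is a minimal sketch of that shape, not the actual `KRaftMigrationDriver`: the class `DriverStateSketch`, the reduced `DriverState` enum, and the transition table are assumptions made for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical reduced state machine; states and table are illustrative only.
final class DriverStateSketch {
    enum DriverState { UNINITIALIZED, INACTIVE, WAIT_FOR_CONTROLLER_QUORUM, WAIT_FOR_BROKERS }

    // Assumed forward transitions, keyed by the current state.
    private static final Map<DriverState, Set<DriverState>> FORWARD = new EnumMap<>(DriverState.class);
    static {
        FORWARD.put(DriverState.INACTIVE, EnumSet.of(DriverState.WAIT_FOR_CONTROLLER_QUORUM));
        FORWARD.put(DriverState.WAIT_FOR_CONTROLLER_QUORUM, EnumSet.of(DriverState.WAIT_FOR_BROKERS));
    }

    private DriverState state = DriverState.UNINITIALIZED;

    // All rules live here, so transitionTo needs no extra switch.
    boolean isValidStateChange(DriverState newState) {
        if (newState == DriverState.UNINITIALIZED) {
            return false; // nothing may go back to UNINITIALIZED
        }
        if (newState == DriverState.INACTIVE) {
            return true;  // any state can go to INACTIVE
        }
        return FORWARD.getOrDefault(state, EnumSet.noneOf(DriverState.class)).contains(newState);
    }

    // Throws instead of only logging, so an invalid transition cannot be silently ignored.
    void transitionTo(DriverState newState) {
        if (!isValidStateChange(newState)) {
            throw new IllegalStateException(
                "Invalid transition in migration driver from " + state + " to " + newState);
        }
        state = newState;
    }

    DriverState state() { return state; }
}
```

With this layout the `INACTIVE` rule is a single line inside the check rather than a no-op `case`, and the `UNINITIALIZED` rule is covered by `isValidStateChange` returning `false`.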
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173082868

## metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java: ##
@@ -105,14 +130,15 @@ private void writeFeatureLevels(ImageWriter writer, ImageWriterOptions options)
     @Override
     public int hashCode() {
-        return finalizedVersions.hashCode();
+        return Objects.hash(finalizedVersions, zkMigrationState);
     }
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof FeaturesImage)) return false;
         FeaturesImage other = (FeaturesImage) o;
-        return finalizedVersions.equals(other.finalizedVersions);
+        return finalizedVersions.equals(other.finalizedVersions) &&

Review Comment:
should test metadataVersion here too, oops
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173082641

## metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java: ##
@@ -105,14 +130,15 @@ private void writeFeatureLevels(ImageWriter writer, ImageWriterOptions options)
     @Override
     public int hashCode() {
-        return finalizedVersions.hashCode();
+        return Objects.hash(finalizedVersions, zkMigrationState);

Review Comment:
I think we should hash metadataVersion here too, oops
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173081897

## metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java: ##
@@ -37,19 +39,30 @@
  * This class is thread-safe.
  */
 public final class FeaturesImage {
-    public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.MINIMUM_KRAFT_VERSION);
+    public static final FeaturesImage EMPTY = new FeaturesImage(
+        Collections.emptyMap(),
+        MetadataVersion.MINIMUM_KRAFT_VERSION,
+        ZkMigrationState.NONE
+    );
     private final Map finalizedVersions;
     private final MetadataVersion metadataVersion;
-    public FeaturesImage(Map finalizedVersions, MetadataVersion metadataVersion) {
+    private final ZkMigrationState zkMigrationState;
+
+    public FeaturesImage(
+        Map finalizedVersions,
+        MetadataVersion metadataVersion,
+        ZkMigrationState zkMigrationState
+    ) {
         this.finalizedVersions = Collections.unmodifiableMap(finalizedVersions);
         this.metadataVersion = metadataVersion;
+        this.zkMigrationState = zkMigrationState;
     }
     public boolean isEmpty() {
-        return finalizedVersions.isEmpty();
+        return finalizedVersions.isEmpty() && zkMigrationState.equals(ZkMigrationState.NONE);

Review Comment:
I think we should also check that `metadataVersion` is equal to `MetadataVersion.MINIMUM_KRAFT_VERSION` here. This is an existing bug, oops! (although a very minor one that I doubt has an impact)

Also let's add a unit test
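The review comments on `FeaturesImage` all point the same way: `isEmpty`, `equals`, and `hashCode` should each take every field into account, including `metadataVersion`. A minimal sketch of such a value class follows. It is not the real `FeaturesImage`: the class `FeaturesImageSketch` and the `Version` and `MigrationState` enums are hypothetical stand-ins for `MetadataVersion` and `ZkMigrationState`, and the `Map<String, Short>` type is an assumption, since the generic parameters are not visible in the quoted diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical value class; enums stand in for MetadataVersion and ZkMigrationState.
final class FeaturesImageSketch {
    enum Version { MINIMUM, NEWER }
    enum MigrationState { NONE, PRE_MIGRATION, MIGRATION, POST_MIGRATION }

    static final FeaturesImageSketch EMPTY =
        new FeaturesImageSketch(Collections.emptyMap(), Version.MINIMUM, MigrationState.NONE);

    private final Map<String, Short> finalizedVersions;
    private final Version metadataVersion;
    private final MigrationState zkMigrationState;

    FeaturesImageSketch(Map<String, Short> finalizedVersions,
                        Version metadataVersion,
                        MigrationState zkMigrationState) {
        this.finalizedVersions = Collections.unmodifiableMap(new HashMap<>(finalizedVersions));
        this.metadataVersion = metadataVersion;
        this.zkMigrationState = zkMigrationState;
    }

    // Empty only if all three fields are at their defaults.
    boolean isEmpty() {
        return finalizedVersions.isEmpty()
            && metadataVersion == Version.MINIMUM
            && zkMigrationState == MigrationState.NONE;
    }

    // hashCode and equals cover the same three fields, keeping them consistent.
    @Override
    public int hashCode() {
        return Objects.hash(finalizedVersions, metadataVersion, zkMigrationState);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FeaturesImageSketch)) return false;
        FeaturesImageSketch other = (FeaturesImageSketch) o;
        return finalizedVersions.equals(other.finalizedVersions)
            && metadataVersion == other.metadataVersion
            && zkMigrationState == other.zkMigrationState;
    }
}
```

A unit test along the lines the reviewer asks for could then assert that an image with no finalized versions but a non-minimum metadata version is neither empty nor equal to `EMPTY`.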
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173080760 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1123,29 +1165,104 @@ private void claim(int epoch) { // Prepend the activate event. It is important that this event go at the beginning // of the queue rather than the end (hence prepend rather than append). It's also // important not to use prepend for anything else, to preserve the ordering here. -queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]", -new CompleteActivationEvent())); +ControllerWriteEvent activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]", +new CompleteActivationEvent(), +EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)); +activationEvent.future.whenComplete((__, t) -> { +if (t != null) { +fatalFaultHandler.handleFault("exception while activating controller", t); +} +}); +queue.prepend(activationEvent); } catch (Throwable e) { fatalFaultHandler.handleFault("exception while claiming leadership", e); } } +/** + * Generate the set of activation records. Until KIP-868 transactions are supported, these records + * are committed to the log as an atomic batch. The records will include the bootstrap metadata records + * (including the bootstrap "metadata.version") and may include a ZK migration record. + */ +public static void generateActivationRecords( +Logger log, +boolean isLogEmpty, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +FeatureControlManager featureControl, +Consumer recordConsumer +) { +if (isLogEmpty) { +// If no records have been replayed, we need to write out the bootstrap records. +// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. 
Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", bootstrapMetadata.records().size(), +bootstrapMetadata.metadataVersion(), bootstrapMetadata.source()); +bootstrapMetadata.records().forEach(recordConsumer); + +if (bootstrapMetadata.metadataVersion().isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until " + +"the ZK metadata has been migrated"); + recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +recordConsumer.accept(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} +} else { +// Logs have been replayed. We need to initialize some things here if upgrading from older KRaft versions +if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) { +log.info("No metadata.version feature level record was found in the log. " + +"Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION); +} + +if (featureControl.metadataVersion().isMigrationSupported()) { +log.info("Loaded ZK migration state of {}", featureControl.zkMigrationState()); +switch (featureControl.zkMigrationState()) { +case NONE: +// Since this is the default state there may or may not be an actual NONE in the log. Regardless, +// it will eventually be persisted in a snapshot, so we don't need to explicitly write it here. 
+if (zkMigrationEnabled) { +throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode."); +} +break; +case PRE_MIGRATION: Review Comment: (So in other words, I think this is worthy of a `warn` level log, but not an exception) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173079770 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1123,29 +1165,104 @@ private void claim(int epoch) { // Prepend the activate event. It is important that this event go at the beginning // of the queue rather than the end (hence prepend rather than append). It's also // important not to use prepend for anything else, to preserve the ordering here. -queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]", -new CompleteActivationEvent())); +ControllerWriteEvent activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]", +new CompleteActivationEvent(), +EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)); +activationEvent.future.whenComplete((__, t) -> { +if (t != null) { +fatalFaultHandler.handleFault("exception while activating controller", t); +} +}); +queue.prepend(activationEvent); } catch (Throwable e) { fatalFaultHandler.handleFault("exception while claiming leadership", e); } } +/** + * Generate the set of activation records. Until KIP-868 transactions are supported, these records + * are committed to the log as an atomic batch. The records will include the bootstrap metadata records + * (including the bootstrap "metadata.version") and may include a ZK migration record. + */ +public static void generateActivationRecords( +Logger log, +boolean isLogEmpty, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +FeatureControlManager featureControl, +Consumer recordConsumer +) { +if (isLogEmpty) { +// If no records have been replayed, we need to write out the bootstrap records. +// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. 
Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", bootstrapMetadata.records().size(), +bootstrapMetadata.metadataVersion(), bootstrapMetadata.source()); +bootstrapMetadata.records().forEach(recordConsumer); + +if (bootstrapMetadata.metadataVersion().isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until " + +"the ZK metadata has been migrated"); + recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +recordConsumer.accept(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} +} else { +// Logs have been replayed. We need to initialize some things here if upgrading from older KRaft versions +if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) { +log.info("No metadata.version feature level record was found in the log. " + +"Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION); +} + +if (featureControl.metadataVersion().isMigrationSupported()) { +log.info("Loaded ZK migration state of {}", featureControl.zkMigrationState()); +switch (featureControl.zkMigrationState()) { +case NONE: +// Since this is the default state there may or may not be an actual NONE in the log. Regardless, +// it will eventually be persisted in a snapshot, so we don't need to explicitly write it here. 
+if (zkMigrationEnabled) { +throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode."); +} +break; +case PRE_MIGRATION: +throw new RuntimeException("Detected an failed migration state during bootstrap, cannot continue."); +case MIGRATION: +if (!zkMigrationEnabled) { +
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173079249 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1123,29 +1165,104 @@ private void claim(int epoch) { // Prepend the activate event. It is important that this event go at the beginning // of the queue rather than the end (hence prepend rather than append). It's also // important not to use prepend for anything else, to preserve the ordering here. -queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]", -new CompleteActivationEvent())); +ControllerWriteEvent activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]", +new CompleteActivationEvent(), +EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)); +activationEvent.future.whenComplete((__, t) -> { +if (t != null) { +fatalFaultHandler.handleFault("exception while activating controller", t); +} +}); +queue.prepend(activationEvent); } catch (Throwable e) { fatalFaultHandler.handleFault("exception while claiming leadership", e); } } +/** + * Generate the set of activation records. Until KIP-868 transactions are supported, these records + * are committed to the log as an atomic batch. The records will include the bootstrap metadata records + * (including the bootstrap "metadata.version") and may include a ZK migration record. + */ +public static void generateActivationRecords( +Logger log, +boolean isLogEmpty, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +FeatureControlManager featureControl, +Consumer recordConsumer +) { +if (isLogEmpty) { +// If no records have been replayed, we need to write out the bootstrap records. +// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. 
Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", bootstrapMetadata.records().size(), +bootstrapMetadata.metadataVersion(), bootstrapMetadata.source()); +bootstrapMetadata.records().forEach(recordConsumer); + +if (bootstrapMetadata.metadataVersion().isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until " + +"the ZK metadata has been migrated"); + recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +recordConsumer.accept(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} +} else { +// Logs have been replayed. We need to initialize some things here if upgrading from older KRaft versions +if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) { +log.info("No metadata.version feature level record was found in the log. " + +"Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION); +} + +if (featureControl.metadataVersion().isMigrationSupported()) { +log.info("Loaded ZK migration state of {}", featureControl.zkMigrationState()); +switch (featureControl.zkMigrationState()) { +case NONE: +// Since this is the default state there may or may not be an actual NONE in the log. Regardless, +// it will eventually be persisted in a snapshot, so we don't need to explicitly write it here. 
+if (zkMigrationEnabled) { +throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode."); +} +break; +case PRE_MIGRATION: +throw new RuntimeException("Detected an failed migration state during bootstrap, cannot continue."); +case MIGRATION: +if (!zkMigrationEnabled) { +
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173078385 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1123,29 +1165,104 @@ private void claim(int epoch) { // Prepend the activate event. It is important that this event go at the beginning // of the queue rather than the end (hence prepend rather than append). It's also // important not to use prepend for anything else, to preserve the ordering here. -queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]", -new CompleteActivationEvent())); +ControllerWriteEvent activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]", +new CompleteActivationEvent(), +EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)); +activationEvent.future.whenComplete((__, t) -> { +if (t != null) { +fatalFaultHandler.handleFault("exception while activating controller", t); +} +}); +queue.prepend(activationEvent); } catch (Throwable e) { fatalFaultHandler.handleFault("exception while claiming leadership", e); } } +/** + * Generate the set of activation records. Until KIP-868 transactions are supported, these records + * are committed to the log as an atomic batch. The records will include the bootstrap metadata records + * (including the bootstrap "metadata.version") and may include a ZK migration record. + */ +public static void generateActivationRecords( +Logger log, +boolean isLogEmpty, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +FeatureControlManager featureControl, +Consumer recordConsumer +) { +if (isLogEmpty) { +// If no records have been replayed, we need to write out the bootstrap records. +// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. 
Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", bootstrapMetadata.records().size(), +bootstrapMetadata.metadataVersion(), bootstrapMetadata.source()); +bootstrapMetadata.records().forEach(recordConsumer); + +if (bootstrapMetadata.metadataVersion().isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until " + +"the ZK metadata has been migrated"); + recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +recordConsumer.accept(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} +} else { +// Logs have been replayed. We need to initialize some things here if upgrading from older KRaft versions +if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) { +log.info("No metadata.version feature level record was found in the log. " + +"Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION); +} + +if (featureControl.metadataVersion().isMigrationSupported()) { +log.info("Loaded ZK migration state of {}", featureControl.zkMigrationState()); +switch (featureControl.zkMigrationState()) { +case NONE: +// Since this is the default state there may or may not be an actual NONE in the log. Regardless, +// it will eventually be persisted in a snapshot, so we don't need to explicitly write it here. +if (zkMigrationEnabled) { +throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode."); +} +break; +case PRE_MIGRATION: Review Comment: I don't think this is correct. We could have the following sequence: 1. 
controller 0 writes migration record setting PRE_MIGRATION state 2. controller 0 tries to load ZK image, fails for s
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173076552 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1123,29 +1165,104 @@ private void claim(int epoch) { // Prepend the activate event. It is important that this event go at the beginning // of the queue rather than the end (hence prepend rather than append). It's also // important not to use prepend for anything else, to preserve the ordering here. -queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]", -new CompleteActivationEvent())); +ControllerWriteEvent activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]", +new CompleteActivationEvent(), +EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)); +activationEvent.future.whenComplete((__, t) -> { +if (t != null) { +fatalFaultHandler.handleFault("exception while activating controller", t); +} +}); +queue.prepend(activationEvent); } catch (Throwable e) { fatalFaultHandler.handleFault("exception while claiming leadership", e); } } +/** + * Generate the set of activation records. Until KIP-868 transactions are supported, these records + * are committed to the log as an atomic batch. The records will include the bootstrap metadata records + * (including the bootstrap "metadata.version") and may include a ZK migration record. + */ +public static void generateActivationRecords( Review Comment: Can we just return a `List`? I don't think we're creating enough records to make a new list an efficiency concern... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
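The suggestion above (return a `List` rather than push records into a `Consumer`) might look roughly like the following. This is only a sketch under simplifying assumptions: `MigrationStateSketch` and the `String` records are hypothetical stand-ins for `ZkMigrationState` and the real record type, and only the empty-log branch from the diff is modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ZkMigrationState; not the real Kafka enum.
enum MigrationStateSketch { NONE, PRE_MIGRATION, MIGRATION, POST_MIGRATION }

final class ActivationRecordsSketch {
    // Returns the records to append instead of handing them to a Consumer.
    // Strings stand in for the real record type (an assumption of this sketch).
    static List<String> generateActivationRecords(
        boolean isLogEmpty,
        boolean zkMigrationEnabled,
        boolean migrationSupported,
        List<String> bootstrapRecords
    ) {
        List<String> records = new ArrayList<>();
        if (isLogEmpty) {
            // Empty log: write the bootstrap records, then the migration state.
            records.addAll(bootstrapRecords);
            if (migrationSupported) {
                MigrationStateSketch state = zkMigrationEnabled
                    ? MigrationStateSketch.PRE_MIGRATION
                    : MigrationStateSketch.NONE;
                records.add("ZkMigrationStateRecord(" + state + ")");
            } else if (zkMigrationEnabled) {
                throw new RuntimeException(
                    "The bootstrap metadata.version does not support ZK migrations.");
            }
        }
        return records;
    }
}
```

With a return value, the caller decides how to append the batch, and a unit test can assert on the returned list directly.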
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173073635 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1123,29 +1165,104 @@ private void claim(int epoch) { // Prepend the activate event. It is important that this event go at the beginning // of the queue rather than the end (hence prepend rather than append). It's also // important not to use prepend for anything else, to preserve the ordering here. -queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]", -new CompleteActivationEvent())); +ControllerWriteEvent activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]", +new CompleteActivationEvent(), +EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)); +activationEvent.future.whenComplete((__, t) -> { +if (t != null) { +fatalFaultHandler.handleFault("exception while activating controller", t); Review Comment: Yeah, this is fair, I guess. We should invoke the fault handler if we have a failure in the activation event. Although we'll already catch non-`ApiException`. But still. A good check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
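The pattern under discussion, routing a failed activation future to the fault handler, can be illustrated in isolation with `CompletableFuture.whenComplete`. The sketch below is not the controller code: `ActivationFaultSketch`, `handleFault`, and `lastFault` are hypothetical names, and the handler only records the message instead of halting the process.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class ActivationFaultSketch {
    // Hypothetical fault handler state: remembers the last fault message.
    static final AtomicReference<String> lastFault = new AtomicReference<>();

    static void handleFault(String message, Throwable cause) {
        lastFault.set(message + ": " + cause.getMessage());
    }

    // Attach a callback so that an exceptional completion reaches the fault handler.
    static void watch(CompletableFuture<Void> activationFuture) {
        activationFuture.whenComplete((ignored, t) -> {
            if (t != null) {
                handleFault("exception while activating controller", t);
            }
        });
    }
}
```

A normal completion passes `null` as the throwable, so the handler is only invoked on failure.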
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173072509 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1085,7 +1123,11 @@ private void maybeCompleteAuthorizerInitialLoad() { } private boolean isActiveController() { -return curClaimEpoch != -1; +return isActiveController(curClaimEpoch); +} + +private boolean isActiveController(int claimEpoch) { Review Comment: Can this function be `static`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
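As a small illustration of the point about `static`: the overload that takes `claimEpoch` reads no instance state, so it can be declared static while the no-argument version keeps using the field. `ClaimEpochSketch` and its `claim` method are hypothetical; only the `-1` sentinel and the two method shapes come from the diff.

```java
final class ClaimEpochSketch {
    // -1 is the sentinel from the diff meaning "not the active controller".
    private int curClaimEpoch = -1;

    void claim(int epoch) {
        curClaimEpoch = epoch;
    }

    boolean isActiveController() {
        return isActiveController(curClaimEpoch);
    }

    // Depends only on its argument, so it does not need an instance.
    static boolean isActiveController(int claimEpoch) {
        return claimEpoch != -1;
    }
}
```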
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173069848 ## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ## @@ -304,6 +338,13 @@ public void replay(FeatureLevelRecord record) { } } +public void replay(ZkMigrationStateRecord record) { +ZkMigrationState recordState = ZkMigrationState.of(record.zkMigrationState()); +ZkMigrationState currentState = migrationControlState.get(); +log.info("Transitioning ZK migration state from {} to {}", currentState, recordState); Review Comment: Maybe we should not log this (or log at DEBUG only) if the current state and new state are the same? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
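One way to read the logging suggestion is to pick the level from whether the state actually changes. The sketch below assumes `java.util.logging` in place of the SLF4J logger used in the diff (with `FINE` standing in for DEBUG), and `MigrationStateReplaySketch` with its nested `State` enum is a hypothetical stand-in for `FeatureControlManager` and `ZkMigrationState`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class MigrationStateReplaySketch {
    // Hypothetical stand-in for ZkMigrationState.
    enum State { NONE, PRE_MIGRATION, MIGRATION, POST_MIGRATION }

    private static final Logger LOG = Logger.getLogger("MigrationStateReplaySketch");
    private State current = State.NONE;

    // INFO for a real transition, FINE (debug-like) when the state is unchanged.
    static Level levelFor(State from, State to) {
        return from == to ? Level.FINE : Level.INFO;
    }

    void replay(State recordState) {
        LOG.log(levelFor(current, recordState),
            "Transitioning ZK migration state from {0} to {1}",
            new Object[] {current, recordState});
        current = recordState;
    }

    State current() {
        return current;
    }
}
```

Replaying the same state repeatedly (for example when loading snapshots) would then stay out of the INFO log.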
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173068660 ## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ## @@ -232,13 +248,20 @@ private ApiError updateMetadataVersion( Consumer recordConsumer ) { MetadataVersion currentVersion = metadataVersion(); +ZkMigrationState zkMigrationState = zkMigrationState(); final MetadataVersion newVersion; try { newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel); } catch (IllegalArgumentException e) { return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version."); } +// Don't allow metadata.version changes while we're migrating +if (EnumSet.of(ZkMigrationState.PRE_MIGRATION, ZkMigrationState.MIGRATION).contains(zkMigrationState)) { Review Comment: Hmm, seems like this might come up in a bunch of places. Maybe we can have a member function in `ZkMigrationState.java` like ``` boolean inProgress() { return this == PRE_MIGRATION || this == POST_MIGRATION; } ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
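A self-contained sketch of the proposed helper follows. Note one discrepancy: the snippet in the review comment tests `POST_MIGRATION`, whereas the `EnumSet` check in the diff tests `PRE_MIGRATION` and `MIGRATION`. The sketch follows the diff, which is an assumption about the intent. `ZkMigrationStateSketch` is a hypothetical stand-in listing only the states named in this thread.

```java
// Hypothetical stand-in for ZkMigrationState; not the real Kafka enum.
enum ZkMigrationStateSketch {
    NONE, PRE_MIGRATION, MIGRATION, POST_MIGRATION;

    // Mirrors EnumSet.of(PRE_MIGRATION, MIGRATION).contains(state) from the diff.
    boolean inProgress() {
        return this == PRE_MIGRATION || this == MIGRATION;
    }
}
```

A call site such as the `metadata.version` guard could then read `if (zkMigrationState.inProgress())` instead of building an `EnumSet` each time.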
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173068660 ## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ## @@ -232,13 +248,20 @@ private ApiError updateMetadataVersion( Consumer recordConsumer ) { MetadataVersion currentVersion = metadataVersion(); +ZkMigrationState zkMigrationState = zkMigrationState(); final MetadataVersion newVersion; try { newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel); } catch (IllegalArgumentException e) { return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version."); } +// Don't allow metadata.version changes while we're migrating +if (EnumSet.of(ZkMigrationState.PRE_MIGRATION, ZkMigrationState.MIGRATION).contains(zkMigrationState)) { Review Comment: Hmm, seems like this might come up in a bunch of places. Maybe we can have something like `boolean ZkMigrationState#inProgress() { return this == PRE_MIGRATION || this == POST_MIGRATION; }`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173066452 ## core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala: ## @@ -145,25 +146,29 @@ class BrokerRegistrationRequestTest { } @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3, -serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))) - def testRegisterZkWithKRaftOldMetadataVersion(clusterInstance: ClusterInstance): Unit = { +serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false"))) Review Comment: Do we have a test of trying to start the controller with `zookeeper.metadata.migration.enable = true` and a bootstrap MV that's too old? It should exit the controller, right? (Due to throwing that RuntimeException)? It would be good to have an integration test of that... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173062692 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -324,4 +329,29 @@ class ZkMigrationIntegrationTest { assertTrue(firstProducerIdBlock.firstProducerId() < producerIdBlock.firstProducerId()) } } + + /** + * Start a KRaft cluster with migrations enabled, verify that the controller does not accept metadata changes + * through the RPCs + */ Review Comment: We should also explain in this comment why the migration does not proceed beyond premigration (I assume it's because we're waiting for brokers and we don't have any due to brokers = 0) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173062239 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -324,4 +329,29 @@ class ZkMigrationIntegrationTest { assertTrue(firstProducerIdBlock.firstProducerId() < producerIdBlock.firstProducerId()) } } + + /** + * Start a KRaft cluster with migrations enabled, verify that the controller does not accept metadata changes + * through the RPCs + */ + @ClusterTests(Array( +new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0, + serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))), +new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_5_IV0, + serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))), +new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_5_IV1, + serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))) + )) + def testPreMigrationMode(clusterInstance: ClusterInstance): Unit = { Review Comment: Hmm... I don't see how this can work without brokers, since the admin client needs to bootstrap through broker servers, right? So I don't think this is testing what you might think. You might just have to send a raw RPC, as unpleasant as that is to do. That will also allow you to verify that you get NOT_CONTROLLER. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] szalapski commented on pull request #5876: KAFKA-7509: Avoid passing most non-applicable properties to producer, consumer, and admin client
szalapski commented on PR #5876: URL: https://github.com/apache/kafka/pull/5876#issuecomment-1516905377 Sad that this didn't get in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1173050527 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. Review Comment: okay -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on PR #12149: URL: https://github.com/apache/kafka/pull/12149#issuecomment-1516893617 Failing tests don't seem to be related.
```
Build / JDK 8 and Scala 2.12 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 1m 46s
Build / JDK 8 and Scala 2.12 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 26s
Build / JDK 8 and Scala 2.12 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 1m 46s
Build / JDK 11 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 3m 15s
Build / JDK 11 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 3s
Build / JDK 11 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 41s
Build / JDK 11 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 23s
Build / JDK 11 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 1m 55s
Build / JDK 11 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 7s
Build / JDK 11 and Scala 2.13 / testWithGroupMetadata() – kafka.api.TransactionsBounceTest 1m 22s
Build / JDK 11 and Scala 2.13 / testCreateClusterAndCreateAndManyTopics() – kafka.server.KRaftClusterTest 18s
Build / JDK 17 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 14s
Build / JDK 17 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 22s
Build / JDK 17 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 16s
Build / JDK 17 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 16s
Build / JDK 17 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 53s
Build / JDK 17 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 2m 13s
Build / JDK 17 and Scala 2.13 / testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest 1m 14s

Existing failures - 3
Build / JDK 8 and Scala 2.12 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest <1s
Build / JDK 11 and Scala 2.13 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest <1s
Build / JDK 17 and Scala 2.13 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
```
-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on PR #12149: URL: https://github.com/apache/kafka/pull/12149#issuecomment-1516893160 Hey @jolshan - Thanks for the review. I reverted those documentation/comment changes in SenderTest.java (for the produceResponse authorization error). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14904) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()
[ https://issues.apache.org/jira/browse/KAFKA-14904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-14904. Resolution: Fixed > Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId() > -- > > Key: KAFKA-14904 > URL: https://issues.apache.org/jira/browse/KAFKA-14904 > Project: Kafka > Issue Type: Test >Affects Versions: 3.5.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > > After merging KAFKA-14561 I noticed this test still occasionally failed via > org.apache.kafka.common.errors.TimeoutException: Timeout expired after > 6ms while awaiting EndTxn(true) > I will investigate the cause. > Note: This error occurs when we are waiting for the transaction to be > committed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-14884) Include check transaction is still ongoing right before append
[ https://issues.apache.org/jira/browse/KAFKA-14884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reopened KAFKA-14884: I'm confused by all my blockers 🤦♀️ > Include check transaction is still ongoing right before append > --- > > Key: KAFKA-14884 > URL: https://issues.apache.org/jira/browse/KAFKA-14884 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.5.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > > Even after checking via AddPartitionsToTxn, the transaction could be aborted > after the response. We can add one more check before appending. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
jeffkbkim commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172953390 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * An immutable representation of a client side assignor within a consumer group member. + */ +public class ClientAssignor { +/** + * The name of the assignor. + */ +private final String name; + +/** + * The reason reported by the assignor. + */ +private final byte reason; + +/** + * The minimum metadata version supported by the assignor. + */ +private final short minimumVersion; + +/** + * The maximum metadata version supported by the assignor. + */ +private final short maximumVersion; + +/** + * The versioned metadata. + */ +private final VersionedMetadata metadata; Review Comment: sorry to have hijacked this thread, but something like ``` /** * The error assigned to the member. 
Depends on the client assignor * implementation and commonly used for Streams */ private final byte error; ``` if you think what we have is sufficient, i am fine with it. just wanted to mention that i didn't fully understand how it would be used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
dajac commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172945841 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * An immutable representation of a client side assignor within a consumer group member. + */ +public class ClientAssignor { +/** + * The name of the assignor. + */ +private final String name; + +/** + * The reason reported by the assignor. + */ +private final byte reason; + +/** + * The minimum metadata version supported by the assignor. + */ +private final short minimumVersion; + +/** + * The maximum metadata version supported by the assignor. + */ +private final short maximumVersion; + +/** + * The versioned metadata. 
+ */ +private final VersionedMetadata metadata; + +public ClientAssignor( +String name, +byte reason, +short minimumVersion, +short maximumVersion, +VersionedMetadata metadata +) { +this.name = Objects.requireNonNull(name); +this.reason = reason; +this.minimumVersion = minimumVersion; Review Comment: I think that I have added the ones where relevant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
dajac commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172946358 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ClientAssignorTest.java: ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ClientAssignorTest { + +@Test +public void testNameAndMetadataCannotBeBull() { Review Comment: This is not necessary in my opinion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
emissionnebula commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1172848492 ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java: ## @@ -225,77 +225,59 @@ public void testDelegationOfReplaceAll() { new PCollectionsHashMapWrapperDelegationChecker<>() .defineMockConfigurationForVoidMethodInvocation(mock -> mock.replaceAll(eq(mockBiFunction))) .defineWrapperVoidMethodInvocation(wrapper -> wrapper.replaceAll(mockBiFunction)) -.doVoidMethodDelegationCheck(); +.doUnsupportedVoidFunctionDelegrationCheck(); } @Test public void testDelegationOfPutIfAbsent() { new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.putIfAbsent(eq(this), eq(this)), this) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.putIfAbsent(this, this), identity()) -.doFunctionDelegationCheck(); -} - -@ParameterizedTest -@ValueSource(booleans = {true, false}) -public void testDelegationOfRemoveByKeyAndValue(boolean mockFunctionReturnValue) { -new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this), eq(this)), mockFunctionReturnValue) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this, this), identity()) -.doFunctionDelegationCheck(); -} - -@ParameterizedTest -@ValueSource(booleans = {true, false}) -public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) { -new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this, this), identity()) -.doFunctionDelegationCheck(); Review Comment: These two functions are not overridden by PCollection 
classes. `mock.replace()` or `mock.remove()` will call Java's default implementations, `Map.replace()` or `Map.remove()`. I felt that mocking and testing those is not required. For example, `Map.replace()` looks like this: ``` default boolean replace(K key, V oldValue, V newValue) { Object curValue = get(key); if (!Objects.equals(curValue, oldValue) || (curValue == null && !containsKey(key))) { return false; } put(key, newValue); return true; } ``` Since it internally calls `put()`, which we are already testing, I thought it was not necessary to test these functions. Likewise for `remove()`.
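The delegation described in this comment can be checked with a small experiment. This is a minimal sketch and not part of the PR: `CountingMap` is a hypothetical class that extends `AbstractMap`, overrides only `put()` and `entrySet()`, and counts `put()` calls, so the three-argument `replace()` it exposes is the default method inherited from `java.util.Map`.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not a class from Kafka or PCollections.
final class CountingMap<K, V> extends AbstractMap<K, V> {
    private final Map<K, V> backing = new HashMap<>();
    private int putCalls = 0;

    @Override
    public V put(K key, V value) {
        putCalls++; // record every write that reaches put()
        return backing.put(key, value);
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return backing.entrySet();
    }

    int putCalls() {
        return putCalls;
    }

    public static void main(String[] args) {
        CountingMap<String, Integer> map = new CountingMap<>();
        map.put("a", 1);                            // first put() call
        boolean replaced = map.replace("a", 1, 2);  // default Map.replace() calls put() again
        // prints "true 2 2"
        System.out.println(replaced + " " + map.get("a") + " " + map.putCalls());
    }
}
```

Because `replace(key, oldValue, newValue)` is not overridden here, the second increment of the counter can only come from the default method calling `put()`, which is the behaviour the comment relies on.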
[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
jolshan commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172939909 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ClientAssignorTest.java: ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ClientAssignorTest { + +@Test +public void testNameAndMetadataCannotBeBull() { Review Comment: also did we want tests for some of the other errors we throw? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
jolshan commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172939598 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ClientAssignorTest.java: ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ClientAssignorTest { + +@Test +public void testNameAndMetadataCannotBeBull() { Review Comment: nit: Null -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
jolshan commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172939142 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * An immutable representation of a client side assignor within a consumer group member. + */ +public class ClientAssignor { +/** + * The name of the assignor. + */ +private final String name; + +/** + * The reason reported by the assignor. + */ +private final byte reason; + +/** + * The minimum metadata version supported by the assignor. + */ +private final short minimumVersion; + +/** + * The maximum metadata version supported by the assignor. + */ +private final short maximumVersion; + +/** + * The versioned metadata. 
+ */ +private final VersionedMetadata metadata; + +public ClientAssignor( +String name, +byte reason, +short minimumVersion, +short maximumVersion, +VersionedMetadata metadata +) { +this.name = Objects.requireNonNull(name); +this.reason = reason; +this.minimumVersion = minimumVersion; Review Comment: There were a few others in other classes. Did we want to do the same for those? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket
divijvaidya commented on PR #13572: URL: https://github.com/apache/kafka/pull/13572#issuecomment-1516728614 Hey @cmccabe, we can have a community discussion over the mailing list, but I am not following one of your statements here. > since it will allow multiple Kafka servers to be started on the same port Can you please help me understand why? I am asking because, AFAIK (correct me if I am wrong), all this flag does is allow reuse of a socket that is in the TIME_WAIT state. If it is in another state, i.e. it is being used by a different Kafka process, the binding is not allowed; hence, two Kafka servers cannot run on the same port (or rather, in this context, reuse the socket).
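For readers unfamiliar with the flag under discussion, here is a minimal sketch of how SO_REUSEADDR is set from Java. It uses a plain `java.net.ServerSocket` and a hypothetical `openListener` helper, which are assumptions for illustration and not Kafka's socket server code. What happens when a second process binds the same port while the first listener is still active differs between operating systems, so the sketch deliberately does not assert anything about that case.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical helper for illustration; not taken from the Kafka code base.
final class ReuseAddressSketch {

    // Opens a listener on the loopback interface. The option is set on an
    // unbound socket because its effect after bind() is not defined.
    static ServerSocket openListener(int port, boolean reuseAddress) throws IOException {
        ServerSocket socket = new ServerSocket(); // created unbound
        socket.setReuseAddress(reuseAddress);     // SO_REUSEADDR
        socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
        return socket;
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port.
        try (ServerSocket socket = openListener(0, true)) {
            System.out.println("bound to port " + socket.getLocalPort()
                + ", SO_REUSEADDR=" + socket.getReuseAddress());
        }
    }
}
```

Making the flag a parameter mirrors the suggestion elsewhere in this thread that the setting could be optional rather than always on.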
[GitHub] [kafka] cmccabe commented on pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket
cmccabe commented on PR #13572: URL: https://github.com/apache/kafka/pull/13572#issuecomment-1516694321 Hi all, Unfortunately I had to revert this. This is a change to our public API since it will allow multiple Kafka servers to be started on the same port. This kind of change needs a KIP (Kafka Improvement Proposal) so we can discuss the pros and cons. The change also does very different things on different operating systems. There is a rundown here: https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ > Security: SO_REUSEADDR can allow an attacker to perform DDOS by creating a genuine connection with same IP and port. Prior to this change, the TIME_WAIT state of the socket would have prevented immediate re-connection. This is an acceptable risk for Kafka because we have connection throttling available in code for IP addresses and a user may choose to configure it to prevent a DDOS. Again, this needs a KIP so we can discuss whether the increased DDOS risk is acceptable or not. The answer may be different for different users. Personally, I think that at minimum SO_REUSEADDR should be an option (not mandatory) and probably not the default.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1172892464 ## clients/src/test/java/org/apache/kafka/common/utils/SkippableChunkedBytesStreamTest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class SkippableChunkedBytesStreamTest { +private static final Random RANDOM = new Random(1337); +private final BufferSupplier supplier = BufferSupplier.NO_CACHING; + +@ParameterizedTest +@MethodSource("provideSourceSkipValuesForTest") +public void skip_testCorrectness(int bytesToPreRead, ByteBuffer inputBuf, int numBytesToSkip) throws IOException { +int expectedInpLeftAfterSkip = inputBuf.remaining() - bytesToPreRead - numBytesToSkip; +int expectedSkippedBytes = Math.min(inputBuf.remaining() - bytesToPreRead, numBytesToSkip); + +try (BytesStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuf.duplicate()), supplier, 10)) { +int cnt = 0; +while (cnt++ < bytesToPreRead) { +is.readByte(); +} + +int res = is.skipBytes(numBytesToSkip); +assertEquals(expectedSkippedBytes, res); + +// verify that we are able to read rest of the input +cnt = 0; +while (cnt++ < expectedInpLeftAfterSkip) { +is.readByte(); +} +} +} + +@Test +public void skip_testEndOfSource() throws IOException { Review Comment: Fixed in latest commit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1172889948 ## clients/src/main/java/org/apache/kafka/common/utils/SkippableChunkedBytesStream.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.io.IOException; +import java.io.InputStream; + +/** + * SkippableChunkedBytesStream is a variant of ChunkedBytesStream which does not push skip() to the sourceStream. + * + * Unlike BufferedInputStream.skip() and ChunkedBytesStream.skip(), this does not push skip() to sourceStream. + * We want to avoid pushing this to sourceStream because it's implementation maybe inefficient, e.g. the case of Z + * stdInputStream which allocates a new buffer from buffer pool, per skip call. + * + * @see ChunkedBytesStream + */ +public class SkippableChunkedBytesStream extends ChunkedBytesStream { Review Comment: 1. done. I removed this SkippableChunkedBytesStream and control this behaviour in ChunkedBytesStream using a flag. 2. No. The change I made was to introduce a new Stream which doesn't require us to create a buffer when passing compressed data to Zstd. 
The new stream was added in [ZstdBufferDecompressingStreamNoFinalizer](https://github.com/luben/zstd-jni/pull/244) but its usage in Zstd is pending completion of https://github.com/luben/zstd-jni/issues/252 which I plan to pick up next week. The migration to using this new Stream will be done in a separate PR.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1172884561 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The + * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in + * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary. + * + * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following + * differences: + * - Unlike BufferedInputStream.skip() Review Comment: Fixed. ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The + * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in + * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary. + * + * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following + * differences: + * - Unlike BufferedInputStream.skip() + * - Unlike BufferedInputStream, which allocates an intermediate buffer, this uses a buffer supplier to create the + * intermediate buffer + * - Unlike DataInputStream, the readByte method does not push the reading of a byte to sourceStream. + * + * Note that: + * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this. + * - many method are un-supported in this class because they aren't currently used in the caller code. Review Comment: Fixed. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
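The Javadoc quoted in this review describes reading from the source stream in chunks so that `sourceStream#read()` is called less often. The following is a minimal sketch of that idea only. `ChunkedReaderSketch` is a hypothetical class, it allocates a plain `byte[]` instead of using a buffer supplier, and it is not the `ChunkedBytesStream` implementation from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical, simplified reader for illustration; not Kafka's ChunkedBytesStream.
final class ChunkedReaderSketch {
    private final InputStream source;
    private final byte[] chunk;   // intermediate buffer of configurable size
    private int pos = 0;          // next unread index in chunk
    private int limit = 0;        // number of valid bytes in chunk
    private int sourceReads = 0;  // how many times the source was asked for data

    ChunkedReaderSketch(InputStream source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.source = source;
        this.chunk = new byte[chunkSize];
    }

    // Serves bytes from the buffer and only touches the source when it is empty.
    byte readByte() throws IOException {
        if (pos >= limit) {
            fill();
        }
        return chunk[pos++];
    }

    private void fill() throws IOException {
        int n = source.read(chunk, 0, chunk.length);
        sourceReads++;
        if (n <= 0) {
            throw new EOFException("source stream is exhausted");
        }
        pos = 0;
        limit = n;
    }

    int sourceReads() {
        return sourceReads;
    }

    public static void main(String[] args) throws IOException {
        ChunkedReaderSketch reader =
            new ChunkedReaderSketch(new ByteArrayInputStream(new byte[25]), 10);
        for (int i = 0; i < 25; i++) {
            reader.readByte();
        }
        // 25 single-byte reads were served by 3 chunked reads of the source.
        System.out.println(reader.sourceReads());
    }
}
```

With an expensive source, such as one that crosses the JNI boundary, the saving comes from the ratio between the chunk size and the size of each caller read.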
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1172883282 ## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ## @@ -157,7 +186,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu *batch. As such, a supplier that reuses buffers will have a significant *performance impact. */ -public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier); +public abstract BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier); + +/** + * Recommended size of buffer for storing decompressed output. + */ +public int getRecommendedDOutSize() { Review Comment: changed in latest commit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1172883027 ## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ## @@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier); } + +@Override +public int getRecommendedDOutSize() { +return 16 * 1024; // 16KB Review Comment: It was 16KB itself (+2KB for skipBuffer which we removed). https://github.com/apache/kafka/blob/ef09a2e3fc11a738f6681fd57fb84ad109593fd3/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java#L68 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1172882018 ## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ## @@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer @Override public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { try { -return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, -messageVersion == RecordBatch.MAGIC_VALUE_V0); +return new ChunkedDataInputStream( +new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0), +decompressionBufferSupplier, getRecommendedDOutSize()); } catch (Throwable e) { throw new KafkaException(e); } } + +@Override +public int getRecommendedDOutSize() { +return 2 * 1024; // 2KB Review Comment: Done in latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1172881788 ## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ## @@ -90,8 +95,13 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer } @Override -public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { -return SnappyFactory.wrapForInput(buffer); +public BytesStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { +return new SkippableChunkedBytesStream(SnappyFactory.wrapForInput(buffer), decompressionBufferSupplier, getRecommendedDOutSize()); +} + +@Override +public int getRecommendedDOutSize() { +return 8 * 1024; // 8KB Review Comment: I honestly don't remember now. I changed it back to 2KB and benchmarked again, it didn't change anything. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14884) Include check transaction is still ongoing right before append
[ https://issues.apache.org/jira/browse/KAFKA-14884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-14884. Resolution: Fixed > Include check transaction is still ongoing right before append > --- > > Key: KAFKA-14884 > URL: https://issues.apache.org/jira/browse/KAFKA-14884 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.5.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > > Even after checking via AddPartitionsToTxn, the transaction could be aborted > after the response. We can add one more check before appending. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional
[ https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-14916: --- Description: KAFKA-14561 wrote code that assumed that if a transactional ID was included, all record batches were transactional and had the same producer ID. This work will improve validation and fix the code that assumes all batches are transactional. Currently the code does not enforce that there cannot be differing producer IDs. This will be enforced. Further, KAFKA-14561 will not assume all records are transactional. was: KAFKA-14561 wrote code that assumed that if a transactional ID was included, all record batches were transactional and had the same producer ID. This work will improve validation and fix the code that assumes all batches are transactional. > Fix code that assumes transactional ID implies all records are transactional > > > Key: KAFKA-14916 > URL: https://issues.apache.org/jira/browse/KAFKA-14916 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > > KAFKA-14561 wrote code that assumed that if a transactional ID was included, > all record batches were transactional and had the same producer ID. > This work will improve validation and fix the code that assumes all batches > are transactional. > Currently the code does not enforce that there cannot be differing producer > IDs. This will be enforced. > Further, KAFKA-14561 will not assume all records are transactional. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
emissionnebula commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1172850344 ## server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java: ## @@ -139,6 +163,16 @@ public void doFunctionDelegationCheck() { } } +public void doUnsupportedFunctionDelegrationCheck() { Review Comment: yup, I thought it made sense to test it explicitly. Updated the functions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
emissionnebula commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1172848492 ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java: ## @@ -225,77 +225,59 @@ public void testDelegationOfReplaceAll() { new PCollectionsHashMapWrapperDelegationChecker<>() .defineMockConfigurationForVoidMethodInvocation(mock -> mock.replaceAll(eq(mockBiFunction))) .defineWrapperVoidMethodInvocation(wrapper -> wrapper.replaceAll(mockBiFunction)) -.doVoidMethodDelegationCheck(); +.doUnsupportedVoidFunctionDelegrationCheck(); } @Test public void testDelegationOfPutIfAbsent() { new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.putIfAbsent(eq(this), eq(this)), this) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.putIfAbsent(this, this), identity()) -.doFunctionDelegationCheck(); -} - -@ParameterizedTest -@ValueSource(booleans = {true, false}) -public void testDelegationOfRemoveByKeyAndValue(boolean mockFunctionReturnValue) { -new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this), eq(this)), mockFunctionReturnValue) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this, this), identity()) -.doFunctionDelegationCheck(); -} - -@ParameterizedTest -@ValueSource(booleans = {true, false}) -public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) { -new PCollectionsHashMapWrapperDelegationChecker<>() -.defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue) - .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this, this), identity()) -.doFunctionDelegationCheck(); Review Comment: These two functions are not overridden by PCollection 
classes. `mock.replace()` or `mock.remove()` will call Java's default implementation in `Map.replace()` or `AbstractMap.remove()`. Mocking and testing that did not seem necessary. For example, `Map.replace()` looks like this: ``` default boolean replace(K key, V oldValue, V newValue) { Object curValue = get(key); if (!Objects.equals(curValue, oldValue) || (curValue == null && !containsKey(key))) { return false; } put(key, newValue); return true; } ``` Since it internally calls `put()`, which we are already testing, I thought it was not necessary to test these functions. Likewise for `remove()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
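The delegation argument in the comment above can be checked with a small standalone sketch. `RecordingMap` is a hypothetical class written for this illustration (it is not part of Kafka or PCollections); it extends `AbstractMap` without overriding `replace(K, V, V)`, so the `Map` default method shown above runs and reaches `put()` only when the old value matches.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration class: counts how often put() is reached.
final class RecordingMap<K, V> extends AbstractMap<K, V> {
    private final Map<K, V> backing = new HashMap<>();
    private int putCalls = 0;

    @Override
    public V put(K key, V value) {
        putCalls++; // every write, including those made by Map.replace(), lands here
        return backing.put(key, value);
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        // AbstractMap derives get(), containsKey() and remove(Object) from this view
        return backing.entrySet();
    }

    public int putCalls() {
        return putCalls;
    }

    public static void main(String[] args) {
        RecordingMap<String, Integer> map = new RecordingMap<>();
        map.put("a", 1);
        boolean replaced = map.replace("a", 1, 2);  // old value matches: default method calls put()
        boolean ignored = map.replace("a", 99, 3);  // old value differs: put() is never reached
        System.out.println(replaced + " " + ignored + " " + map.putCalls());
    }
}
```

Note that this only holds for maps that inherit the default; `java.util.HashMap`, for instance, overrides `replace` itself, which is why the sketch extends `AbstractMap` instead.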
[GitHub] [kafka] jsancio commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`
jsancio commented on code in PR #13555: URL: https://github.com/apache/kafka/pull/13555#discussion_r1172839235 ## server-common/src/main/java/org/apache/kafka/deferred/DeferredEvent.java: ## @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.kafka.controller; +package org.apache.kafka.deferred; /** * Represents a deferred event in the controller purgatory. Review Comment: Let's fix this comment. Maybe: > Represents a deferred event in the {@code DeferredEventQueue}. ## server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java: ## @@ -77,13 +76,13 @@ void failAll(Exception exception) { * @param offsetThe offset to add the new event at. Review Comment: Above we have: > Add a new purgatory event. how about: > Add a new deferred event to be completed by the provided offset. ## server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java: ## @@ -77,13 +76,13 @@ void failAll(Exception exception) { * @param offsetThe offset to add the new event at. * @param event The new event. */ -void add(long offset, DeferredEvent event) { +public void add(long offset, DeferredEvent event) { if (!pending.isEmpty()) { long lastKey = pending.lastKey(); if (offset < lastKey) { throw new RuntimeException("There is already a purgatory event with " + Review Comment: How about throwing an `IllegalArgumentException` instead of `RuntimeException` with the following message: > There is already a deferred event with offset ... ## server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java: ## @@ -60,7 +59,7 @@ void completeUpTo(long offset) { * Review Comment: The line above has > Fail all the pending purgatory entries. how about: > Fail all deferred events with the provided exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
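To make the suggestions in this review concrete, here is a minimal sketch of an offset-ordered deferred event queue. It is not the Kafka class: `SketchDeferredEventQueue`, the nested `DeferredEvent` interface with a `complete(Throwable)` method, and `size()` are assumptions made for this illustration. Only the method names `add`, `completeUpTo` and `failAll`, the `pending` map with `lastKey()`, and the proposed `IllegalArgumentException` come from the diff and comments above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the class under review.
final class SketchDeferredEventQueue {
    /** Assumed shape of a deferred event: a null exception means success. */
    interface DeferredEvent {
        void complete(Throwable exception);
    }

    // Pending events keyed by the offset that must be reached before completion.
    private final TreeMap<Long, List<DeferredEvent>> pending = new TreeMap<>();

    /** Add a new deferred event to be completed by the provided offset. */
    public void add(long offset, DeferredEvent event) {
        if (!pending.isEmpty()) {
            long lastKey = pending.lastKey();
            if (offset < lastKey) {
                throw new IllegalArgumentException("There is already a deferred event with offset "
                    + lastKey + "; cannot add one with lower offset " + offset);
            }
        }
        pending.computeIfAbsent(offset, k -> new ArrayList<>()).add(event);
    }

    /** Complete, successfully, every event whose offset is at or below the given offset. */
    public void completeUpTo(long offset) {
        Iterator<Map.Entry<Long, List<DeferredEvent>>> iter = pending.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Long, List<DeferredEvent>> entry = iter.next();
            if (entry.getKey() > offset) {
                break; // TreeMap iterates in ascending key order, so nothing further qualifies
            }
            entry.getValue().forEach(event -> event.complete(null));
            iter.remove();
        }
    }

    /** Fail all deferred events with the provided exception. */
    public void failAll(Exception exception) {
        pending.values().forEach(events -> events.forEach(event -> event.complete(exception)));
        pending.clear();
    }

    public int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        SketchDeferredEventQueue queue = new SketchDeferredEventQueue();
        queue.add(5L, ex -> System.out.println("offset 5: " + (ex == null ? "ok" : ex.getMessage())));
        queue.add(10L, ex -> System.out.println("offset 10: " + (ex == null ? "ok" : ex.getMessage())));
        queue.completeUpTo(7L);
        queue.failAll(new RuntimeException("shutdown"));
    }
}
```

An `IllegalArgumentException` fits here because the failure is caused by the caller passing an offset that violates the ordering contract, rather than by an internal fault.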
[GitHub] [kafka] tinaselenge commented on pull request #13459: KAFKA-14592: Move FeatureCommand to tools
tinaselenge commented on PR #13459: URL: https://github.com/apache/kafka/pull/13459#issuecomment-1516631750 @showuon Thank you so much. I have addressed the comments but left a couple of them to clarify first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Pending state blocked verification of transactions
jolshan commented on PR #13579: URL: https://github.com/apache/kafka/pull/13579#issuecomment-1516624342 Will also cherry-pick to 3.5 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan merged pull request #13579: KAFKA-14904: Pending state blocked verification of transactions
jolshan merged PR #13579: URL: https://github.com/apache/kafka/pull/13579 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Pending state blocked verification of transactions
jolshan commented on PR #13579: URL: https://github.com/apache/kafka/pull/13579#issuecomment-1516620151 Many many connect failures still 😔 Failures appear to be unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tinaselenge commented on a diff in pull request #13459: KAFKA-14592: Move FeatureCommand to tools
tinaselenge commented on code in PR #13459: URL: https://github.com/apache/kafka/pull/13459#discussion_r1172832601 ## tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java: ## @@ -307,25 +294,24 @@ private static void update(String op, Admin admin, Map up } }); -AtomicInteger numFailures = new AtomicInteger(); -errors.keySet().forEach(feature -> { -short level = updates.get(feature).maxVersionLevel(); -Optional maybeThrowable = errors.get(feature); +int numFailures = 0; +for (Map.Entry> entry : errors.entrySet()) { Review Comment: SpotBugs is not happy when we iterate over `keySet()` and call `errors.get(feature)`. And since we are no longer iterating over the key set, do we have to use a TreeSet to traverse the keys in sorted order? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
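One way to address both points raised here, sketched under assumptions: copying the map into a `TreeMap` and iterating its `entrySet()` visits the features in sorted key order without a second lookup per key. `SortedEntryIteration`, `describe` and `countFailures` are hypothetical names for this illustration, and the `Map<String, Optional<Throwable>>` type is a guess at the generics that were stripped from the diff above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch, not the FeatureCommand code.
final class SortedEntryIteration {
    /** Builds one line per feature, in sorted key order, using entrySet() only. */
    public static List<String> describe(Map<String, Optional<Throwable>> errors) {
        // Copying into a TreeMap sorts by key whatever the input map type is.
        Map<String, Optional<Throwable>> sorted = new TreeMap<>(errors);
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Optional<Throwable>> entry : sorted.entrySet()) {
            String feature = entry.getKey();
            Optional<Throwable> maybeThrowable = entry.getValue(); // no errors.get(feature) lookup
            lines.add(maybeThrowable
                .map(t -> feature + " FAILED: " + t.getMessage())
                .orElse(feature + " OK"));
        }
        return lines;
    }

    /** Counts failures with a plain local int, as in the updated diff. */
    public static int countFailures(Map<String, Optional<Throwable>> errors) {
        int numFailures = 0;
        for (Map.Entry<String, Optional<Throwable>> entry : errors.entrySet()) {
            if (entry.getValue().isPresent()) {
                numFailures++;
            }
        }
        return numFailures;
    }

    public static void main(String[] args) {
        Map<String, Optional<Throwable>> errors = new HashMap<>();
        errors.put("metadata.version", Optional.empty());
        errors.put("foo.bar", Optional.of(new RuntimeException("boom")));
        describe(errors).forEach(System.out::println);
        System.out.println(countFailures(errors) + " failure(s)");
    }
}
```

If the map is already a `TreeMap` (or another `SortedMap`) when it is built, the copy is unnecessary and `entrySet()` can be iterated directly.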
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1172824513 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +members.put(consumerA, new AssignmentMemberSpec( +Optional.empty(), + Optional.empty(), + Collections.emptyList(), + Collections.emptyMap())); Review Comment: I had changed it in my IDE and it looked good, idky the formatting changed in the PR :( Will take a look at it thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`
jsancio commented on code in PR #13555: URL: https://github.com/apache/kafka/pull/13555#discussion_r1172793737 ## server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java: ## @@ -26,10 +26,9 @@ /** * The purgatory which holds events that have been started, but not yet completed. Review Comment: This is the Merriam-Webster definition: > 1 : an intermediate state after death for expiatory purification specifically : a place or state of punishment wherein according to Roman Catholic doctrine the souls of those who die in God's grace may make satisfaction for past sins and so become fit for heaven > 2 : a place or state of temporary suffering or misery I don't think either of those definitions is accurate for this problem and solution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1172772709 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +stateString.append(mutableSyncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(mutableSyncs[i].downstreamOffset()); +} +} +stateString.append("]"); +log.trace("New sync {} applied, new state is {}", offsetSync, stateString); +} +return mutableSyncs; +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[Long.SIZE]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +for (int i = 0; i < Long.SIZE; i++) { +syncs[i] = 
offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: @urbandan Before https://issues.apache.org/jira/browse/KAFKA-13659 and https://issues.apache.org/jira/browse/KAFKA-12468 this logic did not have monotonicity guarantees, but that does not mean that behavior was acceptable to users. These tickets were opened and voted on by multiple users before we took action. If we eliminate the in-memory deduplication and the forced read-to-end in this PR, we will re-introduce behavior that we have already fixed on trunk. I do not think that is acceptable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
rondagostino commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1172661731 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableNavigableSet.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.server.immutable; + +import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableNavigableSet; + +import java.util.NavigableSet; + +/** + * A persistent Tree-based NavigableSet wrapper + * java.util.Set methods that mutate in-place will throw UnsupportedOperationException + * + * @param the element type + */ +public interface ImmutableNavigableSet extends ImmutableSet, NavigableSet { +/** + * @return a wrapped tree-based persistent navigable set that is empty + * @param the element type + */ +static > ImmutableNavigableSet empty() { +return PCollectionsImmutableNavigableSet.empty(); +} + +/** + * @param e the element + * @return a wrapped hash-based persistent set that is empty + * @param the element type + */ +static > ImmutableNavigableSet singleton(E e) { +return PCollectionsImmutableNavigableSet.singleton(e); +} + +/** + * @param e the element + * @return a wrapped persistent sorted set that differs from this one in that the given element is added (if necessary) + */ +ImmutableNavigableSet added(E e); + +/** + * @param e the element + * @return a wrapped persistent sorted set that differs from this one in that the given element is added (if necessary) Review Comment: s/sorted/navigable/ ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableNavigableSet.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableNavigableSet; + +import java.util.NavigableSet; + +/** + * A persistent Tree-based NavigableSet wrapper + * java.util.Set methods that mutate in-place will throw UnsupportedOperationException + * + * @param the element type + */ +public interface ImmutableNavigableSet extends ImmutableSet, NavigableSet { +/** + * @return a wrapped tree-based persistent navigable set that is empty + * @param the element type + */ +static > ImmutableNavigableSet empty() { +return PCollectionsImmutableNavigableSet.empty(); +} + +/** + * @param e the element + * @return a wrapped hash-based persistent set that is empty Review Comment: Looks like copy/paste error -- not hash-based. ## server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableNavigableSetTest.java: ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS
[GitHub] [kafka] dajac commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`
dajac commented on code in PR #13555: URL: https://github.com/apache/kafka/pull/13555#discussion_r1172738445 ## server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java: ## @@ -26,10 +26,9 @@ /** * The purgatory which holds events that have been started, but not yet completed. - * We wait for the high water mark of the metadata log to advance before completing - * them. + * We wait for the high watermark of the log to advance before completing them. */ -class ControllerPurgatory { +public class DeferredEventPurgatory { Review Comment: Naming things is hard :). `Completion` sounds a bit weird here. I can't really think of a better name... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`
dajac commented on code in PR #13555: URL: https://github.com/apache/kafka/pull/13555#discussion_r1172735940 ## server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java: ## @@ -26,10 +26,9 @@ /** * The purgatory which holds events that have been started, but not yet completed. Review Comment: Why do you think that it is not accurate? For me, a purgatory is a data structure holding something which is completed later. That seems to fit here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13621: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito
clolov commented on PR #13621: URL: https://github.com/apache/kafka/pull/13621#issuecomment-1516499410 Heya @cadonna! I hope this attempt is what you had in mind? Unless I am wrong the removals detailed in the pull request are sensible as Mockito should be returning empty collections for method invocations on mocks based on "By default, for all methods that return a value, a mock will return either null, a primitive/primitive wrapper value, or an empty collection, as appropriate" (source: https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#stubbing) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov opened a new pull request, #13621: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito
clolov opened a new pull request, #13621: URL: https://github.com/apache/kafka/pull/13621 This pull request takes a similar approach as the one outlined in https://github.com/apache/kafka/pull/13529 to move each mock separately to ease reviewing the code. Once https://github.com/apache/kafka/pull/13529 is merged this pull request will be rebased on top (as some of the changes are present in both pull requests to make the tests pass). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`
jsancio commented on code in PR #13555: URL: https://github.com/apache/kafka/pull/13555#discussion_r1172724317 ## server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java: ## @@ -26,10 +26,9 @@ /** * The purgatory which holds events that have been started, but not yet completed. Review Comment: Minor but can we avoid the use of the word purgatory? It is not accurate for what this type and package are doing. ## server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java: ## @@ -26,10 +26,9 @@ /** * The purgatory which holds events that have been started, but not yet completed. - * We wait for the high water mark of the metadata log to advance before completing - * them. + * We wait for the high watermark of the log to advance before completing them. */ -class ControllerPurgatory { +public class DeferredEventPurgatory { Review Comment: Maybe a more accurate name could be `OffsetBasedCompletion` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on pull request #13603: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…
jeffkbkim commented on PR #13603: URL: https://github.com/apache/kafka/pull/13603#issuecomment-1516453173 @dajac ``` org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance kafka.api.ConsumerBounceTest.testSubscribeWhenTopicUnavailable() ``` passed locally
[GitHub] [kafka] mumrah commented on pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
mumrah commented on PR #13461: URL: https://github.com/apache/kafka/pull/13461#issuecomment-1516441727 > Should we wrap the calls to quotaEntityConsumer.accept (and other consumers) so that if it throws an exception, we log an ERROR message to log4j to let us know what failed? Yes, that's a good idea. I'll add this to all the externalized calls (consumers/visitors). > Can we have a ZkConfigMigrationClientTest and so on, for the other new migrationclient classes? For sure. I'll work on these next.
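The wrapping idea discussed above could look roughly like the sketch below. It is an illustration under assumptions: the helper `LoggingConsumers.logErrors` is hypothetical, and it uses `java.util.logging` instead of log4j so that the example is self-contained. Whether the exception should be rethrown after logging is a choice the thread does not settle; this sketch rethrows.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper; not part of the migration client in PR #13461. */
final class LoggingConsumers {
    private static final Logger LOG = Logger.getLogger(LoggingConsumers.class.getName());

    private LoggingConsumers() { }

    /**
     * Wrap an externally supplied consumer so that a failure is logged with
     * enough context to tell which callback failed and on which value.
     */
    static <T> Consumer<T> logErrors(String name, Consumer<T> delegate) {
        return value -> {
            try {
                delegate.accept(value);
            } catch (RuntimeException e) {
                // SEVERE is the java.util.logging counterpart of log4j's ERROR.
                LOG.log(Level.SEVERE, "Error in " + name + " while handling " + value, e);
                throw e; // assumption: the caller still wants to see the failure
            }
        };
    }
}
```

A call site would then pass `LoggingConsumers.logErrors("quotaEntityConsumer", quotaEntityConsumer)` wherever the raw consumer was passed before.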
[GitHub] [kafka] jeffkbkim commented on pull request #13604: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…
jeffkbkim commented on PR #13604: URL: https://github.com/apache/kafka/pull/13604#issuecomment-1516438325 @dajac I see Selector test failures that pass locally. Can we re-trigger the build?
[GitHub] [kafka] dajac commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`
dajac commented on code in PR #13555: URL: https://github.com/apache/kafka/pull/13555#discussion_r1172674549 ## server-common/src/main/java/org/apache/kafka/purgatory/Purgatory.java: ## @@ -26,10 +26,9 @@ /** * The purgatory which holds events that have been started, but not yet completed. - * We wait for the high water mark of the metadata log to advance before completing - * them. + * We wait for the high watermark of the log to advance before completing them. */ -class ControllerPurgatory { +public class Purgatory { Review Comment: +1 for `DeferredEventPurgatory`.
[jira] [Commented] (KAFKA-14923) Upgrade io.netty_netty-codec for CVE-2022-41881
[ https://issues.apache.org/jira/browse/KAFKA-14923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714612#comment-17714612 ] Vikash Mishra commented on KAFKA-14923: --- [~yash.mayya] Awesome, thanks for the details. Not sure if there is a patch planned for 3.4.0 but it's good to know that at least 3.5.0 will have fixes. > Upgrade io.netty_netty-codec for CVE-2022-41881 > --- > > Key: KAFKA-14923 > URL: https://issues.apache.org/jira/browse/KAFKA-14923 > Project: Kafka > Issue Type: Task >Affects Versions: 3.4.0, 3.3.2 >Reporter: Vikash Mishra >Priority: Critical > > Currently used io.netty_netty-codec version 4.1.78 has high severity CVE: > [NVD - CVE-2022-41881 > (nist.gov)|https://nvd.nist.gov/vuln/detail/CVE-2022-41881] > Fix was patched in version 4.1.86.Final. As we have higher stable version > 4.1.91.Final available we should upgrade to same to fix mentioned CVE. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mumrah commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
mumrah commented on code in PR #13461: URL: https://github.com/apache/kafka/pull/13461#discussion_r1172646961 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -174,12 +173,12 @@ private boolean areZkBrokersReadyForMigration() { /** * Apply a function which transforms our internal migration state. * - * @param name A descriptive name of the function that is being applied - * @param stateMutator A function which performs some migration operations and possibly transforms our internal state + * @param name A descriptive name of the function that is being applied + * @param migrationOp A function which performs some migration operations and possibly transforms our internal state */ -private void apply(String name, Function stateMutator) { +private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp) { ZkMigrationLeadershipState beforeState = this.migrationLeadershipState; -ZkMigrationLeadershipState afterState = stateMutator.apply(beforeState); +ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState); log.trace("{} transitioned from {} to {}", name, beforeState, afterState); Review Comment: I'll add this to the current PR. Shouldn't be too hard -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
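The diff above replaces a generic `Function` parameter with a named operation type. A minimal sketch of that pattern follows. All type names ending in `Sketch` are hypothetical stand-ins, the state is reduced to an epoch, and the final assignment of the new state is an assumption, since the quoted diff is cut off after the trace log.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ZkMigrationLeadershipState; it only carries an epoch. */
final class LeadershipStateSketch {
    final int epoch;

    LeadershipStateSketch(int epoch) {
        this.epoch = epoch;
    }

    @Override
    public String toString() {
        return "State(epoch=" + epoch + ")";
    }
}

/** A named functional interface in place of a raw Function, mirroring the rename in the diff. */
@FunctionalInterface
interface MigrationOperationSketch {
    LeadershipStateSketch apply(LeadershipStateSketch current);
}

final class MigrationDriverSketch {
    private LeadershipStateSketch state = new LeadershipStateSketch(0);
    // Collected in a list instead of written to a logger so the example is self-contained.
    final List<String> trace = new ArrayList<>();

    void applyMigrationOperation(String name, MigrationOperationSketch op) {
        LeadershipStateSketch before = state;
        LeadershipStateSketch after = op.apply(before);
        trace.add(name + " transitioned from " + before + " to " + after);
        state = after; // assumption: the driver keeps the transformed state
    }

    LeadershipStateSketch state() {
        return state;
    }
}
```

A dedicated interface gives the parameter a descriptive type in signatures and stack traces, and it still accepts lambdas such as `s -> new LeadershipStateSketch(s.epoch + 1)`.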
[GitHub] [kafka] dajac commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
dajac commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172621475 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * An immutable representation of a client side assignor within a consumer group member. + */ +public class ClientAssignor { +/** + * The name of the assignor. + */ +private final String name; + +/** + * The reason reported by the assignor. + */ +private final byte reason; + +/** + * The minimum metadata version supported by the assignor. + */ +private final short minimumVersion; + +/** + * The maximum metadata version supported by the assignor. + */ +private final short maximumVersion; + +/** + * The versioned metadata. + */ +private final VersionedMetadata metadata; Review Comment: ``` /** * The error assigned to the member. */ private final byte error; ``` you mean more than this? 
[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode
mumrah commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1172621439 ## metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java: ## @@ -115,6 +115,9 @@ private void publishDelta(MetadataDelta delta) { } } changes.apply(metrics); +if (delta.featuresDelta() != null) { Review Comment: I wonder if we should only set the zk state metric if we're on a metadata.version that supports it. Otherwise we could see clusters go from NONE -> MIGRATION -> POST_MIGRATION, which is a bit odd. (NONE is supposed to be a terminal state) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
jeffkbkim commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172600639 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * An immutable representation of a client side assignor within a consumer group member. + */ +public class ClientAssignor { +/** + * The name of the assignor. + */ +private final String name; + +/** + * The reason reported by the assignor. + */ +private final byte reason; + +/** + * The minimum metadata version supported by the assignor. + */ +private final short minimumVersion; + +/** + * The maximum metadata version supported by the assignor. + */ +private final short maximumVersion; + +/** + * The versioned metadata. + */ +private final VersionedMetadata metadata; Review Comment: that makes sense, and we added information inside VersionedMetadata. can we have a description for Assignment.error? 
[GitHub] [kafka] dengziming commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`
dengziming commented on code in PR #13555: URL: https://github.com/apache/kafka/pull/13555#discussion_r1172598460 ## server-common/src/main/java/org/apache/kafka/purgatory/Purgatory.java: ## @@ -26,10 +26,9 @@ /** * The purgatory which holds events that have been started, but not yet completed. - * We wait for the high water mark of the metadata log to advance before completing - * them. + * We wait for the high watermark of the log to advance before completing them. */ -class ControllerPurgatory { +public class Purgatory { Review Comment: It seems better to add a prefix since we have several `Purgatory` classes. We have a `FuturePurgatory` in the raft module and it's similar to this `ControllerPurgatory`, so we may call it `DeferredEventPurgatory` or `HwmPurgatory` (high watermark).
[jira] [Commented] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id
[ https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714597#comment-17714597 ] Jørgen commented on KAFKA-14922: An _improvement_ would be to return only topics that contain the provided applicationId exactly. This would fix the case where the applicationId doesn't exist, instead of returning all topics that start with the provided applicationId (like "f" in the example). It would not cover the case where other applicationIds start with the provided applicationId (resetting foo-v1 would still delete foo-v1-2 topics, etc.) > kafka-streams-application-reset deletes topics not belonging to specified > application-id > > > Key: KAFKA-14922 > URL: https://issues.apache.org/jira/browse/KAFKA-14922 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 3.4.0 >Reporter: Jørgen >Priority: Major > > Slack-thread: > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849] > When running the command _kafka-streams-application-reset --bootstrap-servers > $BOOTSTRAP --application-id foo_ all internal topics that _start with_ foo > are deleted. This happens even if there's no application-id named foo. > Example: > {code:java} > Application IDs: > foo-v1 > foo-v2 > Internal topics: > foo-v1-repartition-topic-repartition > foo-v2-repartition-topic-repartition > Application reset: > kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP > --application-id foo > > No input or intermediate topics specified. Skipping seek. > Deleting inferred internal topics [foo-v2-repartition-topic-repartition, > foo-v1-repartition-topic-repartition] > Done.{code} > Expected behaviour is that the command fails as there are no application-ids > with the name foo instead of deleting all foo* topics. 
> This is critical on typos or if application-ids start with the same name as > others (for example if we had foo-v21 and wanted to reset foo-v2) > The bug should be located here: > [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693] > Should check that the topics match the application-id exactly instead of > checking that they start with the application-id. -- This message was sent by Atlassian Jira (v8.20.10#820010)
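The difference between prefix matching and a stricter ownership check can be sketched as follows. This is an illustration, not the `StreamsResetter` code: `prefixMatch` reproduces the behaviour as the report describes it, while `ownedBy` is a hypothetical alternative that assumes the set of existing application IDs is known from somewhere (how the tool would obtain it is not covered in the thread). It refuses unknown IDs and attributes each topic to the longest known application ID that prefixes it.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch; class and method names are illustrative only. */
final class ResetTopicMatchSketch {
    private ResetTopicMatchSketch() { }

    /** Behaviour as described in the report: any topic starting with the given ID matches. */
    static List<String> prefixMatch(String appId, Collection<String> topics) {
        return topics.stream()
            .filter(t -> t.startsWith(appId))
            .sorted()
            .collect(Collectors.toList());
    }

    /** Stricter variant: fail on unknown IDs and resolve overlapping prefixes. */
    static List<String> ownedBy(String appId, Set<String> knownAppIds, Collection<String> topics) {
        if (!knownAppIds.contains(appId)) {
            throw new IllegalArgumentException("Unknown application-id: " + appId);
        }
        return topics.stream()
            .filter(t -> appId.equals(longestOwner(t, knownAppIds).orElse(null)))
            .sorted()
            .collect(Collectors.toList());
    }

    /** The longest known application ID such that the topic starts with "<id>-". */
    private static Optional<String> longestOwner(String topic, Set<String> knownAppIds) {
        return knownAppIds.stream()
            .filter(id -> topic.startsWith(id + "-"))
            .max(Comparator.comparingInt(String::length));
    }
}
```

With the topics from the report, `prefixMatch("foo", ...)` returns both repartition topics, whereas `ownedBy("foo", ...)` throws because `foo` is not a known ID. The longest-prefix rule is still a heuristic: an application `foo-v1` whose internal topic name begins with `2-` would be attributed to `foo-v1-2`.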
[GitHub] [kafka] dajac commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata
dajac commented on code in PR #13537: URL: https://github.com/apache/kafka/pull/13537#discussion_r1172597532 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * An immutable representation of a client side assignor within a consumer group member. + */ +public class ClientAssignor { +/** + * The name of the assignor. + */ +private final String name; + +/** + * The reason reported by the assignor. + */ +private final byte reason; + +/** + * The minimum metadata version supported by the assignor. + */ +private final short minimumVersion; + +/** + * The maximum metadata version supported by the assignor. + */ +private final short maximumVersion; + +/** + * The versioned metadata. + */ +private final VersionedMetadata metadata; Review Comment: I am not sure to understand what you would like to see. `ClientAssignor` is all about the client assignor so everything in this class is about this. 
[GitHub] [kafka] tinaselenge commented on a diff in pull request #13459: KAFKA-14592: Move FeatureCommand to tools
tinaselenge commented on code in PR #13459: URL: https://github.com/apache/kafka/pull/13459#discussion_r1172577875 ## tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java: ## @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.server.common.MetadataVersion; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static java.lang.String.format; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; + +import static org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.SAFE_DOWNGRADE; +import static org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.KRAFT) +@Tag("integration") +public class FeatureCommandTest { + +private final ClusterInstance cluster; +public FeatureCommandTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) +public void testDescribeWithZK() { +String commandOutput = ToolsTestUtils.captureStandardOut(() -> +assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe")) +); +assertEquals("", commandOutput); +} + +@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1) +public void testDescribeWithKRaft() { +String commandOutput = 
ToolsTestUtils.captureStandardOut(() -> +assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe")) +); +assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + +"SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); +} + +@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) +public void testUpgradeMetadataVersionWithZk() { +String commandOutput = ToolsTestUtils.captureStandardOut(() -> +assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), +"upgrade", "--metadata", "3.3-IV2")) +); +assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " + +"update because the provided feature is not supported.", commandOutput); +} + +@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1) +public void testUpgradeMetadataVersionWithKraft() { +String commandOutput = ToolsTestUtils.captureStandardOut(() -> +assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), +"upgrade", "--feature", "metadata.version=5")) +); +assertEquals("metadata.version was upgraded to 5.", commandOutput); + +commandOutput = ToolsTestUtils.captureStandardOut(() -> +assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), +"upgrade", "--metadata", "3.3-IV2")) +); +assertEquals("metadata.version was upgraded to 6.", commandOutput); +} + +@ClusterTest(clusterType = Type.ZK, metadataVersion = Metad
[jira] [Assigned] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint
[ https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya reassigned KAFKA-14877: Assignee: Divij Vaidya > refactor InMemoryLeaderEpochCheckpoint > -- > > Key: KAFKA-14877 > URL: https://issues.apache.org/jira/browse/KAFKA-14877 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Divij Vaidya >Priority: Major > > follow up with this comment: > https://github.com/apache/kafka/pull/13456#discussion_r1154306477 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint
[ https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714594#comment-17714594 ] Divij Vaidya commented on KAFKA-14877: -- I will pick this up if there is no objection by anyone. > refactor InMemoryLeaderEpochCheckpoint > -- > > Key: KAFKA-14877 > URL: https://issues.apache.org/jira/browse/KAFKA-14877 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Divij Vaidya >Priority: Major > > follow up with this comment: > https://github.com/apache/kafka/pull/13456#discussion_r1154306477 -- This message was sent by Atlassian Jira (v8.20.10#820010)