[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-20 Thread via GitHub


emissionnebula commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1173353923


##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java:
##
@@ -68,7 +68,7 @@ public void testUnderlying() {
 }
 
 @Test
-public void testDelegationOfAfterAdding() {
+public void testDelegationOfAfterUpdated() {

Review Comment:
   ohh. I missed this :( 






[jira] [Resolved] (KAFKA-14827) Support for StandardAuthorizer in Benchmark

2023-04-20 Thread Purshotam Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Purshotam Chauhan resolved KAFKA-14827.
---
Fix Version/s: 3.5.0
 Reviewer: Manikumar Reddy
   Resolution: Fixed

> Support for StandardAuthorizer in Benchmark
> ---
>
> Key: KAFKA-14827
> URL: https://issues.apache.org/jira/browse/KAFKA-14827
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Purshotam Chauhan
>Assignee: Purshotam Chauhan
>Priority: Major
> Fix For: 3.5.0
>
>
> Support for StandardAuthorizer in Benchmark





[jira] [Closed] (KAFKA-14827) Support for StandardAuthorizer in Benchmark

2023-04-20 Thread Purshotam Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Purshotam Chauhan closed KAFKA-14827.
-

> Support for StandardAuthorizer in Benchmark
> ---
>
> Key: KAFKA-14827
> URL: https://issues.apache.org/jira/browse/KAFKA-14827
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Purshotam Chauhan
>Assignee: Purshotam Chauhan
>Priority: Major
> Fix For: 3.5.0
>
>
> Support for StandardAuthorizer in Benchmark





[GitHub] [kafka] dengziming commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-20 Thread via GitHub


dengziming commented on code in PR #13555:
URL: https://github.com/apache/kafka/pull/13555#discussion_r1173247022


##
server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java:
##
@@ -26,10 +26,9 @@
 
 /**
  * The purgatory which holds events that have been started, but not yet 
completed.

Review Comment:
   Some of my thoughts: the founders of Kafka had an artistic streak. Franz Kafka (the novelist) mainly represents the modernist school, while "purgatory" is a term from Dante's "Divine Comedy"; Dante was a representative writer of the Renaissance era. "Purgatory" was first used in the `TimingWheel` algorithm and has since been used several times to represent a `ThresholdFuture`-like data structure.
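   
   For context, a rough sketch of what such a purgatory does (illustrative names only, not the actual `DeferredEventPurgatory` API): events wait here, keyed by the log offset whose commit completes them.
   ```
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   interface DeferredEvent {
       void complete();
   }

   class PurgatorySketch {
       private final TreeMap<Long, List<DeferredEvent>> pending = new TreeMap<>();

       void add(long offset, DeferredEvent event) {
           pending.computeIfAbsent(offset, k -> new ArrayList<>()).add(event);
       }

       // Complete every event waiting on an offset <= the committed offset.
       void completeUpTo(long committedOffset) {
           Iterator<Map.Entry<Long, List<DeferredEvent>>> it = pending.entrySet().iterator();
           while (it.hasNext()) {
               Map.Entry<Long, List<DeferredEvent>> entry = it.next();
               if (entry.getKey() > committedOffset) break;
               entry.getValue().forEach(DeferredEvent::complete);
               it.remove();
           }
       }
   }
   ```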






[GitHub] [kafka] rondagostino commented on pull request #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing

2023-04-20 Thread via GitHub


rondagostino commented on PR #13532:
URL: https://github.com/apache/kafka/pull/13532#issuecomment-1517118288

   Merged to 3.5, 3.4, 3.3, 3.2, 3.1, 3.0, 2.8, and 2.7





[jira] [Updated] (KAFKA-14887) ZK session timeout can cause broker to shutdown

2023-04-20 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14887:
--
Fix Version/s: 2.7.3
   3.2.4
   3.1.3
   3.0.3
   3.4.1
   3.3.3
   2.8.3

> ZK session timeout can cause broker to shutdown
> ---
>
> Key: KAFKA-14887
> URL: https://issues.apache.org/jira/browse/KAFKA-14887
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0, 2.8.0, 2.7.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 2.7.3, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3, 2.8.3
>
>
> We have the following code in FinalizedFeatureChangeListener.scala which will 
> exit regardless of the type of exception that is thrown when trying to 
> process feature changes:
> case e: Exception => {
>   error("Failed to process feature ZK node change event. The broker 
> will eventually exit.", e)
>   throw new FatalExitError(1)
> The issue here is that this does not distinguish between exceptions caused by 
> an inability to process a feature change and an exception caused by a 
> ZooKeeper session timeout.  We want to shut the broker down for the former 
> case, but we do NOT want to shut the broker down in the latter case; the 
> ZooKeeper session will eventually be reestablished, and we can continue 
> processing at that time.
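
A minimal sketch of the distinction being asked for, assuming a dedicated session-expiration exception type (the names below are illustrative, not the actual patch):
{code:java}
try {
    processFeatureChange(); // stand-in for the existing change-processing logic
} catch (SessionExpiredException e) {
    // Illustrative: the ZK session will eventually be re-established, so log
    // and let the next change notification retry instead of exiting.
    log.warn("ZK session expired while processing feature change; will retry.", e);
} catch (Exception e) {
    // Anything else is a genuine processing failure; keep the existing behavior.
    log.error("Failed to process feature ZK node change event. The broker will eventually exit.", e);
    throw new FatalExitError(1);
}
{code}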





[jira] [Updated] (KAFKA-14887) ZK session timeout can cause broker to shutdown

2023-04-20 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14887:
--
Fix Version/s: 3.5.0

> ZK session timeout can cause broker to shutdown
> ---
>
> Key: KAFKA-14887
> URL: https://issues.apache.org/jira/browse/KAFKA-14887
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0, 2.8.0, 2.7.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 3.5.0
>
>
> We have the following code in FinalizedFeatureChangeListener.scala which will 
> exit regardless of the type of exception that is thrown when trying to 
> process feature changes:
> case e: Exception => {
>   error("Failed to process feature ZK node change event. The broker 
> will eventually exit.", e)
>   throw new FatalExitError(1)
> The issue here is that this does not distinguish between exceptions caused by 
> an inability to process a feature change and an exception caused by a 
> ZooKeeper session timeout.  We want to shut the broker down for the former 
> case, but we do NOT want to shut the broker down in the latter case; the 
> ZooKeeper session will eventually be reestablished, and we can continue 
> processing at that time.





[GitHub] [kafka] rondagostino commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-20 Thread via GitHub


rondagostino commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1173188783


##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java:
##
@@ -77,7 +77,7 @@ public void testDelegationOfAfterAdding() {
 }
 
 @Test
-public void testDelegationOfAfterRemoving() {
+public void testDelegationOfAfterRemoved() {

Review Comment:
   s/AfterRemoved/Removed/



##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java:
##
@@ -71,7 +71,7 @@ public void testUnderlying() {
 }
 
 @Test
-public void testDelegationOfAfterAdding() {
+public void testDelegationOfAfterAdded() {

Review Comment:
   s/AfterAdded/Added/



##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableNavigableSetTest.java:
##
@@ -82,7 +82,7 @@ public void testDelegationOfAfterAdding() {
 }
 
 @Test
-public void testDelegationOfAfterRemoving() {
+public void testDelegationOfAfterRemoved() {

Review Comment:
   s/AfterRemoved/Removed/



##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java:
##
@@ -80,7 +80,7 @@ public void testDelegationOfAfterAdding() {
 }
 
 @Test
-public void testDelegationOfAfterRemoving() {
+public void testDelegationOfAfterRemoved() {

Review Comment:
   s/AfterRemoved/Removed/



##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableNavigableSetTest.java:
##
@@ -73,7 +73,7 @@ public void testUnderlying() {
 }
 
 @Test
-public void testDelegationOfAfterAdding() {
+public void testDelegationOfAfterAdded() {

Review Comment:
   s/AfterAdded/Added/



##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java:
##
@@ -68,7 +68,7 @@ public void testUnderlying() {
 }
 
 @Test
-public void testDelegationOfAfterAdding() {
+public void testDelegationOfAfterUpdated() {

Review Comment:
   s/AfterUpdated/Updated/






[GitHub] [kafka] rondagostino commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-20 Thread via GitHub


rondagostino commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1173187845


##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java:
##
@@ -225,77 +225,59 @@ public void testDelegationOfReplaceAll() {
 new PCollectionsHashMapWrapperDelegationChecker<>()
 .defineMockConfigurationForVoidMethodInvocation(mock -> 
mock.replaceAll(eq(mockBiFunction)))
 .defineWrapperVoidMethodInvocation(wrapper -> 
wrapper.replaceAll(mockBiFunction))
-.doVoidMethodDelegationCheck();
+.doUnsupportedVoidFunctionDelegrationCheck();
 }
 
 @Test
 public void testDelegationOfPutIfAbsent() {
 new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.putIfAbsent(eq(this), eq(this)), this)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.putIfAbsent(this, this), identity())
-.doFunctionDelegationCheck();
-}
-
-@ParameterizedTest
-@ValueSource(booleans = {true, false})
-public void testDelegationOfRemoveByKeyAndValue(boolean 
mockFunctionReturnValue) {
-new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.remove(eq(this), eq(this)), mockFunctionReturnValue)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.remove(this, this), identity())
-.doFunctionDelegationCheck();
-}
-
-@ParameterizedTest
-@ValueSource(booleans = {true, false})
-public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean 
mockFunctionReturnValue) {
-new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.replace(this, this, this), identity())
-.doFunctionDelegationCheck();

Review Comment:
   Yeah, this is why I said in the comment above:
   ```
   I think on the one hand we don't have to test that the real method throws the exception -- that's up to the library itself to test in its test suite. All we have to test is that we delegate correctly.
   ```
   
   I guess in these two cases the underlying library implementation doesn't explicitly throw the exception but instead reuses a default implementation from `java.util`, which will cause an exception to be thrown if the map ends up needing to be mutated.
   
   At a minimum we should put the two test cases back in as I wrote them. If you want to do it with an explicit check for the thrown exception by invoking the underlying method, then you may have to do a bit more configuration on the mock so that the underlying method will actually decide it needs to mutate the map. Probably just putting my stuff back in is easier (and also adding "UnsupportedOperation" into the test method names).






[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-20 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1173179837


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given member 
subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) 
that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  Add members from the potentiallyUnfilled list to the 
Unfilled list if they haven't met the total required quota i.e. 
minimum number of partitions per member + 1 (if member is designated to receive 
one of the excess partitions) 
+ *  Generate a list of unassigned partitions by calculating the difference 
between total partitions and already assigned (sticky) partitions 
+ *  Iterate through unfilled members and assign partitions from the 
unassigned partitions 
+ * 
+ * 
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId: topics) {
+                // Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId);
+                } else {
+                    log.info(memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+private Map>
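As a rough illustration of the quota arithmetic behind the javadoc's "minimum required quota" and "excess partitions" (assumed names, not the PR's code):
```
// Illustrative only: per-topic quota math for a range-style assignor.
static int minQuota(int numPartitionsForTopic, int numMembersForTopic) {
    return numPartitionsForTopic / numMembersForTopic; // every member gets at least this many
}

static int numMembersWithExtraPartition(int numPartitionsForTopic, int numMembersForTopic) {
    return numPartitionsForTopic % numMembersForTopic; // this many members get one extra
}
```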

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-20 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1173177108


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given member 
subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) 
that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  Add members from the potentiallyUnfilled list to the 
Unfilled list if they haven't met the total required quota i.e. 
minimum number of partitions per member + 1 (if member is designated to receive 
one of the excess partitions) 
+ *  Generate a list of unassigned partitions by calculating the difference 
between total partitions and already assigned (sticky) partitions 
+ *  Iterate through unfilled members and assign partitions from the 
unassigned partitions 
+ * 
+ * 
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId: topics) {
+                // Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId);
+                } else {
+                    log.info(memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+private Map>

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-20 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1173174594


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+        Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+        Map<String, AssignmentMemberSpec> members = new HashMap<>();
+        List<Uuid> subscribedTopics = new ArrayList<>();
+        members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+        Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+        Map<String, AssignmentMemberSpec> members = new HashMap<>();
+        List<Uuid> subscribedTopics = new ArrayList<>();
+        subscribedTopics.add(topic2Uuid);
+        members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+        // A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 Partitions
+        // Topics
+        Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+        topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+        // Members
+        Map<String, AssignmentMemberSpec> members = new HashMap<>();
+        // Consumer A
+        List<Uuid> subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+        members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>()));
+        // Consumer B
+        List<Uuid> subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+        members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+        GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+        Map<Uuid, Set<Set<Integer>>> expectedAssignment = new HashMap<>();
+        // Topic 1 Partitions Assignment
+        expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+        expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+        // Topic 3 Partitions Assignment
+        expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+        expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-20 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1173172297


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+        Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+        Map<String, AssignmentMemberSpec> members = new HashMap<>();
+        members.put(consumerA, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.emptyList(),
+            Collections.emptyMap()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+        Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new AssignmentTopicMetadata(3));

Review Comment:
   done






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-20 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1173171410


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -266,7 +266,7 @@ public synchronized TransactionalRequestResult beginAbort() 
{
 return handleCachedTransactionRequestResult(() -> {
 if (currentState != State.ABORTABLE_ERROR)
 maybeFailWithError();
-transitionTo(State.ABORTING_TRANSACTION);
+transitionTo(State.ABORTING_TRANSACTION, null, true);

Review Comment:
   Good catch. I've created another version of `beginAbort` that takes the flag 
and am passing in `false` from the `Sender` `run` method.
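   
   A sketch of the shape such an overload might take (method and helper names here are assumed, not the actual patch):
   ```
   // Illustrative only: user-initiated aborts keep the strict behavior; the
   // Sender's internal abort tolerates an invalid transition.
   public synchronized TransactionalRequestResult beginAbort() {
       return beginAbort(true); // reached via KafkaProducer.abortTransaction()
   }

   synchronized TransactionalRequestResult beginAbort(boolean shouldThrowOnInvalidTransition) {
       return handleCachedTransactionRequestResult(() -> {
           if (currentState != State.ABORTABLE_ERROR)
               maybeFailWithError();
           transitionTo(State.ABORTING_TRANSACTION, null, shouldThrowOnInvalidTransition);
           return restOfExistingAbortBody(); // hypothetical stand-in for the unchanged remainder
       }, State.ABORTING_TRANSACTION);
   }
   ```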






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-20 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1173167778


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
 }
 
 private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, false);

Review Comment:
   > I think changing the default behavior to not throw can cause issues in 
some calls:
   > 
   > 1. TransactionManager.InitProducerIdHandler#handleResponse on line 1303 - 
lastError is explicitly set to null (which shouldn't be done at all, as 
transitionTo already does that if the state transition is valid), which will 
clear the latest error. I think to make this work, that lastError = null should 
be removed from line 1303
   
   I've removed that line. I am not sure why it's there, but `git blame` says 
it's been there since 2017 😬 
   
   > 2. This is a call chain where we transition on direct user action, 
shouldn't this be throwing? KafkaProducer.send -> KafkaProducer.doSend -> 
maybeTransitionToErrorState -> transitionToAbortableError -> transitionTo
   
   Since `maybeTransitionToErrorState` is being called from inside a `catch` 
block, if we did throw an exception, it would mask the root issue 
(`ApiException`), right?
   
   > 3. In TransactionManager.TxnOffsetCommitHandler#handleResponse, there are 
multiple
   > 
   > ```
   > abortableError(...);
   > break;
   > ```
   > 
   > blocks. If abortableError does not throw on invalid state transition 
anymore, the txn commit will be retried, even when in a failed state, which 
doesn't seem correct.
   
   After the `abortableError` call, if the state transition was invalid, then 
`currentState` would be `FATAL_ERROR`. That has the same effect as the last two 
branches of the `if` statement that call the `fatalError` method, right?
   
   Would simply skipping the reenqueue on fatal errors be sufficient?
   
   ```
   if (result.isCompleted()) {
   pendingTxnOffsetCommits.clear();
   } else if (pendingTxnOffsetCommits.isEmpty()) {
   result.done();
   } else {
   if (!hasFatalError()) {
   // Retry the commits which failed with a retriable error
   reenqueue();
   }
   }
   ```
   
   I don't yet understand why we'd want to re-enqueue after those calls to 
`fatalError` anyway 🤔 






[jira] [Commented] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id

2023-04-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714770#comment-17714770
 ] 

Matthias J. Sax commented on KAFKA-14922:
-

{quote}We could add a warning with the list of internal topics found to delete and ask for a confirmation to make it harder to inadvertently delete internal topics of other application IDs.
{quote}

There is already a `--dry-run` option, but we could of course also try adding an `--execute` one and flip it around... That would need a KIP, of course.
 
{quote}An _improvement_ would be to only return topics that exactly contains the applicationId provided.{quote}
I don't believe it's possible to implement this.
 
{quote}Would not cover the case where other applicationIds starts with applicationId provided (foo-v1 would delete foo-v1-2 topics, etc)
{quote}
 
Both seem to be the same issue? Note: we already do `topicName.startsWith(options.valueOf(applicationIdOption) + "-")`, i.e., we append the expected `-`. That provides protection for `appV1` vs `appV2`: if you pass in `app`, it won't match either of them. But if a dash is used inside the app.id itself, as in `myApp-v1`, it seems there is nothing we can do about it.
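
For illustration (assumed topic names):
{code:java}
String applicationId = "foo-v1";
String ownTopic = "foo-v1-repartition-topic-repartition";
String foreignTopic = "foo-v1-2-repartition-topic-repartition";

boolean matchesOwn = ownTopic.startsWith(applicationId + "-");         // true, intended
boolean matchesForeign = foreignTopic.startsWith(applicationId + "-"); // also true: the dash inside "foo-v1-2" defeats the guard
{code}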

> kafka-streams-application-reset deletes topics not belonging to specified 
> application-id
> 
>
> Key: KAFKA-14922
> URL: https://issues.apache.org/jira/browse/KAFKA-14922
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 3.4.0
>Reporter: Jørgen
>Priority: Major
>
> Slack-thread: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]
> When running the command _kafka-streams-application-reset --bootstrap-servers 
> $BOOTSTRAP --application-id foo_ all internal topics that _starts with_ foo 
> is deleted. This happens even if there's no application-id named foo.
> Example:
> {code:java}
> Application IDs:
> foo-v1
> foo-v2
> Internal topics:
> foo-v1-repartition-topic-repartition
> foo-v2-repartition-topic-repartition 
> Application reset:
> kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP 
> --application-id foo
> > No input or intermediate topics specified. Skipping seek.
> Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
> foo-v1-repartition-topic-repartition]
> Done.{code}
> Expected behaviour is that the command fails as there are no application-id's 
> with the name foo instead of deleting all foo* topics. 
> This is critical on typos or if application-ids starts with the same name as 
> others (for example if we had foo-v21 and wanted to reset foo-v2)
> The bug should be located here: 
> [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]
> Should check that the topics matches the application-id exactly instead of 
> checking that it starts with the application-id.





[GitHub] [kafka] mjsax commented on pull request #13614: KAFKA-14586: Adding redirection for StreamsResetter

2023-04-20 Thread via GitHub


mjsax commented on PR #13614:
URL: https://github.com/apache/kafka/pull/13614#issuecomment-1517029646

   Thanks all of you!





[GitHub] [kafka] vcrfxia opened a new pull request, #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-20 Thread via GitHub


vcrfxia opened a new pull request, #13622:
URL: https://github.com/apache/kafka/pull/13622

   This PR updates docs for 
[KIP-914](https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores).
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173127962


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -324,4 +329,29 @@ class ZkMigrationIntegrationTest {
   assertTrue(firstProducerIdBlock.firstProducerId() < 
producerIdBlock.firstProducerId())
 }
   }
+
+  /**
+   * Start a KRaft cluster with migrations enabled, verify that the controller 
does not accept metadata changes
+   * through the RPCs
+   */
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_4_IV0,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true"))),
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_5_IV0,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true"))),
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_5_IV1,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true")))
+  ))
+  def testPreMigrationMode(clusterInstance: ClusterInstance): Unit = {

Review Comment:
   Heh, the test framework is politely adding a broker for me :)
   
   This test does seem to do what I want; I see:
   
   ```
   [2023-04-20 17:56:08,267] ERROR [QuorumController id=3000] createTopics: 
unable to start processing because of NotControllerException. Reason: The 
active controller appears to be node 3000 
(org.apache.kafka.controller.QuorumController:445)
   ```
   
   in the log, and then the create topics call times out after a short time. 
I've updated the test to have `brokers = 1`






[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173116859


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
 // Prepend the activate event. It is important that this event go 
at the beginning
 // of the queue rather than the end (hence prepend rather than 
append). It's also
 // important not to use prepend for anything else, to preserve the 
ordering here.
-        queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
-            new CompleteActivationEvent()));
+        ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+            new CompleteActivationEvent(),
+            EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION));
+activationEvent.future.whenComplete((__, t) -> {
+if (t != null) {
+fatalFaultHandler.handleFault("exception while activating 
controller", t);

Review Comment:
   In `handleEventException`, we renounce the controller leadership when we see 
a non-ApiException from a ControllerWriteEvent. As far as I can tell, this does 
not hit the fault handler.
   
   `KafkaEventQueue#run` traps exceptions and calls `Event#handleException`. In 
`ControllerEvent#handleException`, we call 
`QuorumController#handleEventException` which just does logging and 
resignation. `handleEventException` wraps the exception with 
UnknownServerException, but I don't see where we do anything with that in 
ControllerEvent. I think this is by design since we don't want the controller 
just crashing for any error, but only for specific cases where we call the 
fault handler.







[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173109856


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -222,7 +245,7 @@ private boolean isValidStateChange(MigrationDriverState 
newState) {
 
 private void transitionTo(MigrationDriverState newState) {
 if (!isValidStateChange(newState)) {
-log.error("Error transition in migration driver from {} to {}", 
migrationState, newState);
+log.error("Invalid transition in migration driver from {} to {}", 
migrationState, newState);

Review Comment:
   sure, i'll clean this up






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173092055


##
metadata/src/main/resources/common/metadata/ZkMigrationRecord.json:
##
@@ -18,7 +18,9 @@
   "type": "metadata",
   "name": "ZkMigrationStateRecord",
   // Version 0 adds ZkMigrationState which is used by the KRaft controller to 
mark the beginning and end
-  // of the ZK to KRaft migration. Possible values are 1 (PreMigration), 2 
(Migration), 3 (PostMigration).
+  // of the ZK to KRaft migration. Possible values are 0 (None), 1 
(PreMigration), 2 (Migration), 3 (PostMigration).

Review Comment:
   I think this comment needs to be revised to remove the comment about the 
tagged field, and swap the value of PreMigration and Migration






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173091174


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -396,18 +418,36 @@ class WaitForControllerQuorumEvent extends MigrationEvent 
{
 
 @Override
 public void run() throws Exception {
-switch (migrationState) {
-case WAIT_FOR_CONTROLLER_QUORUM:
-if (isControllerQuorumReadyForMigration()) {
-log.debug("Controller Quorum is ready for Zk to KRaft 
migration");
-// Note that leadership would not change here. Hence 
we do not need to
-// `apply` any leadership state change.
-transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
+if 
(migrationState.equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
+if (!firstPublish) {
+log.trace("Waiting until we have received metadata before 
proceeding with migration");
+return;
+}
+
+ZkMigrationState zkMigrationState = 
image.features().zkMigrationState();

Review Comment:
   we can have a switch statement here right?
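   
   Something like this, perhaps (a sketch only; the branch bodies are placeholders, not the driver's actual logic):
   ```
   switch (zkMigrationState) {
       case NONE:
           break; // placeholder: no migration state recorded yet
       case PRE_MIGRATION:
           break; // placeholder
       case MIGRATION:
           break; // placeholder: dual-write mode already in progress
       case POST_MIGRATION:
           break; // placeholder: migration finished
       default:
           log.error("Unexpected ZkMigrationState {}", zkMigrationState);
   }
   ```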






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173089410


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -222,7 +245,7 @@ private boolean isValidStateChange(MigrationDriverState 
newState) {
 
 private void transitionTo(MigrationDriverState newState) {
 if (!isValidStateChange(newState)) {
-log.error("Error transition in migration driver from {} to {}", 
migrationState, newState);
+log.error("Invalid transition in migration driver from {} to {}", 
migrationState, newState);

Review Comment:
   (I realize you aren't adding this, but just saw an opportunity for a minor 
cleanup)






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173089106


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -222,7 +245,7 @@ private boolean isValidStateChange(MigrationDriverState 
newState) {
 
 private void transitionTo(MigrationDriverState newState) {
 if (!isValidStateChange(newState)) {
-log.error("Error transition in migration driver from {} to {}", 
migrationState, newState);
+log.error("Invalid transition in migration driver from {} to {}", 
migrationState, newState);

Review Comment:
   I'm confused about this function:
   
   1. Shouldn't the fact that the transition to UNINITIALIZED is invalid be covered by isValidStateChange returning false if you pass it newState = UNINITIALIZED?
   2. The switch statement below throws an exception on invalid state change, 
whereas the initial check just logs a message at `error`. The exception seems 
more reasonable?
   3. the switch has a case for INACTIVE that just does nothing?
   ```
   switch (newState) {
   ...
   case INACTIVE:
   // Any state can go to INACTIVE.
   break;
   }
   
   ```
   
   Just leave out the case for `INACTIVE` since it's a no-op? (And maybe the 
whole switch, see above.)






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173082868


##
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##
@@ -105,14 +130,15 @@ private void writeFeatureLevels(ImageWriter writer, 
ImageWriterOptions options)
 
 @Override
 public int hashCode() {
-return finalizedVersions.hashCode();
+return Objects.hash(finalizedVersions, zkMigrationState);
 }
 
 @Override
 public boolean equals(Object o) {
 if (!(o instanceof FeaturesImage)) return false;
 FeaturesImage other = (FeaturesImage) o;
-return finalizedVersions.equals(other.finalizedVersions);
+return finalizedVersions.equals(other.finalizedVersions) &&

Review Comment:
   should test metadataVersion here too, oops
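   
   e.g. (a sketch of the suggested fix, covering both methods, with names taken from the diff above):
   ```
   @Override
   public int hashCode() {
       return Objects.hash(finalizedVersions, metadataVersion, zkMigrationState);
   }

   @Override
   public boolean equals(Object o) {
       if (!(o instanceof FeaturesImage)) return false;
       FeaturesImage other = (FeaturesImage) o;
       return finalizedVersions.equals(other.finalizedVersions) &&
           metadataVersion.equals(other.metadataVersion) &&
           zkMigrationState.equals(other.zkMigrationState);
   }
   ```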






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173082641


##
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##
@@ -105,14 +130,15 @@ private void writeFeatureLevels(ImageWriter writer, 
ImageWriterOptions options)
 
 @Override
 public int hashCode() {
-return finalizedVersions.hashCode();
+return Objects.hash(finalizedVersions, zkMigrationState);

Review Comment:
   I think we should hash metadataVersion here too, oops






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173081897


##
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##
@@ -37,19 +39,30 @@
  * This class is thread-safe.
  */
 public final class FeaturesImage {
-public static final FeaturesImage EMPTY = new 
FeaturesImage(Collections.emptyMap(), MetadataVersion.MINIMUM_KRAFT_VERSION);
+public static final FeaturesImage EMPTY = new FeaturesImage(
+Collections.emptyMap(),
+MetadataVersion.MINIMUM_KRAFT_VERSION,
+ZkMigrationState.NONE
+);
 
 private final Map finalizedVersions;
 
 private final MetadataVersion metadataVersion;
 
-public FeaturesImage(Map finalizedVersions, MetadataVersion 
metadataVersion) {
+private final ZkMigrationState zkMigrationState;
+
+public FeaturesImage(
+Map finalizedVersions,
+MetadataVersion metadataVersion,
+ZkMigrationState zkMigrationState
+) {
 this.finalizedVersions = 
Collections.unmodifiableMap(finalizedVersions);
 this.metadataVersion = metadataVersion;
+this.zkMigrationState = zkMigrationState;
 }
 
 public boolean isEmpty() {
-return finalizedVersions.isEmpty();
+return finalizedVersions.isEmpty() && 
zkMigrationState.equals(ZkMigrationState.NONE);

Review Comment:
   I think we should also check that `metadataVersion` is equal to 
`MetadataVersion.MINIMUM_KRAFT_VERSION` here. This is an existing bug, oops! 
(although a very minor one that I doubt has an impact)
   
   Also let's add a unit test
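   
   A sketch of such a test (JUnit 5; the non-minimum `MetadataVersion` constant is an assumption):
   ```
   @Test
   public void testIsEmpty() {
       assertTrue(FeaturesImage.EMPTY.isEmpty());
       // An image whose only non-default field is metadata.version should not be empty.
       FeaturesImage image = new FeaturesImage(
           Collections.emptyMap(),
           MetadataVersion.IBP_3_4_IV0, // assumed: any version above MINIMUM_KRAFT_VERSION
           ZkMigrationState.NONE);
       assertFalse(image.isEmpty());
   }
   ```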






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173080760


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
 // Prepend the activate event. It is important that this event go 
at the beginning
 // of the queue rather than the end (hence prepend rather than 
append). It's also
 // important not to use prepend for anything else, to preserve the 
ordering here.
-queue.prepend(new ControllerWriteEvent<>("completeActivation[" + 
epoch + "]",
-new CompleteActivationEvent()));
+ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+new CompleteActivationEvent(),
+EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, 
RUNS_IN_PREMIGRATION));
+activationEvent.future.whenComplete((__, t) -> {
+if (t != null) {
+fatalFaultHandler.handleFault("exception while activating 
controller", t);
+}
+});
+queue.prepend(activationEvent);
 } catch (Throwable e) {
 fatalFaultHandler.handleFault("exception while claiming 
leadership", e);
 }
 }
 
+/**
+ * Generate the set of activation records. Until KIP-868 transactions are 
supported, these records
+ * are committed to the log as an atomic batch. The records will include 
the bootstrap metadata records
+ * (including the bootstrap "metadata.version") and may include a ZK 
migration record.
+ */
+public static void generateActivationRecords(
+Logger log,
+boolean isLogEmpty,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+FeatureControlManager featureControl,
+Consumer<ApiMessageAndVersion> recordConsumer
+) {
+if (isLogEmpty) {
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things 
like SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} 
bootstrap record(s) " +
+"at metadata.version {} from {}.", 
bootstrapMetadata.records().size(),
+bootstrapMetadata.metadataVersion(), 
bootstrapMetadata.source());
+bootstrapMetadata.records().forEach(recordConsumer);
+
+if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. 
No metadata updates will be allowed until " +
+"the ZK metadata has been migrated");
+
recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since 
this is a de-novo KRaft cluster.");
+recordConsumer.accept(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version 
" + bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with 
ZK migrations enabled.");
+}
+}
+} else {
+// Logs have been replayed. We need to initialize some things here 
if upgrading from older KRaft versions
+if 
(featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION))
 {
+log.info("No metadata.version feature level record was found 
in the log. " +
+"Treating the log as version {}.", 
MetadataVersion.MINIMUM_KRAFT_VERSION);
+}
+
+if (featureControl.metadataVersion().isMigrationSupported()) {
+log.info("Loaded ZK migration state of {}", 
featureControl.zkMigrationState());
+switch (featureControl.zkMigrationState()) {
+case NONE:
+// Since this is the default state there may or may 
not be an actual NONE in the log. Regardless,
+// it will eventually be persisted in a snapshot, so 
we don't need to explicitly write it here.
+if (zkMigrationEnabled) {
+throw new RuntimeException("Should not have ZK 
migrations enabled on a cluster that was created in KRaft mode.");
+}
+break;
+case PRE_MIGRATION:

Review Comment:
   (So in other words, I think this is worthy of a `warn` level log, but not an 
exception)
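   One way the warn-level alternative could look (illustrative only; the
message text is the editor's guess):
   ```
   case PRE_MIGRATION:
       // A prior attempt may have failed after persisting PRE_MIGRATION;
       // surface it loudly but let activation continue.
       log.warn("Activation: found ZK migration state PRE_MIGRATION from an " +
           "earlier attempt; staying in pre-migration mode.");
       break;
   ```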



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173079770


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
 // Prepend the activate event. It is important that this event go 
at the beginning
 // of the queue rather than the end (hence prepend rather than 
append). It's also
 // important not to use prepend for anything else, to preserve the 
ordering here.
-queue.prepend(new ControllerWriteEvent<>("completeActivation[" + 
epoch + "]",
-new CompleteActivationEvent()));
+ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+new CompleteActivationEvent(),
+EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, 
RUNS_IN_PREMIGRATION));
+activationEvent.future.whenComplete((__, t) -> {
+if (t != null) {
+fatalFaultHandler.handleFault("exception while activating 
controller", t);
+}
+});
+queue.prepend(activationEvent);
 } catch (Throwable e) {
 fatalFaultHandler.handleFault("exception while claiming 
leadership", e);
 }
 }
 
+/**
+ * Generate the set of activation records. Until KIP-868 transactions are 
supported, these records
+ * are committed to the log as an atomic batch. The records will include 
the bootstrap metadata records
+ * (including the bootstrap "metadata.version") and may include a ZK 
migration record.
+ */
+public static void generateActivationRecords(
+Logger log,
+boolean isLogEmpty,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+FeatureControlManager featureControl,
+Consumer<ApiMessageAndVersion> recordConsumer
+) {
+if (isLogEmpty) {
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things 
like SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} 
bootstrap record(s) " +
+"at metadata.version {} from {}.", 
bootstrapMetadata.records().size(),
+bootstrapMetadata.metadataVersion(), 
bootstrapMetadata.source());
+bootstrapMetadata.records().forEach(recordConsumer);
+
+if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. 
No metadata updates will be allowed until " +
+"the ZK metadata has been migrated");
+
recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since 
this is a de-novo KRaft cluster.");
+recordConsumer.accept(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version 
" + bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with 
ZK migrations enabled.");
+}
+}
+} else {
+// Logs have been replayed. We need to initialize some things here 
if upgrading from older KRaft versions
+if 
(featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION))
 {
+log.info("No metadata.version feature level record was found 
in the log. " +
+"Treating the log as version {}.", 
MetadataVersion.MINIMUM_KRAFT_VERSION);
+}
+
+if (featureControl.metadataVersion().isMigrationSupported()) {
+log.info("Loaded ZK migration state of {}", 
featureControl.zkMigrationState());
+switch (featureControl.zkMigrationState()) {
+case NONE:
+// Since this is the default state there may or may 
not be an actual NONE in the log. Regardless,
+// it will eventually be persisted in a snapshot, so 
we don't need to explicitly write it here.
+if (zkMigrationEnabled) {
+throw new RuntimeException("Should not have ZK 
migrations enabled on a cluster that was created in KRaft mode.");
+}
+break;
+case PRE_MIGRATION:
+throw new RuntimeException("Detected an failed 
migration state during bootstrap, cannot continue.");
+case MIGRATION:
+if (!zkMigrationEnabled) {
+ 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173079249


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
 // Prepend the activate event. It is important that this event go 
at the beginning
 // of the queue rather than the end (hence prepend rather than 
append). It's also
 // important not to use prepend for anything else, to preserve the 
ordering here.
-queue.prepend(new ControllerWriteEvent<>("completeActivation[" + 
epoch + "]",
-new CompleteActivationEvent()));
+ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+new CompleteActivationEvent(),
+EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, 
RUNS_IN_PREMIGRATION));
+activationEvent.future.whenComplete((__, t) -> {
+if (t != null) {
+fatalFaultHandler.handleFault("exception while activating 
controller", t);
+}
+});
+queue.prepend(activationEvent);
 } catch (Throwable e) {
 fatalFaultHandler.handleFault("exception while claiming 
leadership", e);
 }
 }
 
+/**
+ * Generate the set of activation records. Until KIP-868 transactions are 
supported, these records
+ * are committed to the log as an atomic batch. The records will include 
the bootstrap metadata records
+ * (including the bootstrap "metadata.version") and may include a ZK 
migration record.
+ */
+public static void generateActivationRecords(
+Logger log,
+boolean isLogEmpty,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+FeatureControlManager featureControl,
+Consumer<ApiMessageAndVersion> recordConsumer
+) {
+if (isLogEmpty) {
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things 
like SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} 
bootstrap record(s) " +
+"at metadata.version {} from {}.", 
bootstrapMetadata.records().size(),
+bootstrapMetadata.metadataVersion(), 
bootstrapMetadata.source());
+bootstrapMetadata.records().forEach(recordConsumer);
+
+if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. 
No metadata updates will be allowed until " +
+"the ZK metadata has been migrated");
+
recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since 
this is a de-novo KRaft cluster.");
+recordConsumer.accept(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version 
" + bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with 
ZK migrations enabled.");
+}
+}
+} else {
+// Logs have been replayed. We need to initialize some things here 
if upgrading from older KRaft versions
+if 
(featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION))
 {
+log.info("No metadata.version feature level record was found 
in the log. " +
+"Treating the log as version {}.", 
MetadataVersion.MINIMUM_KRAFT_VERSION);
+}
+
+if (featureControl.metadataVersion().isMigrationSupported()) {
+log.info("Loaded ZK migration state of {}", 
featureControl.zkMigrationState());
+switch (featureControl.zkMigrationState()) {
+case NONE:
+// Since this is the default state there may or may 
not be an actual NONE in the log. Regardless,
+// it will eventually be persisted in a snapshot, so 
we don't need to explicitly write it here.
+if (zkMigrationEnabled) {
+throw new RuntimeException("Should not have ZK 
migrations enabled on a cluster that was created in KRaft mode.");
+}
+break;
+case PRE_MIGRATION:
+throw new RuntimeException("Detected an failed 
migration state during bootstrap, cannot continue.");
+case MIGRATION:
+if (!zkMigrationEnabled) {
+ 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173078385


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
 // Prepend the activate event. It is important that this event go 
at the beginning
 // of the queue rather than the end (hence prepend rather than 
append). It's also
 // important not to use prepend for anything else, to preserve the 
ordering here.
-queue.prepend(new ControllerWriteEvent<>("completeActivation[" + 
epoch + "]",
-new CompleteActivationEvent()));
+ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+new CompleteActivationEvent(),
+EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, 
RUNS_IN_PREMIGRATION));
+activationEvent.future.whenComplete((__, t) -> {
+if (t != null) {
+fatalFaultHandler.handleFault("exception while activating 
controller", t);
+}
+});
+queue.prepend(activationEvent);
 } catch (Throwable e) {
 fatalFaultHandler.handleFault("exception while claiming 
leadership", e);
 }
 }
 
+/**
+ * Generate the set of activation records. Until KIP-868 transactions are 
supported, these records
+ * are committed to the log as an atomic batch. The records will include 
the bootstrap metadata records
+ * (including the bootstrap "metadata.version") and may include a ZK 
migration record.
+ */
+public static void generateActivationRecords(
+Logger log,
+boolean isLogEmpty,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+FeatureControlManager featureControl,
+Consumer<ApiMessageAndVersion> recordConsumer
+) {
+if (isLogEmpty) {
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things 
like SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} 
bootstrap record(s) " +
+"at metadata.version {} from {}.", 
bootstrapMetadata.records().size(),
+bootstrapMetadata.metadataVersion(), 
bootstrapMetadata.source());
+bootstrapMetadata.records().forEach(recordConsumer);
+
+if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. 
No metadata updates will be allowed until " +
+"the ZK metadata has been migrated");
+
recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since 
this is a de-novo KRaft cluster.");
+recordConsumer.accept(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version 
" + bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with 
ZK migrations enabled.");
+}
+}
+} else {
+// Logs have been replayed. We need to initialize some things here 
if upgrading from older KRaft versions
+if 
(featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION))
 {
+log.info("No metadata.version feature level record was found 
in the log. " +
+"Treating the log as version {}.", 
MetadataVersion.MINIMUM_KRAFT_VERSION);
+}
+
+if (featureControl.metadataVersion().isMigrationSupported()) {
+log.info("Loaded ZK migration state of {}", 
featureControl.zkMigrationState());
+switch (featureControl.zkMigrationState()) {
+case NONE:
+// Since this is the default state there may or may 
not be an actual NONE in the log. Regardless,
+// it will eventually be persisted in a snapshot, so 
we don't need to explicitly write it here.
+if (zkMigrationEnabled) {
+throw new RuntimeException("Should not have ZK 
migrations enabled on a cluster that was created in KRaft mode.");
+}
+break;
+case PRE_MIGRATION:

Review Comment:
   I don't think this is correct. We could have the following sequence:
   
   1. controller 0 writes migration record setting PRE_MIGRATION state
   2. controller 0 tries to load ZK image, fails for s

[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173076552


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
 // Prepend the activate event. It is important that this event go 
at the beginning
 // of the queue rather than the end (hence prepend rather than 
append). It's also
 // important not to use prepend for anything else, to preserve the 
ordering here.
-queue.prepend(new ControllerWriteEvent<>("completeActivation[" + 
epoch + "]",
-new CompleteActivationEvent()));
+ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+new CompleteActivationEvent(),
+EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, 
RUNS_IN_PREMIGRATION));
+activationEvent.future.whenComplete((__, t) -> {
+if (t != null) {
+fatalFaultHandler.handleFault("exception while activating 
controller", t);
+}
+});
+queue.prepend(activationEvent);
 } catch (Throwable e) {
 fatalFaultHandler.handleFault("exception while claiming 
leadership", e);
 }
 }
 
+/**
+ * Generate the set of activation records. Until KIP-868 transactions are 
supported, these records
+ * are committed to the log as an atomic batch. The records will include 
the bootstrap metadata records
+ * (including the bootstrap "metadata.version") and may include a ZK 
migration record.
+ */
+public static void generateActivationRecords(

Review Comment:
   Can we just return a `List<ApiMessageAndVersion>`? I don't think we're 
creating enough records to make a new list an efficiency concern...
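   A rough shape of that refactor (editor's sketch; the real body would keep
the branching above, appending to a local list instead of calling a Consumer):
   ```
   public static List<ApiMessageAndVersion> generateActivationRecords(
       Logger log,
       boolean isLogEmpty,
       boolean zkMigrationEnabled,
       BootstrapMetadata bootstrapMetadata,
       FeatureControlManager featureControl
   ) {
       List<ApiMessageAndVersion> records = new ArrayList<>();
       if (isLogEmpty) {
           records.addAll(bootstrapMetadata.records());
           // ... same migration-state branching as above ...
       }
       // ...
       return records;
   }
   ```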



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173073635


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
 // Prepend the activate event. It is important that this event go 
at the beginning
 // of the queue rather than the end (hence prepend rather than 
append). It's also
 // important not to use prepend for anything else, to preserve the 
ordering here.
-queue.prepend(new ControllerWriteEvent<>("completeActivation[" + 
epoch + "]",
-new CompleteActivationEvent()));
+ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+new CompleteActivationEvent(),
+EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, 
RUNS_IN_PREMIGRATION));
+activationEvent.future.whenComplete((__, t) -> {
+if (t != null) {
+fatalFaultHandler.handleFault("exception while activating 
controller", t);

Review Comment:
   Yeah, this is fair, I guess. We should invoke the fault handler if we have a 
failure in the activation event. Although we'll already catch 
non-`ApiException`. But still. A good check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173072509


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1085,7 +1123,11 @@ private void maybeCompleteAuthorizerInitialLoad() {
 }
 
 private boolean isActiveController() {
-return curClaimEpoch != -1;
+return isActiveController(curClaimEpoch);
+}
+
+private boolean isActiveController(int claimEpoch) {

Review Comment:
   function can be `static`?
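   For example (sketch):
   ```
   private static boolean isActiveController(int claimEpoch) {
       // Reads only its argument, no instance state, so static is fine.
       return claimEpoch != -1;
   }
   ```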



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173069848


##
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##
@@ -304,6 +338,13 @@ public void replay(FeatureLevelRecord record) {
 }
 }
 
+public void replay(ZkMigrationStateRecord record) {
+ZkMigrationState recordState = 
ZkMigrationState.of(record.zkMigrationState());
+ZkMigrationState currentState = migrationControlState.get();
+log.info("Transitioning ZK migration state from {} to {}", 
currentState, recordState);

Review Comment:
   Maybe we should not log this (or log at DEBUG only) if the current state and 
new state are the same?
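   A sketch of the quieter variant, assuming the replay ends with
`migrationControlState.set(recordState)` as in this PR:
   ```
   public void replay(ZkMigrationStateRecord record) {
       ZkMigrationState recordState = ZkMigrationState.of(record.zkMigrationState());
       ZkMigrationState currentState = migrationControlState.get();
       if (recordState.equals(currentState)) {
           // Replaying the same state (e.g. from a snapshot) is routine; keep it quiet.
           log.debug("Replayed a ZkMigrationStateRecord that left the state at {}", currentState);
       } else {
           log.info("Transitioning ZK migration state from {} to {}", currentState, recordState);
       }
       migrationControlState.set(recordState);
   }
   ```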



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173068660


##
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##
@@ -232,13 +248,20 @@ private ApiError updateMetadataVersion(
 Consumer<ApiMessageAndVersion> recordConsumer
 ) {
 MetadataVersion currentVersion = metadataVersion();
+ZkMigrationState zkMigrationState = zkMigrationState();
 final MetadataVersion newVersion;
 try {
 newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
 } catch (IllegalArgumentException e) {
 return invalidMetadataVersion(newVersionLevel, "Unknown 
metadata.version.");
 }
 
+// Don't allow metadata.version changes while we're migrating
+if (EnumSet.of(ZkMigrationState.PRE_MIGRATION, 
ZkMigrationState.MIGRATION).contains(zkMigrationState)) {

Review Comment:
   Hmm, seems like this might come up in a bunch of places. Maybe we can have a 
member function in `ZkMigrationState.java` like 
   ```
   boolean inProgress() { return this == PRE_MIGRATION || this == MIGRATION; }
   ```
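   The guard above could then read (illustrative; the error string is the
editor's guess, not the merged code):
   ```
   // Don't allow metadata.version changes while we're migrating.
   if (zkMigrationState.inProgress()) {
       return invalidMetadataVersion(newVersionLevel,
           "Unable to modify metadata.version while a ZK migration is in progress.");
   }
   ```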



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173068660


##
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##
@@ -232,13 +248,20 @@ private ApiError updateMetadataVersion(
 Consumer<ApiMessageAndVersion> recordConsumer
 ) {
 MetadataVersion currentVersion = metadataVersion();
+ZkMigrationState zkMigrationState = zkMigrationState();
 final MetadataVersion newVersion;
 try {
 newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
 } catch (IllegalArgumentException e) {
 return invalidMetadataVersion(newVersionLevel, "Unknown 
metadata.version.");
 }
 
+// Don't allow metadata.version changes while we're migrating
+if (EnumSet.of(ZkMigrationState.PRE_MIGRATION, 
ZkMigrationState.MIGRATION).contains(zkMigrationState)) {

Review Comment:
   Hmm, seems like this might come up in a bunch of places. Maybe we can have 
something like `boolean ZkMigrationState#inProgress() { return this == 
PRE_MIGRATION || this == MIGRATION; }`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173066452


##
core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala:
##
@@ -145,25 +146,29 @@ class BrokerRegistrationRequestTest {
   }
 
   @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_3_IV3,
-serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true")))
-  def testRegisterZkWithKRaftOldMetadataVersion(clusterInstance: 
ClusterInstance): Unit = {
+serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "false")))

Review Comment:
   Do we have a test of trying to start the controller with 
`zookeeper.metadata.migration.enable = true` and a bootstrap MV that's too old? 
It should exit the controller, right? (Due to throwing that RuntimeException)?
   
   It would be good to have an integration test of that...
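   Until that integration test exists, a unit-level sketch of the failure path
could look like this (hypothetical fixtures: `log` is any SLF4J `Logger` and
`featureControl` a test-default `FeatureControlManager`;
`BootstrapMetadata.fromVersion` is assumed from the metadata module):
   ```
   BootstrapMetadata oldMv = BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV0, "test");
   List<ApiMessageAndVersion> out = new ArrayList<>();
   // An old bootstrap MV plus migrations enabled should fail fast.
   assertThrows(RuntimeException.class, () ->
       QuorumController.generateActivationRecords(
           log, true /* isLogEmpty */, true /* zkMigrationEnabled */,
           oldMv, featureControl, out::add));
   ```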



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173062692


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -324,4 +329,29 @@ class ZkMigrationIntegrationTest {
   assertTrue(firstProducerIdBlock.firstProducerId() < 
producerIdBlock.firstProducerId())
 }
   }
+
+  /**
+   * Start a KRaft cluster with migrations enabled, verify that the controller 
does not accept metadata changes
+   * through the RPCs
+   */

Review Comment:
   We should also explain in this comment why the migration does not proceed 
beyond pre-migration (I assume it's because we're waiting for brokers and we 
don't have any, due to brokers = 0).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173062239


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -324,4 +329,29 @@ class ZkMigrationIntegrationTest {
   assertTrue(firstProducerIdBlock.firstProducerId() < 
producerIdBlock.firstProducerId())
 }
   }
+
+  /**
+   * Start a KRaft cluster with migrations enabled, verify that the controller 
does not accept metadata changes
+   * through the RPCs
+   */
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_4_IV0,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true"))),
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_5_IV0,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true"))),
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_5_IV1,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true")))
+  ))
+  def testPreMigrationMode(clusterInstance: ClusterInstance): Unit = {

Review Comment:
   Hmm... I don't see how this can work without brokers, since the admin client 
needs to bootstrap through broker servers, right? So I don't think this is 
testing what you might think.
   
   You might just have to send a raw RPC, as unpleasant as that is to do. That 
will also allow you to verify that you get NOT_CONTROLLER.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] szalapski commented on pull request #5876: KAFKA-7509: Avoid passing most non-applicable properties to producer, consumer, and admin client

2023-04-20 Thread via GitHub


szalapski commented on PR #5876:
URL: https://github.com/apache/kafka/pull/5876#issuecomment-1516905377

   Sad that this didn't get in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-20 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1173050527


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.

Review Comment:
   okay



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-20 Thread via GitHub


philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1516893617

   Failing tests don't seem to be related.
   
   ```
   Build / JDK 8 and Scala 2.12 / 
testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   1m 46s
   Build / JDK 8 and Scala 2.12 / 
testSyncTopicConfigUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 26s
   Build / JDK 8 and Scala 2.12 / 
testSyncTopicConfigUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   1m 46s
   Build / JDK 11 and Scala 2.13 / 
testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   3m 15s
   Build / JDK 11 and Scala 2.13 / 
testCreatePartitionsUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 3s
   Build / JDK 11 and Scala 2.13 / 
testSyncTopicConfigUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 41s
   Build / JDK 11 and Scala 2.13 / 
testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 23s
   Build / JDK 11 and Scala 2.13 / 
testCreatePartitionsUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   1m 55s
   Build / JDK 11 and Scala 2.13 / 
testSyncTopicConfigUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 7s
   Build / JDK 11 and Scala 2.13 / testWithGroupMetadata() – 
kafka.api.TransactionsBounceTest
   1m 22s
   Build / JDK 11 and Scala 2.13 / testCreateClusterAndCreateAndManyTopics() – 
kafka.server.KRaftClusterTest
   18s
   Build / JDK 17 and Scala 2.13 / 
testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 14s
   Build / JDK 17 and Scala 2.13 / 
testCreatePartitionsUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 22s
   Build / JDK 17 and Scala 2.13 / 
testSyncTopicConfigUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 16s
   Build / JDK 17 and Scala 2.13 / 
testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 16s
   Build / JDK 17 and Scala 2.13 / 
testCreatePartitionsUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 53s
   Build / JDK 17 and Scala 2.13 / 
testSyncTopicConfigUseProvidedForwardingAdmin() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   2m 13s
   Build / JDK 17 and Scala 2.13 / 
testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted – 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
   1m 14s
   Existing failures - 3
   Build / JDK 8 and Scala 2.12 / putTopicStateRetriableFailure – 
org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
   <1s
   Build / JDK 11 and Scala 2.13 / putTopicStateRetriableFailure – 
org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
   <1s
   Build / JDK 17 and Scala 2.13 / putTopicStateRetriableFailure – 
org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-20 Thread via GitHub


philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1516893160

   Hey @jolshan - Thanks for the review. I reverted those 
documentation/comment changes in SenderTest.java (for the produceResponse 
authorization error).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14904) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-20 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-14904.

Resolution: Fixed

> Flaky Test  kafka.api.TransactionsBounceTest.testWithGroupId()
> --
>
> Key: KAFKA-14904
> URL: https://issues.apache.org/jira/browse/KAFKA-14904
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> After merging KAFKA-14561 I noticed this test still occasionally failed via 
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 6ms while awaiting EndTxn(true)
> I will investigate the cause. 
> Note: This error occurs when we are waiting for the transaction to be 
> committed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-14884) Include check transaction is still ongoing right before append

2023-04-20 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reopened KAFKA-14884:


I'm confused by all my blockers 🤦‍♀️

> Include check transaction is still ongoing right before append 
> ---
>
> Key: KAFKA-14884
> URL: https://issues.apache.org/jira/browse/KAFKA-14884
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> Even after checking via AddPartitionsToTxn, the transaction could be aborted 
> after the response. We can add one more check before appending.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


jeffkbkim commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172953390


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * An immutable representation of a client side assignor within a consumer 
group member.
+ */
+public class ClientAssignor {
+/**
+ * The name of the assignor.
+ */
+private final String name;
+
+/**
+ * The reason reported by the assignor.
+ */
+private final byte reason;
+
+/**
+ * The minimum metadata version supported by the assignor.
+ */
+private final short minimumVersion;
+
+/**
+ * The maximum metadata version supported by the assignor.
+ */
+private final short maximumVersion;
+
+/**
+ * The versioned metadata.
+ */
+private final VersionedMetadata metadata;

Review Comment:
   Sorry to have hijacked this thread, but something like
   ```
   /**
* The error assigned to the member. Depends on the client assignor 
* implementation and commonly used for Streams
*/
   private final byte error;
   ```
   
   If you think what we have is sufficient, I am fine with it. I just wanted to 
mention that I didn't fully understand how it would be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


dajac commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172945841


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * An immutable representation of a client side assignor within a consumer 
group member.
+ */
+public class ClientAssignor {
+/**
+ * The name of the assignor.
+ */
+private final String name;
+
+/**
+ * The reason reported by the assignor.
+ */
+private final byte reason;
+
+/**
+ * The minimum metadata version supported by the assignor.
+ */
+private final short minimumVersion;
+
+/**
+ * The maximum metadata version supported by the assignor.
+ */
+private final short maximumVersion;
+
+/**
+ * The versioned metadata.
+ */
+private final VersionedMetadata metadata;
+
+public ClientAssignor(
+String name,
+byte reason,
+short minimumVersion,
+short maximumVersion,
+VersionedMetadata metadata
+) {
+this.name = Objects.requireNonNull(name);
+this.reason = reason;
+this.minimumVersion = minimumVersion;

Review Comment:
   I think that I have added the ones where relevant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


dajac commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172946358


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ClientAssignorTest.java:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ClientAssignorTest {
+
+@Test
+public void testNameAndMetadataCannotBeBull() {

Review Comment:
   This is not necessary in my opinion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-20 Thread via GitHub


emissionnebula commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1172848492


##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java:
##
@@ -225,77 +225,59 @@ public void testDelegationOfReplaceAll() {
 new PCollectionsHashMapWrapperDelegationChecker<>()
 .defineMockConfigurationForVoidMethodInvocation(mock -> 
mock.replaceAll(eq(mockBiFunction)))
 .defineWrapperVoidMethodInvocation(wrapper -> 
wrapper.replaceAll(mockBiFunction))
-.doVoidMethodDelegationCheck();
+.doUnsupportedVoidFunctionDelegrationCheck();
 }
 
 @Test
 public void testDelegationOfPutIfAbsent() {
 new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.putIfAbsent(eq(this), eq(this)), this)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.putIfAbsent(this, this), identity())
-.doFunctionDelegationCheck();
-}
-
-@ParameterizedTest
-@ValueSource(booleans = {true, false})
-public void testDelegationOfRemoveByKeyAndValue(boolean 
mockFunctionReturnValue) {
-new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.remove(eq(this), eq(this)), mockFunctionReturnValue)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.remove(this, this), identity())
-.doFunctionDelegationCheck();
-}
-
-@ParameterizedTest
-@ValueSource(booleans = {true, false})
-public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean 
mockFunctionReturnValue) {
-new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.replace(this, this, this), identity())
-.doFunctionDelegationCheck();

Review Comment:
   These two functions are not overridden by PCollection classes. 
`mock.replace()` or `mock.remove()` will call Java's default implementation in 
`Map.replace()` or `AbstractMap.remove()`. Mocking and testing that I felt is 
not required.
   
   For example: `Map.replace()` looks like this: 
   ```
   default boolean replace(K key, V oldValue, V newValue) {
   Object curValue = get(key);
   if (!Objects.equals(curValue, oldValue) ||
   (curValue == null && !containsKey(key))) {
   return false;
   }
   put(key, newValue);
   return true;
   }
   ```
   Since it internally calls `put()` which we are already testing, so that's 
why I thought it was not necessary to test these functions. Likewise for 
`remove()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


jolshan commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172939909


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ClientAssignorTest.java:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ClientAssignorTest {
+
+@Test
+public void testNameAndMetadataCannotBeBull() {

Review Comment:
   Also, did we want tests for some of the other errors we throw?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


jolshan commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172939598


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ClientAssignorTest.java:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ClientAssignorTest {
+
+@Test
+public void testNameAndMetadataCannotBeBull() {

Review Comment:
   nit: Null



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


jolshan commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172939142


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * An immutable representation of a client side assignor within a consumer 
group member.
+ */
+public class ClientAssignor {
+/**
+ * The name of the assignor.
+ */
+private final String name;
+
+/**
+ * The reason reported by the assignor.
+ */
+private final byte reason;
+
+/**
+ * The minimum metadata version supported by the assignor.
+ */
+private final short minimumVersion;
+
+/**
+ * The maximum metadata version supported by the assignor.
+ */
+private final short maximumVersion;
+
+/**
+ * The versioned metadata.
+ */
+private final VersionedMetadata metadata;
+
+public ClientAssignor(
+String name,
+byte reason,
+short minimumVersion,
+short maximumVersion,
+VersionedMetadata metadata
+) {
+this.name = Objects.requireNonNull(name);
+this.reason = reason;
+this.minimumVersion = minimumVersion;

Review Comment:
   There were a few others in other classes. Did we want to do the same for 
those? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-20 Thread via GitHub


divijvaidya commented on PR #13572:
URL: https://github.com/apache/kafka/pull/13572#issuecomment-1516728614

   Hey @cmccabe, we can have a community discussion over the mailing list but I 
am not following one of your statements here.
   
   > since it will allow multiple Kafka servers to be started on the same port
   
   Can you please help me understand why? I am asking this because AFAIK 
(correct me if I am wrong), all that this flag does is to allow re-use of a 
socket which is in TIME_WAIT state. If it is in another state, i.e. it is being 
used by a different Kafka process, the binding is not allowed, hence, two Kafka 
servers cannot run on the same port (or rather in this context, reuse the 
socket). 
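   
   For reference, a minimal sketch of that semantics using the plain JDK API 
(nothing Kafka-specific; the port is illustrative):
   ```
   import java.net.InetSocketAddress;
   import java.net.StandardSocketOptions;
   import java.nio.channels.ServerSocketChannel;
   
   public class ReuseAddressDemo {
       public static void main(String[] args) throws Exception {
           try (ServerSocketChannel channel = ServerSocketChannel.open()) {
               // On Linux this lets bind() succeed while a previous socket on the
               // same address is still in TIME_WAIT; it does not let a second live
               // listener share the port (that would require SO_REUSEPORT).
               channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
               channel.bind(new InetSocketAddress(9092));
               System.out.println("Bound to " + channel.getLocalAddress());
           }
       }
   }
   ```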


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-20 Thread via GitHub


cmccabe commented on PR #13572:
URL: https://github.com/apache/kafka/pull/13572#issuecomment-1516694321

   Hi all,
   
   Unfortunately I had to revert this. This is a change to our public API since 
it will allow multiple Kafka servers to be started on the same port. This kind 
of change needs a KIP (Kafka Improvement Proposal) so we can discuss the pros 
and cons.
   
   The change also does very different things on different operating systems. 
There is a rundown here: 
https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ
   
   > Security: SO_REUSEADDR can allow an attacker to perform DDOS by creating a 
genuine connection with same IP and port. Prior to this change, the TIME_WAIT 
state of the socket would have prevented immediate re-connection. This is an 
acceptable risk for Kafka because we have connection throttling available in 
code for IP addresses and a user may choose to configure it to prevent a DDOS.
   
   Again, this needs a KIP so we can discuss whether the increased DDOS risk is 
acceptable or not. The answer may be different for different users.
   
   Personally, I think that at minimum SO_REUSEADDR should be an option (not 
mandatory) and probably not the default.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-20 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172892464


##
clients/src/test/java/org/apache/kafka/common/utils/SkippableChunkedBytesStreamTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SkippableChunkedBytesStreamTest {
+private static final Random RANDOM = new Random(1337);
+private final BufferSupplier supplier = BufferSupplier.NO_CACHING;
+
+@ParameterizedTest
+@MethodSource("provideSourceSkipValuesForTest")
+public void skip_testCorrectness(int bytesToPreRead, ByteBuffer inputBuf, 
int numBytesToSkip) throws IOException {
+int expectedInpLeftAfterSkip = inputBuf.remaining() - bytesToPreRead - 
numBytesToSkip;
+int expectedSkippedBytes = Math.min(inputBuf.remaining() - 
bytesToPreRead, numBytesToSkip);
+
+try (BytesStream is = new ChunkedBytesStream(new 
ByteBufferInputStream(inputBuf.duplicate()), supplier, 10)) {
+int cnt = 0;
+while (cnt++ < bytesToPreRead) {
+is.readByte();
+}
+
+int res = is.skipBytes(numBytesToSkip);
+assertEquals(expectedSkippedBytes, res);
+
+// verify that we are able to read rest of the input
+cnt = 0;
+while (cnt++ < expectedInpLeftAfterSkip) {
+is.readByte();
+}
+}
+}
+
+@Test
+public void skip_testEndOfSource() throws IOException {

Review Comment:
   Fixed in latest commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-20 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172889948


##
clients/src/main/java/org/apache/kafka/common/utils/SkippableChunkedBytesStream.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * SkippableChunkedBytesStream is a variant of ChunkedBytesStream which does 
not push skip() to the sourceStream.
+ * 
+ * Unlike BufferedInputStream.skip() and ChunkedBytesStream.skip(), this does 
not push skip() to sourceStream.
+ * We want to avoid pushing this to sourceStream because its implementation 
may be inefficient, e.g. in the case of 
ZstdInputStream, which allocates a new buffer from the buffer pool per skip call.
+ *
+ * @see ChunkedBytesStream
+ */
+public class SkippableChunkedBytesStream extends ChunkedBytesStream {

Review Comment:
   1. done. I removed this SkippableChunkedBytesStream and control this 
behaviour in ChunkedBytesStream using a flag.
   
   2. No. The change I made was to introduce a new Stream which doesn't require 
us to create a buffer when passing compressed data to Zstd. The new stream was 
added in 
[ZstdBufferDecompressingStreamNoFinalizer](https://github.com/luben/zstd-jni/pull/244)
  but its usage in Zstd is pending completion of 
https://github.com/luben/zstd-jni/issues/252 which I plan to pick up next week. 
The migration to using this new Stream will be done in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-20 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172884561


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks 
of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to 
sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call 
crosses JNI boundary.
+ * 
+ * The functionality of this stream is a combination of DataInput and 
BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()

Review Comment:
   Fixed.



##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks 
of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to 
sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call 
crosses JNI boundary.
+ * 
+ * The functionality of this stream is a combination of DataInput and 
BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this 
uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of 
a byte to sourceStream.
+ * 
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where 
multiple threads access this.
+ * - many method are un-supported in this class because they aren't currently 
used in the caller code.

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-20 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172883282


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -157,7 +186,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSu
  *batch. As such, a supplier that 
reuses buffers will have a significant
  *performance impact.
  */
-public abstract InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier);
+public abstract BytesStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier);
+
+/**
+ * Recommended size of buffer for storing decompressed output.
+ */
+public int getRecommendedDOutSize() {

Review Comment:
   changed in latest commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-20 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172883027


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream 
buffer, byte messageVer
 public InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
 return ZstdFactory.wrapForInput(buffer, messageVersion, 
decompressionBufferSupplier);
 }
+
+@Override
+public int getRecommendedDOutSize() {
+return 16 * 1024; // 16KB

Review Comment:
   It was 16KB itself (+2KB for skipBuffer which we removed). 
https://github.com/apache/kafka/blob/ef09a2e3fc11a738f6681fd57fb84ad109593fd3/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java#L68



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-20 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172882018


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream 
buffer, byte messageVer
 @Override
 public InputStream wrapForInput(ByteBuffer inputBuffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
 try {
-return new KafkaLZ4BlockInputStream(inputBuffer, 
decompressionBufferSupplier,
-messageVersion == 
RecordBatch.MAGIC_VALUE_V0);
+return new ChunkedDataInputStream(
+new KafkaLZ4BlockInputStream(inputBuffer, 
decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+decompressionBufferSupplier, getRecommendedDOutSize());
 } catch (Throwable e) {
 throw new KafkaException(e);
 }
 }
+
+@Override
+public int getRecommendedDOutSize() {
+return 2 * 1024; // 2KB

Review Comment:
   Done in latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-20 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1172881788


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -90,8 +95,13 @@ public OutputStream wrapForOutput(ByteBufferOutputStream 
buffer, byte messageVer
 }
 
 @Override
-public InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
-return SnappyFactory.wrapForInput(buffer);
+public BytesStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
+return new 
SkippableChunkedBytesStream(SnappyFactory.wrapForInput(buffer), 
decompressionBufferSupplier, getRecommendedDOutSize());
+}
+
+@Override
+public int getRecommendedDOutSize() {
+return 8 * 1024; // 8KB

Review Comment:
   I honestly don't remember now. I changed it back to 2KB and benchmarked 
again, and it didn't change anything.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14884) Include check transaction is still ongoing right before append

2023-04-20 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-14884.

Resolution: Fixed

> Include check transaction is still ongoing right before append 
> ---
>
> Key: KAFKA-14884
> URL: https://issues.apache.org/jira/browse/KAFKA-14884
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> Even after checking via AddPartitionsToTxn, the transaction could be aborted 
> after the response. We can add one more check before appending.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional

2023-04-20 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14916:
---
Description: 
KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
all record batches were transactional and had the same producer ID.

This work will improve validation and fix the code that assumes all batches are 
transactional.

Currently the code does not enforce that there cannot be differing producer 
IDs. This will be enforced. 
Further, KAFKA-14561 will not assume all records are transactional.

  was:
KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
all record batches were transactional and had the same producer ID.

This work will improve validation and fix the code that assumes all batches are 
transactional.


> Fix code that assumes transactional ID implies all records are transactional
> 
>
> Key: KAFKA-14916
> URL: https://issues.apache.org/jira/browse/KAFKA-14916
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
> all record batches were transactional and had the same producer ID.
> This work will improve validation and fix the code that assumes all batches 
> are transactional.
> Currently the code does not enforce that there cannot be differing producer 
> IDs. This will be enforced. 
> Further, KAFKA-14561 will not assume all records are transactional.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-20 Thread via GitHub


emissionnebula commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1172850344


##
server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java:
##
@@ -139,6 +163,16 @@ public void doFunctionDelegationCheck() {
 }
 }
 
+public void doUnsupportedFunctionDelegrationCheck() {

Review Comment:
   yup, I thought it made sense to test it explicitly.
   
   Updated the functions.
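   
   For reference, the explicit check is essentially this (a sketch with JUnit 5; 
`wrapper` and `mockBiFunction` come from the surrounding test):
   ```
   import static org.junit.jupiter.api.Assertions.assertThrows;
   
   // Mutating methods of the immutable wrapper must throw instead of delegating.
   assertThrows(UnsupportedOperationException.class,
       () -> wrapper.replaceAll(mockBiFunction));
   ```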



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] emissionnebula commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-20 Thread via GitHub


emissionnebula commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1172848492


##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java:
##
@@ -225,77 +225,59 @@ public void testDelegationOfReplaceAll() {
 new PCollectionsHashMapWrapperDelegationChecker<>()
 .defineMockConfigurationForVoidMethodInvocation(mock -> 
mock.replaceAll(eq(mockBiFunction)))
 .defineWrapperVoidMethodInvocation(wrapper -> 
wrapper.replaceAll(mockBiFunction))
-.doVoidMethodDelegationCheck();
+.doUnsupportedVoidFunctionDelegrationCheck();
 }
 
 @Test
 public void testDelegationOfPutIfAbsent() {
 new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.putIfAbsent(eq(this), eq(this)), this)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.putIfAbsent(this, this), identity())
-.doFunctionDelegationCheck();
-}
-
-@ParameterizedTest
-@ValueSource(booleans = {true, false})
-public void testDelegationOfRemoveByKeyAndValue(boolean 
mockFunctionReturnValue) {
-new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.remove(eq(this), eq(this)), mockFunctionReturnValue)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.remove(this, this), identity())
-.doFunctionDelegationCheck();
-}
-
-@ParameterizedTest
-@ValueSource(booleans = {true, false})
-public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean 
mockFunctionReturnValue) {
-new PCollectionsHashMapWrapperDelegationChecker<>()
-.defineMockConfigurationForFunctionInvocation(mock -> 
mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue)
-
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> 
wrapper.replace(this, this, this), identity())
-.doFunctionDelegationCheck();

Review Comment:
   These two functions are not overridden by PCollection classes. 
`mock.replace()` or `mock.remove()` will call Java's default implementation in 
`Map.replace()` or `AbstractMap.remove()`. Mocking and testing that seemed 
unnecessary.
   
   For example: `Map.replace()` looks like this: 
   ```
   default boolean replace(K key, V oldValue, V newValue) {
   Object curValue = get(key);
   if (!Objects.equals(curValue, oldValue) ||
   (curValue == null && !containsKey(key))) {
   return false;
   }
   put(key, newValue);
   return true;
   }
   ```
   Since it internally calls `put()`, which we are already testing, I thought it 
was not necessary to test these functions. Likewise for 
`remove()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-20 Thread via GitHub


jsancio commented on code in PR #13555:
URL: https://github.com/apache/kafka/pull/13555#discussion_r1172839235


##
server-common/src/main/java/org/apache/kafka/deferred/DeferredEvent.java:
##
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.controller;
+package org.apache.kafka.deferred;
 
 /**
  * Represents a deferred event in the controller purgatory.

Review Comment:
   Let's fix this comment. Maybe:
   > Represents a deferred event in the {@code DeferredEventQueue}.



##
server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java:
##
@@ -77,13 +76,13 @@ void failAll(Exception exception) {
  * @param offsetThe offset to add the new event at.

Review Comment:
   Above we have:
   >  Add a new purgatory event.
   
   how about:
   > Add a new deferred event to be completed by the provided offset.



##
server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java:
##
@@ -77,13 +76,13 @@ void failAll(Exception exception) {
  * @param offsetThe offset to add the new event at.
  * @param event The new event.
  */
-void add(long offset, DeferredEvent event) {
+public void add(long offset, DeferredEvent event) {
 if (!pending.isEmpty()) {
 long lastKey = pending.lastKey();
 if (offset < lastKey) {
 throw new RuntimeException("There is already a purgatory event 
with " +

Review Comment:
   How about throwing an `IllegalArgumentException` instead of 
`RuntimeException` with the following message:
   > There is already a deferred event with offset ...
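   
   A sketch of the suggested change, based on the `add` method shown in this diff:
   ```
   public void add(long offset, DeferredEvent event) {
       if (!pending.isEmpty()) {
           long lastKey = pending.lastKey();
           if (offset < lastKey) {
               throw new IllegalArgumentException("There is already a deferred " +
                   "event with offset " + lastKey + ", which is greater than the " +
                   "offset " + offset + " that is being added");
           }
       }
       // ... register the event at the given offset, as before
   }
   ```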



##
server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java:
##
@@ -60,7 +59,7 @@ void completeUpTo(long offset) {
  *

Review Comment:
   The line above has
   >  Fail all the pending purgatory entries.
   
   how about:
   > Fail all deferred events with the provided exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tinaselenge commented on pull request #13459: KAFKA-14592: Move FeatureCommand to tools

2023-04-20 Thread via GitHub


tinaselenge commented on PR #13459:
URL: https://github.com/apache/kafka/pull/13459#issuecomment-1516631750

   @showuon Thank you so much. I have addressed the comments but left a couple of 
them to clarify first. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Pending state blocked verification of transactions

2023-04-20 Thread via GitHub


jolshan commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1516624342

   Will also cherry-pick to 3.5


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #13579: KAFKA-14904: Pending state blocked verification of transactions

2023-04-20 Thread via GitHub


jolshan merged PR #13579:
URL: https://github.com/apache/kafka/pull/13579


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Pending state blocked verification of transactions

2023-04-20 Thread via GitHub


jolshan commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1516620151

   Many many connect failures still 😔 
   Failures appear to be unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tinaselenge commented on a diff in pull request #13459: KAFKA-14592: Move FeatureCommand to tools

2023-04-20 Thread via GitHub


tinaselenge commented on code in PR #13459:
URL: https://github.com/apache/kafka/pull/13459#discussion_r1172832601


##
tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java:
##
@@ -307,25 +294,24 @@ private static void update(String op, Admin admin, 
Map up
 }
 });
 
-AtomicInteger numFailures = new AtomicInteger();
-errors.keySet().forEach(feature -> {
-short level = updates.get(feature).maxVersionLevel();
-Optional maybeThrowable = errors.get(feature);
+int numFailures = 0;
+for (Map.Entry> entry : errors.entrySet()) 
{

Review Comment:
   SpotBugs is not happy when we use a keySet iterator and then call 
errors.get(feature). And because we are no longer iterating over the keySet, we 
have to use a TreeSet to traverse the keys in sorted order? 
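   
   For example, copying into a TreeMap gives the sorted traversal while still 
iterating entries (a sketch; the helper class is made up):
   ```
   import java.util.Map;
   import java.util.Optional;
   import java.util.TreeMap;
   
   class SortedErrorTraversal {
       // Entry iteration in sorted key order, without the
       // keySet-iterator-plus-get() pattern that SpotBugs flags.
       static int countFailures(Map<String, Optional<Throwable>> errors) {
           int numFailures = 0;
           for (Map.Entry<String, Optional<Throwable>> entry : new TreeMap<>(errors).entrySet()) {
               if (entry.getValue().isPresent()) {
                   numFailures++;
               }
           }
           return numFailures;
       }
   }
   ```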



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-20 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1172824513


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+   Optional.empty(),
+  Collections.emptyList(),
+  Collections.emptyMap()));

Review Comment:
   I had changed it in my IDE and it looked good; I don't know why the formatting 
changed in the PR :( Will take a look at it, thanks!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-20 Thread via GitHub


jsancio commented on code in PR #13555:
URL: https://github.com/apache/kafka/pull/13555#discussion_r1172793737


##
server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java:
##
@@ -26,10 +26,9 @@
 
 /**
  * The purgatory which holds events that have been started, but not yet 
completed.

Review Comment:
   This is Merriam Webster definition:
   > 1 : an intermediate state after death for expiatory purification 
specifically : a place or state of punishment wherein according to Roman 
Catholic doctrine the souls of those who die in God's grace may make 
satisfaction for past sins and so become fit for heaven
   > 2 : a place or state of temporary suffering or misery
   
   I don't think either of those definitions is accurate for this problem and 
solution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-20 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1172772709


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan Before https://issues.apache.org/jira/browse/KAFKA-13659 and 
https://issues.apache.org/jira/browse/KAFKA-12468 this logic did not have 
monotonicity guarantees, but that does not mean that behavior was acceptable to 
users. These tickets were opened and voted on by multiple users before we took 
action.
   
   If we eliminate the in-memory deduplication and the forced read-to-end in 
this PR, we will re-introduce behavior that we have already fixed on trunk. I 
do not think that is acceptable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-20 Thread via GitHub


rondagostino commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1172661731


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableNavigableSet.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import 
org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableNavigableSet;
+
+import java.util.NavigableSet;
+
+/**
+ * A persistent Tree-based NavigableSet wrapper
+ * java.util.Set methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the element type
+ */
+public interface ImmutableNavigableSet extends ImmutableSet, 
NavigableSet {
+/**
+ * @return a wrapped tree-based persistent navigable set that is empty
+ * @param  the element type
+ */
+static > ImmutableNavigableSet empty() {
+return PCollectionsImmutableNavigableSet.empty();
+}
+
+/**
+ * @param e the element
+ * @return a wrapped hash-based persistent set that is empty
+ * @param  the element type
+ */
+static > ImmutableNavigableSet 
singleton(E e) {
+return PCollectionsImmutableNavigableSet.singleton(e);
+}
+
+/**
+ * @param e the element
+ * @return a wrapped persistent sorted set that differs from this one in 
that the given element is added (if necessary)
+ */
+ImmutableNavigableSet added(E e);
+
+/**
+ * @param e the element
+ * @return a wrapped persistent sorted set that differs from this one in 
that the given element is added (if necessary)

Review Comment:
   s/sorted/navigable/



##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableNavigableSet.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import 
org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableNavigableSet;
+
+import java.util.NavigableSet;
+
+/**
+ * A persistent Tree-based NavigableSet wrapper
+ * java.util.Set methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the element type
+ */
+public interface ImmutableNavigableSet extends ImmutableSet, 
NavigableSet {
+/**
+ * @return a wrapped tree-based persistent navigable set that is empty
+ * @param  the element type
+ */
+static > ImmutableNavigableSet empty() {
+return PCollectionsImmutableNavigableSet.empty();
+}
+
+/**
+ * @param e the element
+ * @return a wrapped hash-based persistent set that is empty

Review Comment:
   Looks like copy/paste error -- not hash-based.



##
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableNavigableSetTest.java:
##
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,

[GitHub] [kafka] dajac commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-20 Thread via GitHub


dajac commented on code in PR #13555:
URL: https://github.com/apache/kafka/pull/13555#discussion_r1172738445


##
server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java:
##
@@ -26,10 +26,9 @@
 
 /**
  * The purgatory which holds events that have been started, but not yet 
completed.
- * We wait for the high water mark of the metadata log to advance before 
completing
- * them.
+ * We wait for the high watermark of the log to advance before completing them.
  */
-class ControllerPurgatory {
+public class DeferredEventPurgatory {

Review Comment:
   Naming things is hard :). `Completion` sounds a bit weird here. I can't 
really think of a better name...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-20 Thread via GitHub


dajac commented on code in PR #13555:
URL: https://github.com/apache/kafka/pull/13555#discussion_r1172735940


##
server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java:
##
@@ -26,10 +26,9 @@
 
 /**
  * The purgatory which holds events that have been started, but not yet 
completed.

Review Comment:
   Why do you think that it is not accurate? For me, a purgatory is a data 
structure holding something which is completed later. That seems to fit here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13621: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito

2023-04-20 Thread via GitHub


clolov commented on PR #13621:
URL: https://github.com/apache/kafka/pull/13621#issuecomment-1516499410

   Heya @cadonna! I hope this attempt is what you had in mind? Unless I am 
wrong, the removals detailed in the pull request are sensible, as Mockito should 
be returning empty collections for method invocations on mocks, based on "By 
default, for all methods that return a value, a mock will return either null, a 
primitive/primitive wrapper value, or an empty collection, as appropriate" 
(source: 
https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#stubbing)
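   
   A minimal self-contained illustration of that default (the interface and method 
names are made up, not the actual ChangeLogReader API):
   ```
   import static org.mockito.Mockito.mock;
   
   import java.util.Set;
   
   public class MockitoDefaultsDemo {
       public interface Reader {
           Set<String> completedChangelogs();
       }
   
       public static void main(String[] args) {
           Reader reader = mock(Reader.class);
           // Unstubbed collection-returning methods yield an empty collection, not null.
           System.out.println(reader.completedChangelogs());           // []
           System.out.println(reader.completedChangelogs().isEmpty()); // true
       }
   }
   ```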


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov opened a new pull request, #13621: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito

2023-04-20 Thread via GitHub


clolov opened a new pull request, #13621:
URL: https://github.com/apache/kafka/pull/13621

   This pull request takes a similar approach to the one outlined in 
https://github.com/apache/kafka/pull/13529 to move each mock separately to ease 
reviewing the code.
   
   Once https://github.com/apache/kafka/pull/13529 is merged this pull request 
will be rebased on top (as some of the changes are present in both pull 
requests to make the tests pass).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-20 Thread via GitHub


jsancio commented on code in PR #13555:
URL: https://github.com/apache/kafka/pull/13555#discussion_r1172724317


##
server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java:
##
@@ -26,10 +26,9 @@
 
 /**
  * The purgatory which holds events that have been started, but not yet 
completed.

Review Comment:
   Minor but can we avoid the use of the word purgatory? It is not accurate for 
what this type and package are doing.



##
server-common/src/main/java/org/apache/kafka/purgatory/DeferredEventPurgatory.java:
##
@@ -26,10 +26,9 @@
 
 /**
  * The purgatory which holds events that have been started, but not yet 
completed.
- * We wait for the high water mark of the metadata log to advance before 
completing
- * them.
+ * We wait for the high watermark of the log to advance before completing them.
  */
-class ControllerPurgatory {
+public class DeferredEventPurgatory {

Review Comment:
   Maybe a more accurate name could be `OffsetBasedCompletion`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on pull request #13603: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-20 Thread via GitHub


jeffkbkim commented on PR #13603:
URL: https://github.com/apache/kafka/pull/13603#issuecomment-1516453173

   @dajac 
   ```
   org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest. 
shouldWorkWithRebalance
   kafka.api.ConsumerBounceTest.testSubscribeWhenTopicUnavailable()
   ```
   passed locally





[GitHub] [kafka] mumrah commented on pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-20 Thread via GitHub


mumrah commented on PR #13461:
URL: https://github.com/apache/kafka/pull/13461#issuecomment-1516441727

   > Should we wrap the calls to quotaEntityConsumer.accept (and other 
consumers) so that if it throws an exception, we log an ERROR message to log4j 
to let us know what failed?
   
   Yes, that's a good idea. I'll add this to all the externalized calls 
(consumers/visitors). 
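   
   A minimal sketch of what that wrapping could look like (the helper name is 
hypothetical, not the actual change):

   ```
   import java.util.function.Consumer;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class ConsumerWrapperSketch {
       private static final Logger log = LoggerFactory.getLogger(ConsumerWrapperSketch.class);

       // Invoke an externally supplied consumer/visitor, logging an ERROR that
       // names the callback before the exception propagates.
       static <T> void applyLogged(String name, Consumer<T> consumer, T value) {
           try {
               consumer.accept(value);
           } catch (RuntimeException e) {
               log.error("Error while invoking {} with {}", name, value, e);
               throw e;
           }
       }

       public static void main(String[] args) {
           Consumer<String> quotaEntityConsumer = entity -> {
               throw new IllegalStateException("boom");
           };
           try {
               applyLogged("quotaEntityConsumer", quotaEntityConsumer, "user=alice");
           } catch (IllegalStateException expected) {
               // the failure was already logged with the consumer's name
           }
       }
   }
   ```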
   
   > Can we have a ZkConfigMigrationClientTest and so on, for the other new 
migrationclient classes?
   
   For sure. I'll work on these next.
   





[GitHub] [kafka] jeffkbkim commented on pull request #13604: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-20 Thread via GitHub


jeffkbkim commented on PR #13604:
URL: https://github.com/apache/kafka/pull/13604#issuecomment-1516438325

   @dajac I see Selector test failures that pass locally. Can we re-trigger the 
build?





[GitHub] [kafka] dajac commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-20 Thread via GitHub


dajac commented on code in PR #13555:
URL: https://github.com/apache/kafka/pull/13555#discussion_r1172674549


##
server-common/src/main/java/org/apache/kafka/purgatory/Purgatory.java:
##
@@ -26,10 +26,9 @@
 
 /**
  * The purgatory which holds events that have been started, but not yet completed.
- * We wait for the high water mark of the metadata log to advance before completing
- * them.
+ * We wait for the high watermark of the log to advance before completing them.
  */
-class ControllerPurgatory {
+public class Purgatory {

Review Comment:
   +1 for `DeferredEventPurgatory`.






[jira] [Commented] (KAFKA-14923) Upgrade io.netty_netty-codec for CVE-2022-41881

2023-04-20 Thread Vikash Mishra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714612#comment-17714612
 ] 

Vikash Mishra commented on KAFKA-14923:
---

[~yash.mayya] Awesome, thanks for the details. Not sure if there is a patch 
planned for 3.4.0, but it's good to know that at least 3.5.0 will have the fix.
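
As a quick way to confirm which Netty artifacts (and versions) actually end up 
on the classpath after such a bump, Netty's own version utility can help; a 
minimal sketch:

{code:java}
import java.util.Map;

import io.netty.util.Version;

public class NettyVersionCheck {
    public static void main(String[] args) {
        // Version.identify() reports the resolved version of every Netty
        // artifact on the classpath, e.g. "netty-codec" -> 4.1.91.Final
        // once the upgrade is in place.
        Map<String, Version> versions = Version.identify();
        versions.forEach((artifact, version) ->
                System.out.println(artifact + " -> " + version.artifactVersion()));
    }
}
{code}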

> Upgrade io.netty_netty-codec for CVE-2022-41881
> ---
>
> Key: KAFKA-14923
> URL: https://issues.apache.org/jira/browse/KAFKA-14923
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Vikash Mishra
>Priority: Critical
>
> The currently used io.netty_netty-codec version 4.1.78 has a high-severity 
> CVE: [NVD - CVE-2022-41881 
> (nist.gov)|https://nvd.nist.gov/vuln/detail/CVE-2022-41881]
> The fix was released in version 4.1.86.Final. As the higher stable version 
> 4.1.91.Final is available, we should upgrade to it to fix the mentioned CVE.





[GitHub] [kafka] mumrah commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-20 Thread via GitHub


mumrah commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1172646961


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -174,12 +173,12 @@ private boolean areZkBrokersReadyForMigration() {
 /**
  * Apply a function which transforms our internal migration state.
  *
- * @param name  A descriptive name of the function that is being applied
- * @param stateMutator  A function which performs some migration operations and possibly transforms our internal state
+ * @param name A descriptive name of the function that is being applied
+ * @param migrationOp  A function which performs some migration operations and possibly transforms our internal state
  */
-private void apply(String name, Function<ZkMigrationLeadershipState, ZkMigrationLeadershipState> stateMutator) {
+private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp) {
 ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
-ZkMigrationLeadershipState afterState = stateMutator.apply(beforeState);
+ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState);
 log.trace("{} transitioned from {} to {}", name, beforeState, afterState);

Review Comment:
   I'll add this to the current PR. Shouldn't be too hard.






[GitHub] [kafka] dajac commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


dajac commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172621475


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * An immutable representation of a client side assignor within a consumer group member.
+ */
+public class ClientAssignor {
+/**
+ * The name of the assignor.
+ */
+private final String name;
+
+/**
+ * The reason reported by the assignor.
+ */
+private final byte reason;
+
+/**
+ * The minimum metadata version supported by the assignor.
+ */
+private final short minimumVersion;
+
+/**
+ * The maximum metadata version supported by the assignor.
+ */
+private final short maximumVersion;
+
+/**
+ * The versioned metadata.
+ */
+private final VersionedMetadata metadata;

Review Comment:
   ```
   /**
    * The error assigned to the member.
    */
   private final byte error;
   ```
   
   Do you mean more than this?






[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-20 Thread via GitHub


mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1172621439


##
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java:
##
@@ -115,6 +115,9 @@ private void publishDelta(MetadataDelta delta) {
 }
 }
 changes.apply(metrics);
+if (delta.featuresDelta() != null) {

Review Comment:
   I wonder if we should only set the ZK state metric if we're on a 
metadata.version that supports it. Otherwise we could see clusters go from NONE 
-> MIGRATION -> POST_MIGRATION, which is a bit odd (NONE is supposed to be a 
terminal state).
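   
   For illustration, a self-contained sketch of such a guard; every type and 
method name below is a hypothetical stand-in, not the real metadata API:

   ```
   public class ZkStateMetricGuardSketch {

       enum ZkMigrationState { NONE, MIGRATION, POST_MIGRATION }

       interface FeaturesDelta {
           boolean metadataVersionSupportsMigration();
           ZkMigrationState zkMigrationState();
       }

       static ZkMigrationState publishedState = ZkMigrationState.NONE;

       // Only update the metric when the metadata.version supports migration,
       // so NONE stays terminal on older versions.
       static void publish(FeaturesDelta delta) {
           if (delta != null && delta.metadataVersionSupportsMigration()) {
               publishedState = delta.zkMigrationState();
           }
       }

       public static void main(String[] args) {
           publish(new FeaturesDelta() {
               public boolean metadataVersionSupportsMigration() { return false; }
               public ZkMigrationState zkMigrationState() { return ZkMigrationState.MIGRATION; }
           });
           System.out.println(publishedState); // NONE -- metric untouched on old versions
       }
   }
   ```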






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


jeffkbkim commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172600639


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * An immutable representation of a client side assignor within a consumer group member.
+ */
+public class ClientAssignor {
+/**
+ * The name of the assignor.
+ */
+private final String name;
+
+/**
+ * The reason reported by the assignor.
+ */
+private final byte reason;
+
+/**
+ * The minimum metadata version supported by the assignor.
+ */
+private final short minimumVersion;
+
+/**
+ * The maximum metadata version supported by the assignor.
+ */
+private final short maximumVersion;
+
+/**
+ * The versioned metadata.
+ */
+private final VersionedMetadata metadata;

Review Comment:
   That makes sense, and we added the information inside VersionedMetadata. Can 
we have a description for Assignment.error?
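   
   For example, something along these lines; suggested wording only, since the 
authoritative semantics are defined by KIP-848, not by this sketch:
   
   ```
   /**
    * The error reported by the assignor for this assignment; zero conventionally
    * means "no error", and the remaining values are assignor-defined.
    */
   private final byte error;
   ```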






[GitHub] [kafka] dengziming commented on a diff in pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-20 Thread via GitHub


dengziming commented on code in PR #13555:
URL: https://github.com/apache/kafka/pull/13555#discussion_r1172598460


##
server-common/src/main/java/org/apache/kafka/purgatory/Purgatory.java:
##
@@ -26,10 +26,9 @@
 
 /**
  * The purgatory which holds events that have been started, but not yet completed.
- * We wait for the high water mark of the metadata log to advance before completing
- * them.
+ * We wait for the high watermark of the log to advance before completing them.
  */
-class ControllerPurgatory {
+public class Purgatory {

Review Comment:
   It seems better to add a prefix, since we have several `Purgatory` classes: 
there is a `FuturePurgatory` in the raft module that is similar to this 
`ControllerPurgatory`. We could call it `DeferredEventPurgatory` or 
`HwmPurgatory` (high watermark).






[jira] [Commented] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id

2023-04-20 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714597#comment-17714597
 ] 

Jørgen commented on KAFKA-14922:


An _improvement_ would be to only return topics that exactly match the 
applicationId provided.

This would fix the case where an applicationId doesn't exist, instead of 
returning all topics that start with the provided applicationId (like "f" in 
the example).

It would not cover the case where another applicationId starts with the 
applicationId provided (foo-v1 would still delete foo-v1-2 topics, etc.).
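
A sketch of the stricter check (the helper is hypothetical, assuming the usual 
-repartition/-changelog suffixes for internal topics):

{code:java}
public class TopicMatchSketch {

    // Require the full application id plus the "-" separator that Streams uses
    // when naming internal topics, instead of a bare prefix match.
    static boolean isInferredInternalTopic(String topic, String applicationId) {
        return topic.startsWith(applicationId + "-")
            && (topic.endsWith("-repartition") || topic.endsWith("-changelog"));
    }

    public static void main(String[] args) {
        String topic = "foo-v1-repartition-topic-repartition";
        System.out.println(isInferredInternalTopic(topic, "f"));      // false: the "f" typo no longer matches
        System.out.println(isInferredInternalTopic(topic, "foo-v1")); // true: exact application id
        System.out.println(isInferredInternalTopic(topic, "foo"));    // true: the case left open above
        System.out.println(isInferredInternalTopic("foo-v21-a-changelog", "foo-v2")); // false
    }
}
{code}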

> kafka-streams-application-reset deletes topics not belonging to specified 
> application-id
> 
>
> Key: KAFKA-14922
> URL: https://issues.apache.org/jira/browse/KAFKA-14922
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 3.4.0
>Reporter: Jørgen
>Priority: Major
>
> Slack-thread: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]
> When running the command _kafka-streams-application-reset --bootstrap-servers 
> $BOOTSTRAP --application-id foo_ all internal topics that _start with_ foo 
> are deleted. This happens even if there's no application-id named foo.
> Example:
> {code:java}
> Application IDs:
> foo-v1
> foo-v2
> Internal topics:
> foo-v1-repartition-topic-repartition
> foo-v2-repartition-topic-repartition 
> Application reset:
> kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP 
> --application-id foo
> > No input or intermediate topics specified. Skipping seek.
> Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
> foo-v1-repartition-topic-repartition]
> Done.{code}
> Expected behaviour is that the command fails, as there is no application-id 
> with the name foo, instead of deleting all foo* topics. 
> This is critical for typos, or if one application-id starts with the same name 
> as another (for example, if we had foo-v21 and wanted to reset foo-v2).
> The bug should be located here: 
> [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]
> It should check that the topic matches the application-id exactly instead of 
> checking that it only starts with the application-id.





[GitHub] [kafka] dajac commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-20 Thread via GitHub


dajac commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1172597532


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * An immutable representation of a client side assignor within a consumer group member.
+ */
+public class ClientAssignor {
+/**
+ * The name of the assignor.
+ */
+private final String name;
+
+/**
+ * The reason reported by the assignor.
+ */
+private final byte reason;
+
+/**
+ * The minimum metadata version supported by the assignor.
+ */
+private final short minimumVersion;
+
+/**
+ * The maximum metadata version supported by the assignor.
+ */
+private final short maximumVersion;
+
+/**
+ * The versioned metadata.
+ */
+private final VersionedMetadata metadata;

Review Comment:
   I am not sure I understand what you would like to see. `ClientAssignor` is 
all about the client assignor, so everything in this class relates to it.






[GitHub] [kafka] tinaselenge commented on a diff in pull request #13459: KAFKA-14592: Move FeatureCommand to tools

2023-04-20 Thread via GitHub


tinaselenge commented on code in PR #13459:
URL: https://github.com/apache/kafka/pull/13459#discussion_r1172577875


##
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java:
##
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+
+import static org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.SAFE_DOWNGRADE;
+import static org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+public class FeatureCommandTest {
+
+    private final ClusterInstance cluster;
+    public FeatureCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    public void testDescribeWithZK() {
+        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
+        );
+        assertEquals("", commandOutput);
+    }
+
+    @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    public void testDescribeWithKRaft() {
+        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
+        );
+        assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
+            "SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
+    }
+
+    @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    public void testUpgradeMetadataVersionWithZk() {
+        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+                "upgrade", "--metadata", "3.3-IV2"))
+        );
+        assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " +
+            "update because the provided feature is not supported.", commandOutput);
+    }
+
+    @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    public void testUpgradeMetadataVersionWithKraft() {
+        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+                "upgrade", "--feature", "metadata.version=5"))
+        );
+        assertEquals("metadata.version was upgraded to 5.", commandOutput);
+
+        commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+                "upgrade", "--metadata", "3.3-IV2"))
+        );
+        assertEquals("metadata.version was upgraded to 6.", commandOutput);
+    }
+
+@ClusterTest(clusterType = Type.ZK, metadataVersion = 
Metad

[jira] [Assigned] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-04-20 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya reassigned KAFKA-14877:


Assignee: Divij Vaidya

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Divij Vaidya
>Priority: Major
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477





[jira] [Commented] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-04-20 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714594#comment-17714594
 ] 

Divij Vaidya commented on KAFKA-14877:
--

I will pick this up if there are no objections. 

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Divij Vaidya
>Priority: Major
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477




