Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621844225


##
metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java:
##
@@ -74,7 +74,7 @@ public void testDefaultFeatureMapWithUnstable() {
 for (Features feature : Features.PRODUCTION_FEATURES) {
 expectedFeatures.put(feature.featureName(), VersionRange.of(
 0,
-feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
+feature.defaultValue(MetadataVersion.latestTesting())

Review Comment:
   Indeed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621843719


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignment
  */
 private final Set subscribedTopicIds;
 
-/**
- * The number of members to receive an extra partition beyond the minimum 
quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- *  each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
  * Members mapped to the remaining number of partitions needed to meet the 
minimum quota.
- * Minimum quota = total partitions / total members.
  */
-private Map potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;

Review Comment:
   There is a set number of extra partitions that we can give out once every 
member gets an equal number of partitions. For example, with 5 partitions and 
3 members, each member gets 1 partition and we have 2 extras. Say A, B, and C 
have all received 1 partition, so they are all potentially unfilled. Once A 
and B get their extra partitions, C is no longer unfilled and doesn't receive 
any more partitions.
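   To make the arithmetic concrete, here is a minimal sketch of the quota 
computation described above (illustrative only, not the actual assignor code):

   ```java
   // Quota math for 5 partitions across 3 members.
   int totalPartitionsCount = 5;
   int numberOfMembers = 3;

   // Every member is guaranteed the minimum quota.
   int minimumMemberQuota = totalPartitionsCount / numberOfMembers; // 1

   // Only this many members may receive one extra partition.
   int remainingMembersToGetAnExtraPartition =
       totalPartitionsCount % numberOfMembers; // 2

   // With members A, B, C: A and B end up with 2 partitions each
   // (quota + extra); C stays at the quota of 1 and gets nothing more.
   ```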



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621843638


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
 Set supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   We will eventually remove it for sure. This (private) config is still the 
one used to determine whether the new group coordinator should be enabled or 
not. The trick to know is that it is automatically set to true when `consumer` 
is specified as a rebalance protocol. We did this to simplify the early access 
and the preview.
   
   The handling was a bit messy before this patch, as the new protocol was 
enabled whenever the new coordinator ran. It is better now.
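   As a rough sketch of the check being described (the exact logic lives in 
`KafkaConfig#isNewGroupCoordinatorEnabled` and may differ in detail; the 
config key strings here are assumptions based on the constants named in the 
diff):

   ```java
   // Sketch: the new coordinator is enabled when the private flag is set,
   // or when "consumer" is listed among the rebalance protocols.
   // imports: java.util.Arrays, java.util.Properties
   boolean isNewGroupCoordinatorEnabled(Properties props) {
       boolean explicitlyEnabled = "true".equals(
           props.getProperty("group.coordinator.new.enable", ""));
       String protocols = props.getProperty(
           "group.coordinator.rebalance.protocols", "classic");
       return explicitlyEnabled
           || Arrays.asList(protocols.split(",")).contains("consumer");
   }
   ```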



Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-31 Thread via GitHub


rreddy-22 commented on PR #16068:
URL: https://github.com/apache/kafka/pull/16068#issuecomment-2141348418

   There are two tests failing in GroupMetadataManagerTest; I will look into it 
tomorrow.


Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1621827532


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Good point -- I agree with Bruno, the catch block should be for just 
ConfigException or StreamsException
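   For illustration, one way to read that suggestion is a single handler for 
both exception types, along these lines (a sketch, not the final patch):

   ```java
   try {
       keySerializer = prepareKeySerializer(keySerializer, context, this.name());
       valSerializer = prepareValueSerializer(valSerializer, context, this.name());
   } catch (final ConfigException | StreamsException e) {
       // Surface the node name so the failing sink node is identifiable.
       throw new ConfigException(String.format(
           "Failed to initialize serdes for sink node %s: %s", name(), e.getMessage()));
   }
   ```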



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage [kafka]

2024-05-30 Thread via GitHub


abhijeetk88 commented on code in PR #16071:
URL: https://github.com/apache/kafka/pull/16071#discussion_r1621675519


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -6597,6 +6597,79 @@ class ReplicaManagerTest {
   ))
 }
   }
+
+  @Test
+  def testRemoteReadQuotaExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
fetchInfo.fetchOffsetMetadata)
+assertFalse(fetchInfo.records.records().iterator().hasNext)
+assertFalse(fetchInfo.firstEntryIncomplete)
+assertFalse(fetchInfo.abortedTransactions.isPresent)
+assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  @Test
+  def testRemoteReadQuotaNotExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
+assertEquals(UnifiedLog.UnknownOffset, 
fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
+assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
+assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
+assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): 
Seq[(TopicIdPartition, LogReadResult)] = {
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true)
+try {
+  val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+  replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
+  val partition0Replicas = Seq[Integer](0, 1).asJava
+  val topicIds = Map(tp.topic -> topicId).asJava
+  val leaderEpoch = 0
+  val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(
+  new LeaderAndIsrPartitionState()
+.setTopicName(tp.topic)
+.setPartitionIndex(tp.partition)
+.setControllerEpoch(0)
+.setLeader(leaderEpoch)

Review Comment:
   done



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


abhijeetk88 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2141192632

   Thanks @chia7712 @jolshan. Apologies for the miss.


Re: [PR] ignore [kafka]

2024-05-30 Thread via GitHub


github-actions[bot] commented on PR #15355:
URL: https://github.com/apache/kafka/pull/15355#issuecomment-2141172395

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621586240


##
checkstyle/suppressions.xml:
##
@@ -361,4 +361,7 @@
 
 
+
+

Review Comment:
   I think checkstyle should be consistent with the auto-formatter: if we 
enable auto-formatting for module A, we should also enable that module's check 
rule. The `ImportOrder` rule can't be customized per module (that would take a 
lot of changes; we would probably have to add it to `build.gradle` in every 
module). So I added this line so that the rule can be enabled for individual 
modules in the future; this seems like the more convenient way to adopt it 
module by module.
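   For example, a per-module entry in `checkstyle/suppressions.xml` might look 
like the following (a hypothetical sketch; the module names are placeholders):

   ```xml
   <!-- Hypothetical: suppress ImportOrder everywhere except the modules
        where the auto-formatter has been enabled. -->
   <suppress checks="ImportOrder" files="^(?!.*(streams|connect)).*$"/>
   ```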



Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621614857


##
build.gradle:
##
@@ -787,6 +800,12 @@ subprojects {
 skipProjects = [ ":jmh-benchmarks", ":trogdor" ]
 skipConfigurations = [ "zinc" ]
   }
+
+  afterEvaluate {

Review Comment:
   I change the PR,please review it @chia7712 



Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621587957


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##
@@ -585,34 +586,34 @@ public void 
testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
 ));
 
 // Initial subscriptions were [T1, T2]
-Map members = new HashMap<>();
+Map members = new TreeMap<>();
+Map>> assignedPartitions = new 
HashMap<>();
 
 Map> currentAssignmentForA = mkAssignment(
 mkTopicAssignment(topic1Uuid, 0, 2),
 mkTopicAssignment(topic2Uuid, 1, 3)
 );
-members.put(memberA, new AssignmentMemberSpec(
+assignedPartitions.put(memberA, currentAssignmentForA);
+members.put(memberA, new MemberSubscriptionSpecImpl(
 Optional.empty(),
-Optional.empty(),
-Collections.singleton(topic1Uuid),
-currentAssignmentForA
+Collections.singleton(topic1Uuid)
 ));
 
 Map> currentAssignmentForB = mkAssignment(
 mkTopicAssignment(topic1Uuid, 1),
 mkTopicAssignment(topic2Uuid, 0, 2, 4)
 );
-members.put(memberB, new AssignmentMemberSpec(
-Optional.empty(),
+assignedPartitions.put(memberB, currentAssignmentForB);
+members.put(memberB, new MemberSubscriptionSpecImpl(
 Optional.empty(),
-mkSet(topic1Uuid, topic2Uuid),
-currentAssignmentForB
+new HashSet<>(Arrays.asList(topic1Uuid, topic2Uuid))
 ));
 
 GroupSpec groupSpec = new GroupSpecImpl(
 members,
 HETEROGENEOUS,
-invertedTargetAssignment(members)
+assignedPartitions,
+invertedTargetAssignment(assignedPartitions, members)
 );
 SubscribedTopicMetadata subscribedTopicMetadata = new 
SubscribedTopicMetadata(topicMetadata);
 

Review Comment:
   yep sounds good



Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621584578


##
build.gradle:
##
@@ -47,7 +47,7 @@ plugins {
   // Updating the shadow plugin version to 8.1.1 causes issue with signing and 
publishing the shadowed
   // artifacts - see https://github.com/johnrengelman/shadow/issues/901
   id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
-  id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer 
require Java 11 at compile time, so we can't upgrade until AK 4.0
+  id 'com.diffplug.spotless' version "${spotlessVersion}" apply false

Review Comment:
   I documented it in a comment in `gradle.properties`: versions after 6.13.0 
require Java 11, and we still need to stay compatible with Java 8.



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-30 Thread via GitHub


satishd commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2141069773

   @abhijeetk88 Can you resolve the conflicts?


Re: [PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]

2024-05-30 Thread via GitHub


satishd merged PR #16146:
URL: https://github.com/apache/kafka/pull/16146


Re: [PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]

2024-05-30 Thread via GitHub


satishd commented on PR #16146:
URL: https://github.com/apache/kafka/pull/16146#issuecomment-2141065172

   A few failing tests are unrelated to the change. Merging this change to 
trunk to unblock the failing test.


Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1621535872


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, 
Map 
partitionsWithUpdatedLeaderInfo = new HashMap<>();
 produceResponse.data().responses().forEach(r -> 
r.partitionResponses().forEach(p -> {
-TopicPartition tp = new TopicPartition(r.name(), 
p.index());
+// Version 12 drop topic name and add support to topic id. 
However, metadata can be used to map topic id to topic name.
+String topicName = (r.name() == null || 
r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();

Review Comment:
   Yes. For the fetch request for example, there is code to make sure that all 
topics have IDs before we can send the fetch request. This is a bit less of an 
issue now, but if we have a cluster that is running on a MV < 2.8, not all 
topics will have IDs. So when we decide which version of produce we want to 
send, we want to be aware of this.
   
   Not only that, but even if the broker supports topic IDs on all topics, we 
also may have a case where we need to do a rolling upgrade to get the code that 
supports handling the latest API version. This may be less complicated for 
Produce since it is a client only API and doesn't rely on MV/IBP, so the 
apiVersions exchange between the client and the broker may be enough to ensure 
api compatibility. 
   
   We just want to confirm these upgrade paths are compatible since produce is 
the hot path and we don't want any (or at least not extended) downtime in the 
middle of an upgrade.
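   As a purely hypothetical sketch of the kind of guard being described (this 
is not the actual `Sender` logic; the helper and its parameters are invented 
for illustration):

   ```java
   // Hypothetical: only use the topic-ID-based produce version when every
   // topic we are writing to has a non-zero topic ID in the local metadata.
   // imports: java.util.Map, java.util.Set, org.apache.kafka.common.Uuid
   static short chooseProduceVersion(Set<String> topics,
                                     Map<String, Uuid> topicIds,
                                     short latestSupportedVersion) {
       boolean allTopicsHaveIds = topics.stream().allMatch(t ->
           topicIds.containsKey(t) && !Uuid.ZERO_UUID.equals(topicIds.get(t)));
       // Version 12 replaces the topic name with the topic ID in the request,
       // so fall back to the last name-based version otherwise.
       return allTopicsHaveIds ? latestSupportedVersion : (short) 11;
   }
   ```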



Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-30 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1621532246


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set topicPa
  */
 private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
final 
AssignedTask.Type taskType) {
-final String rackAwareAssignmentStrategy = 
applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+final AssignmentConfigs assignmentConfigs = 
applicationState.assignmentConfigs();
+final String rackAwareAssignmentStrategy = 
assignmentConfigs.rackAwareAssignmentStrategy();
 if 
(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy))
 {
+LOG.warn("Rack aware task assignment optimization disabled: rack 
aware strategy was set to {}",
+rackAwareAssignmentStrategy);
+return false;
+}
+
+if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {
+LOG.warn("Rack aware task assignment optimization unavailable: the 
traffic cost configuration was not set.");

Review Comment:
   We should log the exact config name since otherwise people won't necessarily 
know what this is referring to (especially since they already forgot to set 
this config). 
   
   ```suggestion
   LOG.warn("Rack aware task assignment optimization unavailable: 
must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set topicPa
  */
 private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
final 
AssignedTask.Type taskType) {
-final String rackAwareAssignmentStrategy = 
applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+final AssignmentConfigs assignmentConfigs = 
applicationState.assignmentConfigs();
+final String rackAwareAssignmentStrategy = 
assignmentConfigs.rackAwareAssignmentStrategy();
 if 
(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy))
 {
+LOG.warn("Rack aware task assignment optimization disabled: rack 
aware strategy was set to {}",
+rackAwareAssignmentStrategy);
+return false;
+}
+
+if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {
+LOG.warn("Rack aware task assignment optimization unavailable: the 
traffic cost configuration was not set.");
 return false;
 }
+
+if (!assignmentConfigs.rackAwareNonOverlapCost().isPresent()) {
+LOG.warn("Rack aware task assignment optimization unavailable: the 
non-overlap cost configuration was not set.");

Review Comment:
   ```suggestion
   LOG.warn("Rack aware task assignment optimization unavailable: 
must configure {}", 
StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -40,8 +41,8 @@ public AssignmentConfigs(final StreamsConfig configs) {
 configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
 configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
+
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)),
+
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)),

Review Comment:
   Don't we need to check `if 
(assignorClassName.equals("org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor"))`
 and set these to the sticky assignor defaults if true?
   
   Where `assignorClassName` is equal to 
`streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG)` -- I guess 
maybe we do want the public `AssignmentConfigs` constructor to take in the 
StreamsConfig after all?
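   A rough sketch of that suggestion (the sticky-assignor default constant is 
hypothetical; only `StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG` and the cost 
configs come from the discussion above):

   ```java
   // Hypothetical sketch: fall back to sticky-assignor defaults when that
   // assignor is configured, instead of leaving the optionals empty.
   final String assignorClassName =
       streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
   final boolean isStickyAssignor = assignorClassName.equals(
       "org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor");

   final Optional<Integer> trafficCost = isStickyAssignor
       ? Optional.of(STICKY_DEFAULT_TRAFFIC_COST) // hypothetical constant
       : Optional.ofNullable(
           configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
   ```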



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set topicPa
  */
 private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
  

Re: [PR] MINOR: Add more unit tests to LogSegments [kafka]

2024-05-30 Thread via GitHub


showuon commented on code in PR #16085:
URL: https://github.com/apache/kafka/pull/16085#discussion_r1621524702


##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -4251,6 +4251,22 @@ class UnifiedLogTest {
 assertEquals(new LogOffsetMetadata(14, -1L, -1), 
log.maybeConvertToOffsetMetadata(14))
   }
 
+  @Test
+  def testGetFirstBatchTimestampForSegments(): Unit = {
+val log = createLog(logDir, LogTestUtils.createLogConfig())
+
+val segments: java.util.List[LogSegment] = new 
java.util.ArrayList[LogSegment]()
+val seg1 = LogTestUtils.createSegment(1, logDir, 10, Time.SYSTEM)
+val seg2 = LogTestUtils.createSegment(2, logDir, 10, Time.SYSTEM)

Review Comment:
   Forgot to close them.



Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621518800


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##
@@ -18,48 +18,60 @@
 
 import org.apache.kafka.common.Uuid;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * The assignment specification for a consumer group.
  */
 public class GroupSpecImpl implements GroupSpec {
 /**
- * The member metadata keyed by member Id.
+ * Member subscription metadata keyed by member Id.
  */
-private final Map members;
+private final Map memberSubscriptions;
 
 /**
- * The subscription type followed by the group.
+ * The subscription type of the group.
  */
 private final SubscriptionType subscriptionType;
 
+/**
+ * Partitions currently assigned to each member keyed by topicId.
+ */
+private final Map>> currentAssignment;

Review Comment:
   I used memberAssignment and invertedMemberAssignment



Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621508931


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##
@@ -39,4 +41,20 @@ public interface GroupSpec {
  * False, otherwise.
  */
 boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+/**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId  The member Id.
+ * @return The member's subscription metadata.
+ */
+MemberSubscriptionSpec memberSubscriptionSpec(String memberId);

Review Comment:
   Discussed offline: we want to throw an IllegalStateException when the 
memberId is not found.
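   For illustration, a minimal sketch of that behavior in `GroupSpecImpl` 
(assuming a `memberSubscriptions` map keyed by member Id; not the final code):

   ```java
   @Override
   public MemberSubscriptionSpec memberSubscriptionSpec(String memberId) {
       MemberSubscriptionSpec spec = memberSubscriptions.get(memberId);
       if (spec == null) {
           throw new IllegalStateException(
               "Member " + memberId + " is not part of the group spec.");
       }
       return spec;
   }
   ```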



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621497961


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
 Set supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   I'm mostly confused because we don't check for the config in KafkaApis anymore.



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621496853


##
metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java:
##
@@ -74,7 +74,7 @@ public void testDefaultFeatureMapWithUnstable() {
 for (Features feature : Features.PRODUCTION_FEATURES) {
 expectedFeatures.put(feature.featureName(), VersionRange.of(
 0,
-feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
+feature.defaultValue(MetadataVersion.latestTesting())

Review Comment:
   Hmm was this just a bug in the test...



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-30 Thread via GitHub


pasharik commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1621488481


##
core/src/main/scala/kafka/admin/AclCommand.scala:
##
@@ -115,8 +115,6 @@ object AclCommand extends Logging {
   val aclBindings = acls.map(acl => new AclBinding(resource, 
acl)).asJavaCollection
   adminClient.createAcls(aclBindings).all().get()
 }
-
-listAcls(adminClient)

Review Comment:
   - I've moved the KRaft tests into a new `AclCommandIntegrationTest.java`.
   - Left the old ZooKeeper tests in `AclCommandTest.scala`. As I understand 
it, we are going to delete this test file completely once we have fully moved 
to KRaft, am I right? Do you think it's worth migrating those tests to Java at 
this stage if they are going to be deleted anyway?
   - The race condition described above is still reproducible on the new 
infrastructure :cry: So if there are no objections, we can probably remove the 
`Current ACLs` console output.



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621487824


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
 Set supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   do we plan to remove this config value from GroupCoordinatorConfig? I see it 
was removed from a lot of files, but there are still a few where it is used.



Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-30 Thread via GitHub


soarez commented on PR #15945:
URL: https://github.com/apache/kafka/pull/15945#issuecomment-2140953114

   There are some conflicts that need addressing, and the JDK 21 pipeline 
didn't run. 


Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-30 Thread via GitHub


soarez commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1621485662


##
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+private final Logger log;
+private final int id;
+private final Runnable refreshRegistrationCallback;
+
+/**
+ * Create the tracker.
+ *
+ * @param idThe ID of this broker.
+ * @param targetDirectories The directories managed by this 
broker.
+ * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+ */
+public BrokerRegistrationTracker(
+int id,
+List targetDirectories,
+Runnable refreshRegistrationCallback
+) {
+this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+logger(BrokerRegistrationTracker.class);
+this.id = id;
+this.refreshRegistrationCallback = refreshRegistrationCallback;
+}
+
+@Override
+public String name() {
+return "BrokerRegistrationTracker(id=" + id + ")";
+}
+
+@Override
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+boolean checkBrokerRegistration = false;
+if (delta.featuresDelta() != null) {
+if (delta.metadataVersionChanged().isPresent()) {
+if (log.isTraceEnabled()) {
+log.trace("Metadata version change is present: {}",
+delta.metadataVersionChanged());
+}
+checkBrokerRegistration = true;
+}
+}
+if (delta.clusterDelta() != null) {
+if (delta.clusterDelta().changedBrokers().get(id) != null) {
+if (log.isTraceEnabled()) {
+log.trace("Broker change is present: {}",
+delta.clusterDelta().changedBrokers().get(id));
+}
+checkBrokerRegistration = true;
+}
+}
+if (checkBrokerRegistration) {
+if 
(brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(),
+delta.clusterDelta().broker(id))) {
+refreshRegistrationCallback.run();
+}
+}
+}
+
+/**
+ * Check if the current broker registration needs to be refreshed.
+ *
+ * @param registration  The current broker registration, or null if there 
is none.
+ * @return  True only if we should refresh.
+ */
+boolean brokerRegistrationNeedsRefresh(
+MetadataVersion metadataVersion,
+BrokerRegistration registration
+) {
+// If there is no existing registration, the BrokerLifecycleManager 
must still be sending it.
+// So we don't 

Re: [PR] KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization [kafka]

2024-05-30 Thread via GitHub


apourchet commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1621479244


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -244,18 +271,112 @@ public static Map 
optimizeRackAwareStandbyTas
 );
 LOG.info("Assignment before standby task optimization has cost {}", 
initialCost);
 
-throw new UnsupportedOperationException("Not yet Implemented.");
+final MoveStandbyTaskPredicate moveablePredicate = 
getStandbyTaskMovePredicate(applicationState);
+final BiFunction> getMovableTasks = (source, destination) -> {
+return source.tasks().values().stream()
+.filter(task -> task.type() == AssignedTask.Type.STANDBY)
+.filter(task -> !destination.tasks().containsKey(task.id()))
+.filter(task -> {
+final KafkaStreamsState sourceState = 
kafkaStreamsStates.get(source.processId());
+final KafkaStreamsState destinationState = 
kafkaStreamsStates.get(source.processId());
+return moveablePredicate.canMoveStandbyTask(sourceState, 
destinationState, task.id(), kafkaStreamsAssignments);
+})
+.map(AssignedTask::id)
+.sorted()
+.collect(Collectors.toList());
+};
+
+final long startTime = System.currentTimeMillis();
+boolean taskMoved = true;
+int round = 0;
+final RackAwareGraphConstructor 
graphConstructor = RackAwareGraphConstructorFactory.create(
+
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+taskMoved = false;
+round++;
+for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+final UUID clientId1 = clientIds.get(i);
+final KafkaStreamsAssignment clientState1 = 
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+final UUID clientId2 = clientIds.get(i);
+final KafkaStreamsAssignment clientState2 = 
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+final String rack1 = 
clientRacks.get(clientState1.processId().id()).get();
+final String rack2 = 
clientRacks.get(clientState2.processId().id()).get();
+// Cross rack traffic can not be reduced if racks are the 
same
+if (rack1.equals(rack2)) {
+continue;
+}
+
+final List movable1 = 
getMovableTasks.apply(clientState1, clientState2);
+final List movable2 = 
getMovableTasks.apply(clientState2, clientState1);
+
+// There's no needed to optimize if one is empty because 
the optimization
+// can only swap tasks to keep the client's load balanced
+if (movable1.isEmpty() || movable2.isEmpty()) {
+continue;
+}
+
+final List taskIdList = 
Stream.concat(movable1.stream(), movable2.stream())
+.sorted()
+.collect(Collectors.toList());
+final List clients = Stream.of(clientId1, 
clientId2).sorted().collect(Collectors.toList());
+
+final AssignmentGraph assignmentGraph = buildTaskGraph(
+assignmentsByUuid,
+clientRacks,
+taskIdList,
+clients,
+topicPartitionsByTaskId,
+crossRackTrafficCost,
+nonOverlapCost,
+false,
+false,

Review Comment:
   you're right, good catch!



[PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-30 Thread via GitHub


apourchet opened a new pull request, #16147:
URL: https://github.com/apache/kafka/pull/16147

   This PR takes care of making the callback to 
`TaskAssignor.onAssignmentComputed`.
   
   It also contains a change to the public AssignmentConfigs API, as well as 
some simplifications of the StickyTaskAssignor.
   
   This PR also changes the rack information fetching to happen lazily in the 
case where the TaskAssignor makes its decisions without said rack information.
   


Re: [PR] KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization [kafka]

2024-05-30 Thread via GitHub


ableegoldman merged PR #16129:
URL: https://github.com/apache/kafka/pull/16129


Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]

2024-05-30 Thread via GitHub


ableegoldman commented on PR #16129:
URL: https://github.com/apache/kafka/pull/16129#issuecomment-2140934597

   Test failures are unrelated. Merging to trunk.


Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]

2024-05-30 Thread via GitHub


ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1621467009


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -555,18 +556,21 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
 
 RackUtils.annotateTopicPartitionsWithRackInfo(cluster, 
internalTopicManager, allTopicPartitions);
 
-final Set logicalTasks = logicalTaskIds.stream().map(taskId 
-> {
-final Set stateStoreNames = topologyMetadata
-.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
-.keySet();
-final Set topicPartitions = 
topicPartitionsForTask.get(taskId);
-return new DefaultTaskInfo(
-taskId,
-!stateStoreNames.isEmpty(),
-stateStoreNames,
-topicPartitions
-);
-}).collect(Collectors.toSet());
+final Map logicalTasks = 
logicalTaskIds.stream().collect(Collectors.toMap(
+Function.identity(),
+taskId -> {
+final Set stateStoreNames = topologyMetadata
+
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())

Review Comment:
   Ah somehow I missed this before -- this is actually returning _all_ the 
state stores for this topology, it's not specific to the taskId. This was an 
existing issue so we don't need to fix it in this PR, it can be addressed in a 
followup. It might be a bit complicated so I'll take a look at how we can get 
this info
   
   Would've caught this during testing since we definitely want tests with 
mixed stateless-and-stateful tasks, but still good to fix ASAP



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -72,6 +80,27 @@ public static Map 
identityAssignment(final Ap
 return assignments;
 }
 
+/**
+ * Assign standby tasks to KafkaStreams clients according to the default 
logic.
+ * 
+ * If rack-aware client tags are configured, the rack-aware standby task 
assignor will be used
+ *
+ * @param applicationStatethe metadata and other info describing 
the current application state
+ * @param kafkaStreamsAssignments the current assignment of tasks to 
KafkaStreams clients
+ *
+ * @return a new map containing the mappings from KafkaStreamsAssignments 
updated with the default standby assignment
+ */
+public static Map 
defaultStandbyTaskAssignment(final ApplicationState applicationState,
+   
   final Map kafkaStreamsAssignments) {
+if 
(!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
+return tagBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);
+} else if (canPerformRackAwareOptimization(applicationState, 
AssignedTask.Type.STANDBY)) {
+return tagBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);

Review Comment:
   Address in a followup:
   
   We should just remove this case entirely, right? Basically it's "if 
hasRackAwareTags, then do tag-based standby task assignment; if 
doesNotHaveTags, then do default standby task assignment" -- see the sketch 
below.
   
   Note that the tag-based rack-aware assignment has nothing to do with the 
rack ids, so `canPerformRackAwareOptimization` is kind of irrelevant to the 
question here.
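   A sketch of the simplified dispatch being proposed (the load-based default 
helper name is hypothetical):

   ```java
   // Illustrative: tags alone decide the standby strategy, independent of
   // canPerformRackAwareOptimization.
   if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
       return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
   } else {
       // hypothetical helper for the non-tag default path
       return loadBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
   }
   ```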



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final 
TaskInfo task,
 }
 return true;
 }
+
+private static Map 
tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
 final Map kafkaStreamsAssignments) {
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set rackAwareAssignmentTags = new 
HashSet<>(getRackAwareAssignmentTags(applicationState));
+final TagStatistics tagStatistics = new 
TagStatistics(applicationState);
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clientsByUuid = 

Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


junrao commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140919323

   @jolshan : Thanks for pointing this out. Sorry that I didn't look at the 
test results carefully before merging.


Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan merged PR #16130:
URL: https://github.com/apache/kafka/pull/16130


Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16130:
URL: https://github.com/apache/kafka/pull/16130#discussion_r1621463208


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -449,8 +449,8 @@ object KafkaConfig {
   /** Internal Configurations **/
   // This indicates whether unreleased APIs should be advertised by this 
node.
   .defineInternal(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
BOOLEAN, false, HIGH)
-  // This indicates whether unreleased MetadataVersions should be enabled 
on this node.
-  .defineInternal(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, 
BOOLEAN, false, HIGH)
+  // This indicates whether unreleased MetadataVersions or other feature 
versions should be enabled on this node.
+  .defineInternal(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, 
BOOLEAN, false, HIGH)

Review Comment:
   Yup -- this is the text in the KIP:
   > Add INTERNAL configuration unstable.feature.versions.enable to allow for 
non production ready features to be used (for testing)



Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan commented on PR #16130:
URL: https://github.com/apache/kafka/pull/16130#issuecomment-2140914372

   I filed https://issues.apache.org/jira/browse/KAFKA-16866 for the one 
failure and that is getting fixed separately. 
   
   As for the others, looks like they are frequent flakes. I will go ahead and 
merge. 


Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621445974


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment = 
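   
   For reference, here is the quota arithmetic introduced by the hunk above as a runnable sketch; the 11-partitions-over-3-members numbers come from the Javadoc that the patch removes:
   ```java
   public class QuotaArithmeticExample {
       public static void main(String[] args) {
           int totalPartitionsCount = 11;
           int numberOfMembers = 3;
           // Every member is guaranteed the minimum quota...
           int minimumMemberQuota = totalPartitionsCount / numberOfMembers;                    // 3
           // ...and the remainder is how many members receive one extra partition.
           int remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; // 2
           System.out.println(minimumMemberQuota + " per member, "
               + remainingMembersToGetAnExtraPartition + " members get one extra");
       }
   }
   ```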

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621442335


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java:
##
@@ -66,21 +66,19 @@ public GroupAssignment assign(
 GroupSpec groupSpec,
 SubscribedTopicDescriber subscribedTopicDescriber
 ) throws PartitionAssignorException {
-AbstractUniformAssignmentBuilder assignmentBuilder;
-
 if (groupSpec.members().isEmpty())
 return new GroupAssignment(Collections.emptyMap());
 
 if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
 LOG.debug("Detected that all members are subscribed to the same 
set of topics, invoking the "
 + "optimized assignment algorithm");
-assignmentBuilder = new 
OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+return new OptimizedUniformAssignmentBuilder(groupSpec, 
subscribedTopicDescriber)
+.build();

Review Comment:
   Any reason why we changed the name so that it no longer matches the general assignor? Or was this also changed in the original PR that renamed the files?






[PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]

2024-05-30 Thread via GitHub


chia7712 opened a new pull request, #16146:
URL: https://github.com/apache/kafka/pull/16146

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140866041

   @jolshan I filed https://github.com/apache/kafka/pull/16146 to fix it.





Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621424229


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignment
  */
 private final Set subscribedTopicIds;
 
-/**
- * The number of members to receive an extra partition beyond the minimum 
quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- *  each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
  * Members mapped to the remaining number of partitions needed to meet the 
minimum quota.
- * Minimum quota = total partitions / total members.
  */
-private Map potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;

Review Comment:
   Why do we call this `potentiallyUnfilledMembers` rather than `unfilledMembers`?






Re: [PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]

2024-05-30 Thread via GitHub


chia7712 merged PR #16062:
URL: https://github.com/apache/kafka/pull/16062





Re: [PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16062:
URL: https://github.com/apache/kafka/pull/16062#issuecomment-2140856964

   The failed test is traced by 
https://issues.apache.org/jira/browse/KAFKA-16866





Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621422471


   (This comment quotes the same `OptimizedUniformAssignmentBuilder.java` hunk shown in the r1621445974 comment above.)

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621421194


   (This comment quotes the same `OptimizedUniformAssignmentBuilder.java` hunk shown in the r1621445974 comment above.)

Re: [PR] [MINOR] Code Cleanup - Connect Module [kafka]

2024-05-30 Thread via GitHub


chia7712 merged PR #16066:
URL: https://github.com/apache/kafka/pull/16066





Re: [PR] KAFKA-15853: Move ZKConfigs related static method out of core and into ZKConfigs [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16109:
URL: https://github.com/apache/kafka/pull/16109#issuecomment-2140831052

   @OmniaGM nice idea but we need to fix conflicts first :)





Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-30 Thread via GitHub


dongnuo123 commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1621405276


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -325,24 +325,12 @@ private Group validateOffsetCommit(
 }

Review Comment:
   I kind of forget why we wanted to check `GroupIdNotFoundException`. I feel the current implementation does support classic protocol members.






Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-30 Thread via GitHub


dongnuo123 commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1621403173


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -857,6 +857,7 @@ public void validateOffsetCommit(
 throw Errors.UNKNOWN_MEMBER_ID.exception();
 }
 
+// TODO: A temp marker. Will remove it when the pr is open.
 if (!isTransactional && isInState(COMPLETING_REBALANCE)) {

Review Comment:
   I'm not sure why we only check `COMPLETING_REBALANCE` and not `PREPARING_REBALANCE`.






Re: [PR] Adds a test case to test that an exception is thrown in invalid ports [kafka]

2024-05-30 Thread via GitHub


chia7712 merged PR #16112:
URL: https://github.com/apache/kafka/pull/16112





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager 
implementations respect user-provided timeout
URL: https://github.com/apache/kafka/pull/16031





[PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-30 Thread via GitHub


dongnuo123 opened a new pull request, #16145:
URL: https://github.com/apache/kafka/pull/16145

   During online migration, there could be a ConsumerGroup that has members using the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown during ConsumerGroup offset fetch/commit validation, but that error is not supported by the classic protocol. Thus this patch changes `ConsumerGroup#validateOffsetCommit` to ensure compatibility.
   
   There's no need to change `ConsumerGroup#validateOffsetFetch` because the member id and member epoch are always empty and -1 in the classic protocol, so the offset fetch request is always valid.
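   
   A minimal sketch of that rule, with simplified stand-in types rather than the actual coordinator classes:
   ```java
   public class ClassicCompatibleValidation {
       record Member(boolean usesClassicProtocol, int memberEpoch) {}
   
       // Classic protocol members carry no member epoch, so the epoch check
       // that produces STALE_MEMBER_EPOCH has to be bypassed for them.
       static void validateOffsetCommit(Member member, int requestMemberEpoch) {
           if (member.usesClassicProtocol()) {
               return; // nothing epoch-related to validate
           }
           if (requestMemberEpoch != member.memberEpoch()) {
               throw new IllegalStateException("STALE_MEMBER_EPOCH");
           }
       }
   
       public static void main(String[] args) {
           validateOffsetCommit(new Member(true, -1), -1);  // classic member: always passes
           validateOffsetCommit(new Member(false, 5), 5);   // consumer member: epochs must match
       }
   }
   ```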
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types [kafka]

2024-05-30 Thread via GitHub


gharris1727 commented on code in PR #15513:
URL: https://github.com/apache/kafka/pull/15513#discussion_r1621400369


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -737,12 +739,12 @@ public void runOuterJoin(final StreamJoined streamJoine
 inputTopic1.pipeInput(expectedKey, "C" + expectedKey);
 }
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "C0+a0", 0L),
-new KeyValueTimestamp<>(0, "C0+b0", 0L),
-new KeyValueTimestamp<>(1, "C1+a1", 0L),
-new KeyValueTimestamp<>(1, "C1+b1", 0L),
-new KeyValueTimestamp<>(2, "C2+b2", 0L),
-new KeyValueTimestamp<>(3, "C3+b3", 0L)
+new KeyValueTimestamp<>(0, "C0+0", 0L),
+new KeyValueTimestamp<>(0, "C0+0", 0L),
+new KeyValueTimestamp<>(1, "C1+1", 0L),
+new KeyValueTimestamp<>(1, "C1+1", 0L),

Review Comment:
   You're right, I didn't notice this. I did a search-and-replace renaming and reverted the parts which didn't make sense.
   
   I did have to manually renumber values like "a0-0", and change some places where capital letters ("A0") were used on inputStream2, to fit the pattern better. PTAL, thanks!






Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621395136


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -327,8 +339,11 @@ void testEnsureEventsAreCompleted() {
 assertTrue(applicationEventsQueue.isEmpty());
 }
 
+// Look into this one

Review Comment:
   My mistake leaving that there; it was a note to myself that I forgot to remove.






Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621386306


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -149,20 +157,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
 }
 
+@Test
+void testRequestManagersArePolledOnce() {
+consumerNetworkThread.runOnce();
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong())));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
+}
+
 @Test
 public void testApplicationEvent() {
 ApplicationEvent e = new PollEvent(100);
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+verify(applicationEventProcessor).process(e);

Review Comment:
   I checked the Mockito documentation, and adding `times(1)` is redundant, so it's not really a big deal either way whether we keep it or remove it. Do let me know, though, if there is a stylistic preference.
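   
   For reference, the two forms are interchangeable, since `verify(mock)` defaults to `times(1)`:
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.times;
   import static org.mockito.Mockito.verify;
   
   import java.util.List;
   
   public class VerifyTimesExample {
       @SuppressWarnings("unchecked")
       public static void main(String[] args) {
           List<String> items = mock(List.class);
           items.add("x");
           verify(items).add("x");           // implicit times(1)
           verify(items, times(1)).add("x"); // equivalent, just more verbose
       }
   }
   ```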






Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


jolshan commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140804438

   Can we look at testCopyQuotaManagerConfig() in kafka.log.remote.RemoteLogManagerTest? It seems to be failing pretty consistently.





Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-05-30 Thread via GitHub


brenden20 commented on PR #16124:
URL: https://github.com/apache/kafka/pull/16124#issuecomment-2140800759

   @kirktrue thank you for the suggestions; I have implemented and pushed them. Let me know if everything looks good!





Re: [PR] Adds a test case to test that an exception is thrown in invalid ports [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16112:
URL: https://github.com/apache/kafka/pull/16112#issuecomment-2140786920

   @ahmedryasser Thanks for your contribution. Could you please add "MINOR: " 
to your title?





Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-30 Thread via GitHub


cmccabe commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1621372420


##
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+private final Logger log;
+private final int id;
+private final Runnable refreshRegistrationCallback;
+
+/**
+ * Create the tracker.
+ *
+ * @param idThe ID of this broker.
+ * @param targetDirectories The directories managed by this 
broker.
+ * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+ */
+public BrokerRegistrationTracker(
+int id,
+List targetDirectories,
+Runnable refreshRegistrationCallback
+) {
+this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+logger(BrokerRegistrationTracker.class);
+this.id = id;
+this.refreshRegistrationCallback = refreshRegistrationCallback;
+}
+
+@Override
+public String name() {
+return "BrokerRegistrationTracker(id=" + id + ")";
+}
+
+@Override
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+boolean checkBrokerRegistration = false;
+if (delta.featuresDelta() != null) {
+if (delta.metadataVersionChanged().isPresent()) {
+if (log.isTraceEnabled()) {
+log.trace("Metadata version change is present: {}",
+delta.metadataVersionChanged());
+}
+checkBrokerRegistration = true;
+}
+}
+if (delta.clusterDelta() != null) {
+if (delta.clusterDelta().changedBrokers().get(id) != null) {
+if (log.isTraceEnabled()) {
+log.trace("Broker change is present: {}",
+delta.clusterDelta().changedBrokers().get(id));
+}
+checkBrokerRegistration = true;
+}
+}
+if (checkBrokerRegistration) {
+if 
(brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(),
+delta.clusterDelta().broker(id))) {
+refreshRegistrationCallback.run();
+}
+}
+}
+
+/**
+ * Check if the current broker registration needs to be refreshed.
+ *
+ * @param registration  The current broker registration, or null if there 
is none.
+ * @return  True only if we should refresh.
+ */
+boolean brokerRegistrationNeedsRefresh(
+MetadataVersion metadataVersion,
+BrokerRegistration registration
+) {
+// If there is no existing registration, the BrokerLifecycleManager 
must still be sending it.
+// So we don't 

Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16116:
URL: https://github.com/apache/kafka/pull/16116#issuecomment-2140785638

   @OmniaGM Sorry about that, but please fix the conflicts again :(





Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621368550


##
build.gradle:
##
@@ -787,6 +800,12 @@ subprojects {
 skipProjects = [ ":jmh-benchmarks", ":trogdor" ]
 skipConfigurations = [ "zinc" ]
   }
+
+  afterEvaluate {

Review Comment:
   Maybe we can set spotless directly. For example:
   ```
 if (project.name in spotlessApplyModules) {
   apply plugin: 'com.diffplug.spotless'
   spotless {
 java {
   importOrder('kafka', 'org.apache.kafka', 'com', 'net', 'org', 
'java', 'javax', '', '\\#')
   removeUnusedImports()
 }
   }
 }
   ```
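   
   If this is adopted, the formatter should then be runnable via the standard tasks the Spotless plugin registers: `./gradlew spotlessCheck` to verify and `./gradlew spotlessApply` to rewrite the imports in place.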






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-05-30 Thread via GitHub


gharris1727 commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1621349710


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) {
 if (this.state == State.FAILED)
 return;
 
+// Call stop() on the connector to release its resources. Connector
+// could fail in the start() method, which is why we call stop() on
+// INIT state as well.
+if (this.state == State.STARTED || this.state == State.INIT)
+connector.stop();

Review Comment:
   This is a potentially blocking call to the connector, and I don't think 
that's a good fit for this onFailure handler. This call would delay the 
statusListener call, which delays notifying the REST API of the FAILED status 
and updating the metrics. If it blocks indefinitely, the status and metrics are 
never updated.
   
   There is a connector.stop() call in doShutdown that could be changed to 
execute for the INIT and FAILED states. That would leave the resources 
allocated while the connector is waiting in the FAILED state, but would at 
least ensure they don't leak long-term.
   
   We may also change the control flow to make the transition to the FAILED 
state trigger doShutdown early, rather than having it wait() with all the 
resources still allocated.
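   
   A rough sketch of that last idea, with heavily simplified, hypothetical types (the real WorkerConnector state machine has more states and synchronization):
   ```java
   public class ShutdownSketch {
       enum State { INIT, STARTED, FAILED, STOPPED }
   
       private State state = State.INIT;
       private final Runnable connectorStop;
   
       ShutdownSketch(Runnable connectorStop) {
           this.connectorStop = connectorStop;
       }
   
       // onFailure only records the state; it never makes a potentially
       // blocking call into the connector, so listeners/metrics stay timely.
       void onFailure() {
           state = State.FAILED;
       }
   
       // doShutdown releases start()-time resources for INIT and FAILED too,
       // so a connector that failed during start() does not leak long-term.
       void doShutdown() {
           if (state == State.STARTED || state == State.INIT || state == State.FAILED) {
               connectorStop.run();
           }
           state = State.STOPPED;
       }
   }
   ```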






Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621358471


##
checkstyle/suppressions.xml:
##
@@ -361,4 +361,7 @@
 
 
+
+

Review Comment:
   Why do we need this change?



##
build.gradle:
##
@@ -1007,7 +1026,7 @@ project(':core') {
 testImplementation libs.junitJupiter
 testImplementation libs.slf4jlog4j
 testImplementation libs.caffeine
-

Review Comment:
   Please revert this change.



##
build.gradle:
##
@@ -47,7 +47,7 @@ plugins {
   // Updating the shadow plugin version to 8.1.1 causes issue with signing and 
publishing the shadowed
   // artifacts - see https://github.com/johnrengelman/shadow/issues/901
   id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
-  id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer 
require Java 11 at compile time, so we can't upgrade until AK 4.0
+  id 'com.diffplug.spotless' version "${spotlessVersion}" apply false

Review Comment:
   Do we need this variable? Also, why not use the latest version, `6.25.0`?






Re: [PR] MINOR: Refactor DynamicConfig [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on code in PR #16133:
URL: https://github.com/apache/kafka/pull/16133#discussion_r1621350440


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -675,7 +677,7 @@ object DynamicLogConfig {
   // Exclude message.format.version for now since we need to check that the 
version
   // is supported on all brokers in the cluster.
   @nowarn("cat=deprecation")
-  val ExcludedConfigs = Set(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG)

Review Comment:
   Do we need this variable? Maybe we can remove it with the following change.
   ```scala
 // Exclude message.format.version for now since we need to check that the 
version
 // is supported on all brokers in the cluster.
 @nowarn("cat=deprecation")
 val ReconfigurableConfigs = 
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - 
ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
   ```



##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -319,7 +321,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   }
 
   private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = 
CoreUtils.inWriteLock(lock) {
-val nonDynamic = 
configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains)
+val nonDynamic = configNames.filter(nonDynamicProps.contains)

Review Comment:
   How about `configNames.intersect(nonDynamicProps)`?






Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


dajac commented on PR #16120:
URL: https://github.com/apache/kafka/pull/16120#issuecomment-2140752725

   The build does not seem to start… I am not sure why.





Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16115:
URL: https://github.com/apache/kafka/pull/16115#issuecomment-2140747685

   @brenden20, as mentioned on another one of your PRs, there's a checkstyle 
violation here. You can run this command locally to avoid waiting for the CI 
infrastructure to catch it:
   
   ```
   ./gradlew check -x test
   ```





Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


dajac commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621344907


##
server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum GroupVersion implements FeatureVersion {
+
+// Version 1 enables the classic rebalance protocol. This is the default
+// behavior even if the feature flag is not set.
+GV_1(1, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()),

Review Comment:
   Updated to use the version 0 approach.






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16124:
URL: https://github.com/apache/kafka/pull/16124#issuecomment-2140745523

   The test failures are unrelated, FWIW.





Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621342452


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -53,89 +39,111 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
 import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.junit.jupiter.api.Assertions.*;

Review Comment:
   ```suggestion
   import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertFalse;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   import static org.mockito.ArgumentMatchers.any;
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -53,89 +39,111 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
 import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;

Review Comment:
   Just bring these explicit imports back to make checkstyle stop complaining:
   
   ```suggestion
   import static org.mockito.ArgumentMatchers.eq;
   import static org.mockito.Mockito.doAnswer;
   import static org.mockito.Mockito.doThrow;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.times;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;
   ```






Re: [PR] MINOR: Enable transaction verification with new group coordinator in TransactionsTest [kafka]

2024-05-30 Thread via GitHub


dajac merged PR #16139:
URL: https://github.com/apache/kafka/pull/16139





Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2140741298

   Looks like there are some checkstyle failures due to the use of wildcard 
imports.





Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621334851


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1290,4 +1290,4 @@ static class MemberInfo {
 this.memberEpoch = Optional.empty();
 }
 }
-}
+}

Review Comment:
   We should revert/fix this change as it's whitespace only.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -53,89 +39,111 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
 import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new 

Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on code in PR #15989:
URL: https://github.com/apache/kafka/pull/15989#discussion_r1621334504


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -1184,6 +1185,141 @@ public void 
testRestoreRestartRequestInconsistentState() {
 verify(configLog).stop();
 }
 
+@Test
+public void testPutTaskConfigsZeroTasks() throws Exception {
+when(configLog.partitionCount()).thenReturn(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+// Bootstrap as if we had already added the connector, but no tasks 
had been added yet
+whiteBoxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), 
Collections.emptyList());
+
+// Null before writing
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+
+// Task configs should read to end, write to the log, read to end, 
write root.
+
doAnswer(expectReadToEnd(Collections.emptyMap())).when(configLog).readToEnd();
+
+expectConvertWriteRead(
+COMMIT_TASKS_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+"tasks", 0); // We have 0 tasks
+
+configStorage.putTaskConfigs("connector1", Collections.emptyList());
+
+// As soon as root is rewritten, we should see a callback notifying us 
that we reconfigured some tasks
+configUpdateListener.onTaskConfigUpdate(Collections.emptyList());

Review Comment:
   We need to use `verify` to make sure this method is called as expected.






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on code in PR #16124:
URL: https://github.com/apache/kafka/pull/16124#discussion_r1621332051


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -478,6 +478,23 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+@Override
+public String toStringBase() {
+return super.toStringBase() +
+", heartbeatTimer=" + heartbeatTimer +
+", heartbeatIntervalMs=" + heartbeatIntervalMs;
+}
+
+// Visible for testing
+protected Timer heartbeatTimer() {
+return this.heartbeatTimer;
+}
+
+// Visible for testing
+protected long heartbeatIntervalMs() {
+return this.heartbeatIntervalMs;

Review Comment:
   ```suggestion
   return heartbeatIntervalMs;
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -152,6 +152,34 @@ public void cleanup() {
 }
 }
 
+@Test
+public void testHeartBeatRequestStateToStringBase() {
+final long retryBackoffMs = 100;
+final long retryBackoffMaxMs = 1000;
+LogContext logContext = new LogContext();
+HeartbeatRequestState heartbeatRequestState1 = new 
HeartbeatRequestState(

Review Comment:
   Super nit: we can drop the `1` at the end of the variable name, right?
   
   ```suggestion
   HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState(
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -478,6 +478,23 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+@Override
+public String toStringBase() {
+return super.toStringBase() +
+", heartbeatTimer=" + heartbeatTimer +
+", heartbeatIntervalMs=" + heartbeatIntervalMs;
+}
+
+// Visible for testing
+protected Timer heartbeatTimer() {
+return this.heartbeatTimer;

Review Comment:
   ```suggestion
   return heartbeatTimer;
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -152,6 +152,34 @@ public void cleanup() {
 }
 }
 
+@Test
+public void testHeartBeatRequestStateToStringBase() {
+final long retryBackoffMs = 100;
+final long retryBackoffMaxMs = 1000;
+LogContext logContext = new LogContext();
+HeartbeatRequestState heartbeatRequestState1 = new HeartbeatRequestState(
+logContext,
+time,
+10,
+retryBackoffMs,
+retryBackoffMaxMs,
+.2
+);
+
+RequestState requestState = new RequestState(
+logContext,
+"org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager$HeartbeatRequestState",

Review Comment:
   Perhaps we could make `HeartbeatRequestState` package-protected, then we 
could do this:
   
   ```suggestion
   HeartbeatRequestManager.HeartbeatRequestState.class.getName(),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16802 : Moving java versions inside java block [kafka]

2024-05-30 Thread via GitHub


gharris1727 merged PR #16135:
URL: https://github.com/apache/kafka/pull/16135


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16042:
URL: https://github.com/apache/kafka/pull/16042#issuecomment-2140664027

   > The test names are testDescribeLogDirsWithoutAnyPartitionTopic and 
testDescribeLogDirs.
   
   It seems to me the method needs to come with this guarantee: 
`DescribeLogDirsTopic` should not have an empty `partitions` list.
   
   Hence, could you please add the following asserts to `testDescribeLogDirs`?
   
   ```scala
 responses.foreach { response =>
   assertEquals(Errors.NONE.code, response.errorCode)
   assertTrue(response.totalBytes > 0)
   assertTrue(response.usableBytes >= 0)
   assertFalse(response.topics().isEmpty)
   response.topics().forEach(t => assertFalse(t.partitions().isEmpty))
 }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: make public the consumer group migration policy config [kafka]

2024-05-30 Thread via GitHub


dajac merged PR #16128:
URL: https://github.com/apache/kafka/pull/16128


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558404

I'm going to close and reopen to force another build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager 
implementations respect user-provided timeout
URL: https://github.com/apache/kafka/pull/16031


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558063

   @lianetm—relevant test failures have been addressed. There are three 
unrelated test failures from flaky tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621207749


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java:
##
@@ -18,105 +18,63 @@
 
 import org.apache.kafka.common.Uuid;
 
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
 /**
- * The assignment specification for a consumer group member.
+ * Implementation of the {@link MemberSubscriptionSpec} interface.
  */
-public class AssignmentMemberSpec {
-/**
- * The instance ID if provided.
- */
-private final Optional<String> instanceId;
-
-/**
- * The rack ID if provided.
- */
+public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
 private final Optional<String> rackId;
-
-/**
- * Topics Ids that the member is subscribed to.
- */
 private final Set<Uuid> subscribedTopicIds;
 
 /**
- * Partitions assigned keyed by topicId.
- */
-private final Map<Uuid, Set<Integer>> assignedPartitions;
-
-/**
- * @return The instance ID as an Optional.
+ * Constructs a new {@code MemberSubscriptionSpecImpl}.
+ *
+ * @param rackId             The rack Id.
+ * @param subscribedTopicIds The set of subscribed topic Ids.
  */
-public Optional<String> instanceId() {
-return instanceId;
+public MemberSubscriptionSpecImpl(
+Optional<String> rackId,
+Set<Uuid> subscribedTopicIds
+) {
+Objects.requireNonNull(rackId);
+Objects.requireNonNull(subscribedTopicIds);
+this.rackId = rackId;
+this.subscribedTopicIds = subscribedTopicIds;

Review Comment:
   I was just following the format I saw in TargetAssignmentResult and a few 
places, wasn't sure what to use



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621196989


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##
@@ -39,4 +41,20 @@ public interface GroupSpec {
  * False, otherwise.
  */
 boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+/**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId  The member Id.
+ * @return The member's subscription metadata.
+ */
+MemberSubscriptionSpec memberSubscriptionSpec(String memberId);

Review Comment:
   I actually wanted to ask about whether we want to return null or return an 
empty object. I returned a new MemberSubscriptionSpecImpl object in the impl.
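   
   For illustration, a hedged sketch of the empty-object alternative; the `members` map and the fallback values here are hypothetical, not the actual implementation:
   
   ```java
   @Override
   public MemberSubscriptionSpec memberSubscriptionSpec(String memberId) {
       // members is an assumed Map<String, MemberSubscriptionSpec> held by the group spec
       MemberSubscriptionSpec spec = members.get(memberId);
       return spec != null
           ? spec
           : new MemberSubscriptionSpecImpl(Optional.empty(), Collections.emptySet());
   }
   ```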



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621186636


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##
@@ -39,4 +41,20 @@ public interface GroupSpec {
  * False, otherwise.
  */
 boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+/**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId  The member Id.
+ * @return The member's subscription metadata.
+ */
+MemberSubscriptionSpec memberSubscriptionSpec(String memberId);

Review Comment:
   Thanks for the comment! During our design discussions we agreed to keep it as 
`memberSubscriptionSpec` to maintain consistency and to streamline the review 
process, so we don't go back and forth. If there are new considerations or 
changes that we didn't anticipate, I would love to understand them better so we 
can make this more efficient going forward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16130:
URL: https://github.com/apache/kafka/pull/16130#discussion_r1621089819


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -354,6 +354,7 @@ Found problem:
 MetadataVersion.LATEST_PRODUCTION,
 Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
 allFeatures,
+false,

Review Comment:
   `if (featureLevel <= Features.TEST_VERSION.defaultValue(MetadataVersion.LATEST_PRODUCTION))`: this means we skip version 2.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-30 Thread via GitHub


omkreddy merged PR #16046:
URL: https://github.com/apache/kafka/pull/16046


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-05-30 Thread via GitHub


yashmayya commented on PR #13375:
URL: https://github.com/apache/kafka/pull/13375#issuecomment-2140247839

   @mdedetrich apologies for the late response; oddly enough, I didn't get 
notified about your comment. Please feel free to take over, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14919 sync topic configs test [kafka]

2024-05-30 Thread via GitHub


gharris1727 commented on PR #16143:
URL: https://github.com/apache/kafka/pull/16143#issuecomment-2140209270

   Hi @anton-liauchuk, thanks for the PR!
   
   Is the linked KAFKA ticket the right one? I don't think this implementation 
matches that ticket's description. I expected changes to the 
FakeLocalMetadataStore to be necessary.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


junrao commented on code in PR #16130:
URL: https://github.com/apache/kafka/pull/16130#discussion_r1621034615


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -76,16 +76,21 @@ public short latestProduction() {
 return defaultValue(MetadataVersion.LATEST_PRODUCTION);
 }
 
+public short latestTesting() {
+return featureVersions[featureVersions.length - 1].featureLevel();
+}
+
 /**
  * Creates a FeatureVersion from a level.
  *
  * @param level   the level of the feature
  * @return   the FeatureVersionUtils.FeatureVersion for the feature the enum is based on.
  * @throwsIllegalArgumentException if the feature is not known.
  */
-public FeatureVersion fromFeatureLevel(short level) {
+public FeatureVersion fromFeatureLevel(short level,
+   boolean allowUnstableFeatureVersions) {

Review Comment:
   Could we add the new param to javadoc?
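   
   As a sketch of what that could look like (the wording is illustrative, not a proposal for the final text):
   
   ```java
   /**
    * Creates a FeatureVersion from a level.
    *
    * @param level                        the level of the feature
    * @param allowUnstableFeatureVersions whether levels above the latest production
    *                                     version may be resolved instead of rejected
    * @return the FeatureVersionUtils.FeatureVersion for the feature the enum is based on.
    * @throws IllegalArgumentException if the feature is not known.
    */
   ```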



##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -354,6 +354,7 @@ Found problem:
 MetadataVersion.LATEST_PRODUCTION,
 Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
 allFeatures,
+false,

Review Comment:
   Hmm, TEST_VERSION level 2 is not in production and should show an exception 
when calling `StorageTool.generateFeatureRecord`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Re-add EventAccumulator.poll(long, TimeUnit) [kafka]

2024-05-30 Thread via GitHub


jeffkbkim opened a new pull request, #16144:
URL: https://github.com/apache/kafka/pull/16144

   We revamped the thread idle ratio metric in 
https://github.com/apache/kafka/pull/15835. 
https://github.com/apache/kafka/pull/15835#discussion_r1588068337 describes a 
case where the metric loses accuracy; to set a lower bound on that accuracy, 
this patch re-adds the poll-with-timeout variant that was removed as part of 
https://github.com/apache/kafka/pull/15430.
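   
   For context, a minimal sketch of a timed poll built on a lock and condition; the field names are illustrative, not the actual EventAccumulator internals:
   
   ```java
   public T poll(long timeout, TimeUnit unit) throws InterruptedException {
       long nanos = unit.toNanos(timeout);
       lock.lock();
       try {
           while (queue.isEmpty()) {
               if (nanos <= 0L)
                   return null;                      // timed out with no event available
               nanos = condition.awaitNanos(nanos);  // releases the lock while waiting
           }
           return queue.poll();
       } finally {
           lock.unlock();
       }
   }
   ```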
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14919 sync topic configs test [kafka]

2024-05-30 Thread via GitHub


anton-liauchuk commented on PR #16143:
URL: https://github.com/apache/kafka/pull/16143#issuecomment-2140109909

   @gharris1727 
   
   Please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14919 sync topic configs test [kafka]

2024-05-30 Thread via GitHub


anton-liauchuk opened a new pull request, #16143:
URL: https://github.com/apache/kafka/pull/16143

   KAFKA-14919 sync topic configs test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


junrao merged PR #15625:
URL: https://github.com/apache/kafka/pull/15625


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621009962


##
server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum GroupVersion implements FeatureVersion {
+
+// Version 1 enables the classic rebalance protocol. This is the default
+// behavior even if the feature flag is not set.
+GV_1(1, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()),

Review Comment:
   When we bootstrap we technically will not get this version. Does that make 
sense? I'm wondering if the metadata version should be the bootstrap metadata 
version since it's a little odd to say the classic protocol is not set by 
default.
   
   Alternatively we take the version 0 approach.
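   
   Purely as an illustration of that alternative (the MetadataVersion chosen for GV_0 here is an assumption, not a decision):
   
   ```java
   // GV_0 would model "feature flag not set", so a freshly bootstrapped cluster
   // maps to a well-defined constant; GV_1 then explicitly enables the classic protocol.
   GV_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
   GV_1(1, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()),
   ```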



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16130:
URL: https://github.com/apache/kafka/pull/16130#discussion_r1621005757


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -449,8 +449,8 @@ object KafkaConfig {
   /** Internal Configurations **/
   // This indicates whether unreleased APIs should be advertised by this node.
   .defineInternal(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
-  // This indicates whether unreleased MetadataVersions should be enabled on this node.
-  .defineInternal(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
+  // This indicates whether unreleased MetadataVersions or other feature versions should be enabled on this node.
+  .defineInternal(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)

Review Comment:
   Yeah, it's a fair question. I think the original intent was to keep it 
internal and only use for testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Revert "KAFKA-16448: Add ProcessingExceptionHandler interface and implementations (#16090)" [kafka]

2024-05-30 Thread via GitHub


cadonna opened a new pull request, #16142:
URL: https://github.com/apache/kafka/pull/16142

   This reverts commit 8d11d9579503426edfaeae791ec4bb212da37ad2.
   
   We decided not to release KIP-1033 with AK 3.8.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Revert "KAFKA-16448: Add ProcessingExceptionHandler in Streams configuration (#16092)" [kafka]

2024-05-30 Thread via GitHub


cadonna merged PR #16141:
URL: https://github.com/apache/kafka/pull/16141


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Revert "KAFKA-16448: Add ProcessingExceptionHandler in Streams configuration (#16092)" [kafka]

2024-05-30 Thread via GitHub


cadonna opened a new pull request, #16141:
URL: https://github.com/apache/kafka/pull/16141

   We decided not to release KIP-1033 with AK 3.8.
   
   This reverts commit 3f70c46874e1dd9591443392f51ff1efc9fdc40e.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


