Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621844225

## metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java: ##
@@ -74,7 +74,7 @@ public void testDefaultFeatureMapWithUnstable() {
 for (Features feature : Features.PRODUCTION_FEATURES) {
 expectedFeatures.put(feature.featureName(), VersionRange.of(
 0,
-feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
+feature.defaultValue(MetadataVersion.latestTesting())

Review Comment:
Indeed.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621843719

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
 */
 private final Set subscribedTopicIds;
-/**
- * The number of members to receive an extra partition beyond the minimum quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
 * Members mapped to the remaining number of partitions needed to meet the minimum quota.
- * Minimum quota = total partitions / total members.
 */
-private Map potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;

Review Comment:
There is a fixed number of extra partitions that we can give out once every member has received an equal number of partitions. For example, with 5 partitions and 3 members, each member gets 1 partition and then we have 2 extras. Let's say A, B and C have all received 1 partition; at that point they are all potentially unfilled. Once A and B get the extra partitions, C is no longer unfilled and doesn't receive any more partitions.
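To make the arithmetic in this comment concrete, here is a minimal Java sketch of the quota calculation. `QuotaSketch` and `targetCounts` are hypothetical names for illustration and are not taken from `OptimizedUniformAssignmentBuilder`. Which members receive the extras is also an assumption here (simply the first ones in iteration order).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the assignor's actual implementation.
final class QuotaSketch {

    /**
     * Returns the target partition count per member.
     * Every member gets minQuota = totalPartitions / members.size();
     * the first (totalPartitions % members.size()) members get one extra.
     */
    static Map<String, Integer> targetCounts(List<String> members, int totalPartitions) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("At least one member is required");
        }
        int minQuota = totalPartitions / members.size();
        int extras = totalPartitions % members.size();

        Map<String, Integer> targets = new LinkedHashMap<>();
        for (String member : members) {
            int quota = minQuota;
            if (extras > 0) {
                // This member is still "potentially unfilled" and takes an extra.
                quota++;
                extras--;
            }
            targets.put(member, quota);
        }
        return targets;
    }
}
```

With members A, B, C and 5 partitions, the minimum quota is 1 and there are 2 extras, so A and B end up with 2 partitions and C stays at 1.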
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621843638

## core/src/test/java/kafka/test/ClusterInstance.java: ##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
 Set supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
-// KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||

Review Comment:
We will eventually remove it for sure. This (private) config is still the one used to determine whether the new group coordinator should be enabled or not. The trick to know is that it is automatically set to true when `consumer` is specified as a protocol. We did this to simplify the early access and the preview. The handling was a bit messy before this patch because the new protocol was enabled whenever the new coordinator ran. It is better now.
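The rule described in this comment (the coordinator flag is effectively true whenever `consumer` is listed as a rebalance protocol) can be sketched as below. This is a simplified stand-in and not the real `KafkaConfig#isNewGroupCoordinatorEnabled`; the class name, method signature and the case-insensitive match are assumptions made for the example.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for the config resolution; not Kafka's actual code.
final class CoordinatorFlagSketch {

    /**
     * @param explicitFlag       value of the (private) new-coordinator enable config
     * @param rebalanceProtocols configured group rebalance protocols, e.g. ["classic", "consumer"]
     * @return true if the new group coordinator should be enabled
     */
    static boolean isNewGroupCoordinatorEnabled(boolean explicitFlag, List<String> rebalanceProtocols) {
        for (String protocol : rebalanceProtocols) {
            // Listing "consumer" implies the new coordinator, regardless of the flag.
            if ("consumer".equals(protocol.toLowerCase(Locale.ROOT))) {
                return true;
            }
        }
        return explicitFlag;
    }
}
```

Under this sketch a test helper only needs to look at the resolved value instead of checking both configs separately.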
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on PR #16068:
URL: https://github.com/apache/kafka/pull/16068#issuecomment-2141348418

There are two tests failing in GroupMetadataManagerTest, will look into it tomorrow.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1621827532

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
Good point -- I agree with Bruno, the catch block should be for just ConfigException or StreamsException
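A minimal, self-contained sketch of the narrow catch being discussed: only the two named exception types are rethrown with the node name added, and anything else propagates untouched. The nested `ConfigException` and `StreamsException` classes are local stand-ins for the Kafka types, `SerializerSupplier` is a hypothetical helper, and passing the original exception as the cause is an addition of this sketch (the diff above constructs the new exception from the message alone).

```java
// Hypothetical illustration; the exception classes are stand-ins, not Kafka's.
final class SerdeInitSketch {

    static class ConfigException extends RuntimeException {
        ConfigException(String message) { super(message); }
        ConfigException(String message, Throwable cause) { super(message, cause); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String message) { super(message); }
        StreamsException(String message, Throwable cause) { super(message, cause); }
    }

    /** Stand-in for the call that prepares a serializer and may fail. */
    interface SerializerSupplier {
        Object get();
    }

    static Object prepareKeySerializer(SerializerSupplier supplier, String nodeName) {
        try {
            return supplier.get();
        } catch (ConfigException e) {
            // Same exception type, with the node name added to the message.
            throw new ConfigException(
                String.format("Failed to initialize key serdes for sink node %s", nodeName), e);
        } catch (StreamsException e) {
            throw new StreamsException(
                String.format("Failed to initialize key serdes for sink node %s", nodeName), e);
        }
        // Any other exception type is deliberately not caught here.
    }
}
```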
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage [kafka]
abhijeetk88 commented on code in PR #16071: URL: https://github.com/apache/kafka/pull/16071#discussion_r1621675519 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -6597,6 +6597,79 @@ class ReplicaManagerTest { )) } } + + @Test + def testRemoteReadQuotaExceeded(): Unit = { +when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true) + +val tp0 = new TopicPartition(topic, 0) +val tpId0 = new TopicIdPartition(topicId, tp0) +val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0) + +assertEquals(1, fetch.size) +assertEquals(tpId0, fetch.head._1) +val fetchInfo = fetch.head._2.info +assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, fetchInfo.fetchOffsetMetadata) +assertFalse(fetchInfo.records.records().iterator().hasNext) +assertFalse(fetchInfo.firstEntryIncomplete) +assertFalse(fetchInfo.abortedTransactions.isPresent) +assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent) + } + + @Test + def testRemoteReadQuotaNotExceeded(): Unit = { +when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false) + +val tp0 = new TopicPartition(topic, 0) +val tpId0 = new TopicIdPartition(topicId, tp0) +val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0) + +assertEquals(1, fetch.size) +assertEquals(tpId0, fetch.head._1) +val fetchInfo = fetch.head._2.info +assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset) +assertEquals(UnifiedLog.UnknownOffset, fetchInfo.fetchOffsetMetadata.segmentBaseOffset) +assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) +assertEquals(MemoryRecords.EMPTY, fetchInfo.records) +assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent) + } + + private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = { +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog 
= true) +try { + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava + val topicIds = Map(tp.topic -> topicId).asJava + val leaderEpoch = 0 + val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, +Seq( + new LeaderAndIsrPartitionState() +.setTopicName(tp.topic) +.setPartitionIndex(tp.partition) +.setControllerEpoch(0) +.setLeader(leaderEpoch)

Review Comment:
done
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2141192632

Thanks @chia7712 @jolshan. Apologies for the miss.
Re: [PR] ignore [kafka]
github-actions[bot] commented on PR #15355:
URL: https://github.com/apache/kafka/pull/15355#issuecomment-2141172395

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621586240

## checkstyle/suppressions.xml: ##
@@ -361,4 +361,7 @@ + +

Review Comment:
I think checkstyle should be consistent with the auto formatter. If we enable auto formatting for module A, we should also enable the check rule for that module. The `ImportOrder` rule can't be customized per module (that would take a lot of changes, maybe adding it to the `build.gradle` of every module). So I added this line so that the rule can be enabled for individual modules in the future. I think this is a more convenient way to change the setting module by module.
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621614857

## build.gradle: ##
@@ -787,6 +800,12 @@ subprojects {
 skipProjects = [ ":jmh-benchmarks", ":trogdor" ]
 skipConfigurations = [ "zinc" ]
 }
+
+ afterEvaluate {

Review Comment:
I changed the PR, please review it @chia7712
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621587957

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java: ##
@@ -585,34 +586,34 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
 ));
 // Initial subscriptions were [T1, T2]
-Map members = new HashMap<>();
+Map members = new TreeMap<>();
+Map>> assignedPartitions = new HashMap<>();
 Map> currentAssignmentForA = mkAssignment(
 mkTopicAssignment(topic1Uuid, 0, 2),
 mkTopicAssignment(topic2Uuid, 1, 3)
 );
-members.put(memberA, new AssignmentMemberSpec(
+assignedPartitions.put(memberA, currentAssignmentForA);
+members.put(memberA, new MemberSubscriptionSpecImpl(
 Optional.empty(),
-Optional.empty(),
-Collections.singleton(topic1Uuid),
-currentAssignmentForA
+Collections.singleton(topic1Uuid)
 ));
 Map> currentAssignmentForB = mkAssignment(
 mkTopicAssignment(topic1Uuid, 1),
 mkTopicAssignment(topic2Uuid, 0, 2, 4)
 );
-members.put(memberB, new AssignmentMemberSpec(
-Optional.empty(),
+assignedPartitions.put(memberB, currentAssignmentForB);
+members.put(memberB, new MemberSubscriptionSpecImpl(
 Optional.empty(),
-mkSet(topic1Uuid, topic2Uuid),
-currentAssignmentForB
+new HashSet<>(Arrays.asList(topic1Uuid, topic2Uuid))
 ));
 GroupSpec groupSpec = new GroupSpecImpl(
 members,
 HETEROGENEOUS,
-invertedTargetAssignment(members)
+assignedPartitions,
+invertedTargetAssignment(assignedPartitions, members)
 );
 SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);

Review Comment:
yep sounds good
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621584578

## build.gradle: ##
@@ -47,7 +47,7 @@ plugins {
 // Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed
 // artifacts - see https://github.com/johnrengelman/shadow/issues/901
 id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
- id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0
+ id 'com.diffplug.spotless' version "${spotlessVersion}" apply false

Review Comment:
I wrote this as a comment in `gradle.properties`: versions after 6.13.0 require Java 11, and we need to stay compatible with Java 8.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
satishd commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2141069773

@abhijeetk88 Can you resolve the conflicts?
Re: [PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]
satishd merged PR #16146:
URL: https://github.com/apache/kafka/pull/16146
Re: [PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]
satishd commented on PR #16146:
URL: https://github.com/apache/kafka/pull/16146#issuecomment-2141065172

A few failing tests are unrelated to the change. Merging this change to the trunk to unblock the test failures.
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1621535872

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map partitionsWithUpdatedLeaderInfo = new HashMap<>();
 produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
-TopicPartition tp = new TopicPartition(r.name(), p.index());
+// Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
+String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();

Review Comment:
Yes. For the fetch request, for example, there is code to make sure that all topics have IDs before we can send the fetch request. This is a bit less of an issue now, but if we have a cluster that is running on a MV < 2.8, topics will not have IDs. So when we decide which version of produce we want to send, we want to be aware of this.

Not only that, but even if the broker supports topic IDs on all topics, we may also have a case where we need to do a rolling upgrade to get the code that supports handling the latest API version. This may be less complicated for Produce since it is a client-only API and doesn't rely on MV/IBP, so the apiVersions exchange between the client and the broker may be enough to ensure API compatibility.

We just want to confirm these upgrade paths are compatible since produce is the hot path and we don't want any (or at least not extended) downtime in the middle of an upgrade.
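The name-or-ID fallback in the diff above can be sketched in isolation as follows. `TopicNameResolverSketch` is a hypothetical helper, `java.util.UUID` stands in for Kafka's `Uuid`, and the plain `Map` stands in for the client metadata lookup. Returning an empty `Optional` for an unknown ID is an assumption of this sketch, chosen to make the "topic has no resolvable name" case explicit rather than passing a null name along.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical illustration; java.util.UUID stands in for Kafka's Uuid type.
final class TopicNameResolverSketch {

    /**
     * Prefers the name carried in the response (older versions), and falls
     * back to a topic-ID lookup when the name is absent (newer versions).
     */
    static Optional<String> resolve(String nameInResponse, UUID topicId, Map<UUID, String> topicNamesById) {
        if (nameInResponse != null && !nameInResponse.isEmpty()) {
            return Optional.of(nameInResponse);
        }
        // May be empty if the local metadata does not know this topic ID yet.
        return Optional.ofNullable(topicNamesById.get(topicId));
    }
}
```

A caller would then need to decide what to do with an empty result, which is the upgrade-path concern raised in the comment.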
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1621532246 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, final AssignedTask.Type taskType) { -final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy(); +final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); +final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) { +LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", +rackAwareAssignmentStrategy); +return false; +} + +if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { +LOG.warn("Rack aware task assignment optimization unavailable: the traffic cost configuration was not set."); Review Comment: We should log the exact config name since otherwise people won't necessarily know what this is referring to (especially since they already forgot to set this config). 
```suggestion LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); ``` ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, final AssignedTask.Type taskType) { -final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy(); +final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); +final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) { +LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", +rackAwareAssignmentStrategy); +return false; +} + +if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { +LOG.warn("Rack aware task assignment optimization unavailable: the traffic cost configuration was not set."); return false; } + +if (!assignmentConfigs.rackAwareNonOverlapCost().isPresent()) { +LOG.warn("Rack aware task assignment optimization unavailable: the non-overlap cost configuration was not set."); Review Comment: ```suggestion LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); ``` ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ## @@ -40,8 +41,8 @@ public AssignmentConfigs(final StreamsConfig configs) { configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG), configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG), - 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG), + Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)), + Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)), Review Comment: don't we need to check `if (assignorClassName.equals("org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor"))` and set these to the sticky assignor defaults if true? Where `assignorClassName` is equal to `streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG)` -- I guess maybe we do want the public `AssignmentConfigs` constructor to take in the StreamsConfig after all? ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState,
Re: [PR] MINOR: Add more unit tests to LogSegments [kafka]
showuon commented on code in PR #16085:
URL: https://github.com/apache/kafka/pull/16085#discussion_r1621524702

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ##
@@ -4251,6 +4251,22 @@ class UnifiedLogTest {
 assertEquals(new LogOffsetMetadata(14, -1L, -1), log.maybeConvertToOffsetMetadata(14))
 }
+ @Test
+ def testGetFirstBatchTimestampForSegments(): Unit = {
+val log = createLog(logDir, LogTestUtils.createLogConfig())
+
+val segments: java.util.List[LogSegment] = new java.util.ArrayList[LogSegment]()
+val seg1 = LogTestUtils.createSegment(1, logDir, 10, Time.SYSTEM)
+val seg2 = LogTestUtils.createSegment(2, logDir, 10, Time.SYSTEM)

Review Comment:
Forgot to close them.
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621518800

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java: ##
@@ -18,48 +18,60 @@
 import org.apache.kafka.common.Uuid;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 /**
 * The assignment specification for a consumer group.
 */
 public class GroupSpecImpl implements GroupSpec {
 /**
- * The member metadata keyed by member Id.
+ * Member subscription metadata keyed by member Id.
 */
-private final Map members;
+private final Map memberSubscriptions;
 /**
- * The subscription type followed by the group.
+ * The subscription type of the group.
 */
 private final SubscriptionType subscriptionType;
+/**
+ * Partitions currently assigned to each member keyed by topicId.
+ */
+private final Map>> currentAssignment;

Review Comment:
I used memberAssignment and invertedMemberAssignment
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621508931

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java: ##
@@ -39,4 +41,20 @@ public interface GroupSpec {
 * False, otherwise.
 */
 boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+/**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId The member Id.
+ * @return The member's subscription metadata.
+ */
+MemberSubscriptionSpec memberSubscriptionSpec(String memberId);

Review Comment:
Discussed offline: we want to throw an IllegalStateException when the memberId is not found.
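A minimal sketch of the behaviour agreed on in this comment: the lookup throws `IllegalStateException` for an unknown member instead of returning null. `GroupSpecSketch` and `subscribedTopics` are hypothetical names, the subscription is reduced to a plain set of topic names, and the exception message wording is an assumption.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the actual GroupSpecImpl.
final class GroupSpecSketch {

    // Subscribed topic names keyed by member Id (simplified stand-in
    // for the member subscription metadata).
    private final Map<String, Set<String>> memberSubscriptions;

    GroupSpecSketch(Map<String, Set<String>> memberSubscriptions) {
        this.memberSubscriptions = memberSubscriptions;
    }

    /**
     * @throws IllegalStateException if the member Id is not part of the group
     */
    Set<String> subscribedTopics(String memberId) {
        Set<String> topics = memberSubscriptions.get(memberId);
        if (topics == null) {
            throw new IllegalStateException("Member Id " + memberId + " not found.");
        }
        return topics;
    }
}
```

Failing fast here means an assignor that asks about a member outside the group gets an immediate error at the call site rather than a later `NullPointerException`.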
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621497961

## core/src/test/java/kafka/test/ClusterInstance.java: ##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
 Set supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
-// KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||

Review Comment:
Mostly confused because we don't check for the config in kafka apis anymore.
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621496853

## metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java: ##
@@ -74,7 +74,7 @@ public void testDefaultFeatureMapWithUnstable() {
 for (Features feature : Features.PRODUCTION_FEATURES) {
 expectedFeatures.put(feature.featureName(), VersionRange.of(
 0,
-feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
+feature.defaultValue(MetadataVersion.latestTesting())

Review Comment:
Hmm was this just a bug in the test...
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
pasharik commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1621488481 ## core/src/main/scala/kafka/admin/AclCommand.scala: ## @@ -115,8 +115,6 @@ object AclCommand extends Logging { val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection adminClient.createAcls(aclBindings).all().get() } - -listAcls(adminClient) Review Comment: - I've moved the KRaft tests into a new `AclCommandIntegrationTest.java`. - I left the old ZooKeeper tests in `AclCommandTest.scala`. As I understand it, we are going to delete this test file completely once we have fully moved to KRaft, am I right? Do you think it's worth migrating those tests to Java at this stage if they are going to be deleted anyway? - The race condition described above still reproduces on the new infrastructure :cry: So if there are no objections, we can probably remove the `Current ACLs` console output.
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1621487824 ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -159,9 +158,7 @@ default Set supportedGroupProtocols() { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); -// KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") || Review Comment: do we plan to remove this config value from GroupCoordinatorConfig? I see it was removed from a lot of files, but there are still a few where it is used.
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
soarez commented on PR #15945: URL: https://github.com/apache/kafka/pull/15945#issuecomment-2140953114 There are some conflicts that need addressing, and the JDK 21 pipeline didn't run.
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
soarez commented on code in PR #15945: URL: https://github.com/apache/kafka/pull/15945#discussion_r1621485662 ## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. 
+ * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. + * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. + */ +public class BrokerRegistrationTracker implements MetadataPublisher { +private final Logger log; +private final int id; +private final Runnable refreshRegistrationCallback; + +/** + * Create the tracker. + * + * @param idThe ID of this broker. + * @param targetDirectories The directories managed by this broker. + * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. + */ +public BrokerRegistrationTracker( +int id, +List targetDirectories, +Runnable refreshRegistrationCallback +) { +this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). +logger(BrokerRegistrationTracker.class); +this.id = id; +this.refreshRegistrationCallback = refreshRegistrationCallback; +} + +@Override +public String name() { +return "BrokerRegistrationTracker(id=" + id + ")"; +} + +@Override +public void onMetadataUpdate( +MetadataDelta delta, +MetadataImage newImage, +LoaderManifest manifest +) { +boolean checkBrokerRegistration = false; +if (delta.featuresDelta() != null) { +if (delta.metadataVersionChanged().isPresent()) { +if (log.isTraceEnabled()) { +log.trace("Metadata version change is present: {}", +delta.metadataVersionChanged()); +} +checkBrokerRegistration = true; +} +} +if (delta.clusterDelta() != null) { +if (delta.clusterDelta().changedBrokers().get(id) != null) { +if (log.isTraceEnabled()) { +log.trace("Broker change is present: {}", +delta.clusterDelta().changedBrokers().get(id)); +} +checkBrokerRegistration = true; +} +} +if (checkBrokerRegistration) { +if (brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(), +delta.clusterDelta().broker(id))) { +refreshRegistrationCallback.run(); 
+} +} +} + +/** + * Check if the current broker registration needs to be refreshed. + * + * @param registration The current broker registration, or null if there is none. + * @return True only if we should refresh. + */ +boolean brokerRegistrationNeedsRefresh( +MetadataVersion metadataVersion, +BrokerRegistration registration +) { +// If there is no existing registration, the BrokerLifecycleManager must still be sending it. +// So we don't
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization [kafka]
apourchet commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1621479244 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -244,18 +271,112 @@ public static Map optimizeRackAwareStandbyTas ); LOG.info("Assignment before standby task optimization has cost {}", initialCost); -throw new UnsupportedOperationException("Not yet Implemented."); +final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState); +final BiFunction> getMovableTasks = (source, destination) -> { +return source.tasks().values().stream() +.filter(task -> task.type() == AssignedTask.Type.STANDBY) +.filter(task -> !destination.tasks().containsKey(task.id())) +.filter(task -> { +final KafkaStreamsState sourceState = kafkaStreamsStates.get(source.processId()); +final KafkaStreamsState destinationState = kafkaStreamsStates.get(source.processId()); +return moveablePredicate.canMoveStandbyTask(sourceState, destinationState, task.id(), kafkaStreamsAssignments); +}) +.map(AssignedTask::id) +.sorted() +.collect(Collectors.toList()); +}; + +final long startTime = System.currentTimeMillis(); +boolean taskMoved = true; +int round = 0; +final RackAwareGraphConstructor graphConstructor = RackAwareGraphConstructorFactory.create( + applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds); +while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) { +taskMoved = false; +round++; +for (int i = 0; i < kafkaStreamsAssignments.size(); i++) { +final UUID clientId1 = clientIds.get(i); +final KafkaStreamsAssignment clientState1 = kafkaStreamsAssignments.get(new ProcessId(clientId1)); +for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) { +final UUID clientId2 = clientIds.get(i); +final KafkaStreamsAssignment clientState2 = kafkaStreamsAssignments.get(new ProcessId(clientId2)); + +final String rack1 = clientRacks.get(clientState1.processId().id()).get(); +final String 
rack2 = clientRacks.get(clientState2.processId().id()).get(); +// Cross rack traffic can not be reduced if racks are the same +if (rack1.equals(rack2)) { +continue; +} + +final List movable1 = getMovableTasks.apply(clientState1, clientState2); +final List movable2 = getMovableTasks.apply(clientState2, clientState1); + +// There's no needed to optimize if one is empty because the optimization +// can only swap tasks to keep the client's load balanced +if (movable1.isEmpty() || movable2.isEmpty()) { +continue; +} + +final List taskIdList = Stream.concat(movable1.stream(), movable2.stream()) +.sorted() +.collect(Collectors.toList()); +final List clients = Stream.of(clientId1, clientId2).sorted().collect(Collectors.toList()); + +final AssignmentGraph assignmentGraph = buildTaskGraph( +assignmentsByUuid, +clientRacks, +taskIdList, +clients, +topicPartitionsByTaskId, +crossRackTrafficCost, +nonOverlapCost, +false, +false, Review Comment: you're right, good catch!
[PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
apourchet opened a new pull request, #16147: URL: https://github.com/apache/kafka/pull/16147 This PR takes care of calling back to `TaskAssignor.onAssignmentComputed`. It also contains a change to the public AssignmentConfigs API, as well as some simplifications of the StickyTaskAssignor. This PR also changes the rack information fetching to happen lazily, so that it is skipped when the TaskAssignor makes its decisions without said rack information.
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization [kafka]
ableegoldman merged PR #16129: URL: https://github.com/apache/kafka/pull/16129
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on PR #16129: URL: https://github.com/apache/kafka/pull/16129#issuecomment-2140934597 Test failures are unrelated. Merging to trunk
Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]
ableegoldman commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1621467009 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -555,18 +556,21 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe RackUtils.annotateTopicPartitionsWithRackInfo(cluster, internalTopicManager, allTopicPartitions); -final Set logicalTasks = logicalTaskIds.stream().map(taskId -> { -final Set stateStoreNames = topologyMetadata -.stateStoreNameToSourceTopicsForTopology(taskId.topologyName()) -.keySet(); -final Set topicPartitions = topicPartitionsForTask.get(taskId); -return new DefaultTaskInfo( -taskId, -!stateStoreNames.isEmpty(), -stateStoreNames, -topicPartitions -); -}).collect(Collectors.toSet()); +final Map logicalTasks = logicalTaskIds.stream().collect(Collectors.toMap( +Function.identity(), +taskId -> { +final Set stateStoreNames = topologyMetadata + .stateStoreNameToSourceTopicsForTopology(taskId.topologyName()) Review Comment: Ah somehow I missed this before -- this is actually returning _all_ the state stores for this topology, it's not specific to the taskId. This was an existing issue so we don't need to fix it in this PR, it can be addressed in a followup. It might be a bit complicated so I'll take a look at how we can get this info Would've caught this during testing since we definitely want tests with mixed stateless-and-stateful tasks, but still good to fix ASAP ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -72,6 +80,27 @@ public static Map identityAssignment(final Ap return assignments; } +/** + * Assign standby tasks to KafkaStreams clients according to the default logic. 
+ * + * If rack-aware client tags are configured, the rack-aware standby task assignor will be used + * + * @param applicationStatethe metadata and other info describing the current application state + * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients + * + * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment + */ +public static Map defaultStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) { +return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments); +} else if (canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY)) { +return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments); Review Comment: Address in a followup: We should just remove this case entirely right? basically it's "if hasRackAwareTags then do tag-based standby task assignment, if doesNotHaveTags then do default standby task assignment" Note that the tag-based rack aware assignment has nothing to do with the rack ids. 
So `canPerformRackAwareOptimization` is kind of irrelevant to the question here ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final TaskInfo task, } return true; } + +private static Map tagBasedStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { +final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); +final Map tasksToRemainingStandbys = applicationState.allTasks().values().stream() +.collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas)); +final Map streamStates = applicationState.kafkaStreamsStates(false); + +final Set rackAwareAssignmentTags = new HashSet<>(getRackAwareAssignmentTags(applicationState)); +final TagStatistics tagStatistics = new TagStatistics(applicationState); + +final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments); + +final Set statefulTaskIds = applicationState.allTasks().values().stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toSet()); +final Map clientsByUuid =
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
junrao commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140919323 @jolshan : Thanks for pointing this out. Sorry that I didn't look at the test results carefully before merging.
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan merged PR #16130: URL: https://github.com/apache/kafka/pull/16130
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan commented on code in PR #16130: URL: https://github.com/apache/kafka/pull/16130#discussion_r1621463208 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -449,8 +449,8 @@ object KafkaConfig { /** Internal Configurations **/ // This indicates whether unreleased APIs should be advertised by this node. .defineInternal(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) - // This indicates whether unreleased MetadataVersions should be enabled on this node. - .defineInternal(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) + // This indicates whether unreleased MetadataVersions or other feature versions should be enabled on this node. + .defineInternal(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) Review Comment: Yup -- this is the text in the KIP: > Add INTERNAL configuration unstable.feature.versions.enable to allow for non production ready features to be used (for testing)
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan commented on PR #16130: URL: https://github.com/apache/kafka/pull/16130#issuecomment-2140914372 I filed https://issues.apache.org/jira/browse/KAFKA-16866 for the one failure and that is getting fixed separately. As for the others, looks like they are frequent flakes. I will go ahead and merge.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621445974 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621442335 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java: ## @@ -66,21 +66,19 @@ public GroupAssignment assign( GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber ) throws PartitionAssignorException { -AbstractUniformAssignmentBuilder assignmentBuilder; - if (groupSpec.members().isEmpty()) return new GroupAssignment(Collections.emptyMap()); if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the " + "optimized assignment algorithm"); -assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber); +return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber) +.build(); Review Comment: any reason why we changed the name to not match the general assignor? Or is this also changed in the original that renamed the files?
[PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]
chia7712 opened a new pull request, #16146: URL: https://github.com/apache/kafka/pull/16146 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
chia7712 commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140866041 @jolshan I filed https://github.com/apache/kafka/pull/16146 to fix it.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621424229 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment */ private final Set subscribedTopicIds; -/** - * The number of members to receive an extra partition beyond the minimum quota. - * Minimum Quota = Total Partitions / Total Members - * Example: If there are 11 partitions to be distributed among 3 members, - * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. - */ -private int remainingMembersToGetAnExtraPartition; - /** * Members mapped to the remaining number of partitions needed to meet the minimum quota. - * Minimum quota = total partitions / total members. */ -private Map potentiallyUnfilledMembers; +private final List potentiallyUnfilledMembers; Review Comment: why do we call this potentiallyUnfilledMembers rather than unfilledMembers?
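The quota arithmetic behind this question can be sketched as follows, using the example from the removed Javadoc (11 partitions, 3 members). `QuotaSketch` and its method names are hypothetical, and handing the extra partitions to the first members in iteration order is an assumption made only to keep the sketch short; it is not taken from the PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the minimum quota and "extra partition" bookkeeping.
final class QuotaSketch {
    // Minimum quota = total partitions / total members (integer division).
    static int minQuota(int totalPartitions, int members) {
        return totalPartitions / members;
    }

    // Number of members that receive one partition beyond the minimum quota.
    static int membersWithExtra(int totalPartitions, int members) {
        return totalPartitions % members;
    }

    // Target assignment size per member. Every member sitting at the minimum
    // quota is only *potentially* unfilled: it receives an extra partition
    // only while extras remain.
    static Map<String, Integer> targetSizes(List<String> memberIds, int totalPartitions) {
        int quota = minQuota(totalPartitions, memberIds.size());
        int extras = membersWithExtra(totalPartitions, memberIds.size());
        Map<String, Integer> sizes = new LinkedHashMap<>();
        for (String memberId : memberIds) {
            int size = quota;
            if (extras > 0) {
                size++;
                extras--;
            }
            sizes.put(memberId, size);
        }
        return sizes;
    }
}
```

With 5 partitions and members A, B, C, the minimum quota is 1 and there are 2 extras. All three members are candidates for an extra partition after reaching the quota, but once two of them have taken one, the third stays at 1 and is not unfilled after all.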
Re: [PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]
chia7712 merged PR #16062: URL: https://github.com/apache/kafka/pull/16062
Re: [PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]
chia7712 commented on PR #16062: URL: https://github.com/apache/kafka/pull/16062#issuecomment-2140856964 The failed test is tracked by https://issues.apache.org/jira/browse/KAFKA-16866
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621422471 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621421194 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
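The quota arithmetic in the diff above (`minimumMemberQuota = totalPartitionsCount / numberOfMembers` and `remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers`) can be illustrated with a small standalone sketch. This is not the assignor's code. The class name `UniformQuotaSketch`, its methods, and the choice to hand the extra partitions to the first members in iteration order are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the quota arithmetic; not part of the Kafka code base.
public class UniformQuotaSketch {

    /** Minimum number of partitions every member receives (integer division). */
    public static int minimumMemberQuota(int totalPartitions, int numberOfMembers) {
        return totalPartitions / numberOfMembers;
    }

    /** Number of members that receive one partition beyond the minimum quota. */
    public static int membersWithExtraPartition(int totalPartitions, int numberOfMembers) {
        return totalPartitions % numberOfMembers;
    }

    /**
     * Target assignment size per member. Assumption: the extra partitions go to
     * the first members in iteration order; a real assignor may choose differently.
     */
    public static List<Integer> targetSizes(int totalPartitions, int numberOfMembers) {
        int minQuota = minimumMemberQuota(totalPartitions, numberOfMembers);
        int extras = membersWithExtraPartition(totalPartitions, numberOfMembers);
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < numberOfMembers; i++) {
            sizes.add(i < extras ? minQuota + 1 : minQuota);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 11 partitions over 3 members: quota 3, two members get an extra one.
        System.out.println(targetSizes(11, 3)); // prints [4, 4, 3]
        // 5 partitions over 3 members: quota 1, two members get an extra one.
        System.out.println(targetSizes(5, 3));  // prints [2, 2, 1]
    }
}
```

The sizes always sum to the total partition count, which is why a member already holding `minQuota` partitions may or may not still be "unfilled", depending on how many extras remain.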
Re: [PR] [MINOR] Code Cleanup - Connect Module [kafka]
chia7712 merged PR #16066: URL: https://github.com/apache/kafka/pull/16066
Re: [PR] KAFKA-15853: Move ZKConfigs related static method out of core and into ZKConfigs [kafka]
chia7712 commented on PR #16109: URL: https://github.com/apache/kafka/pull/16109#issuecomment-2140831052 @OmniaGM nice idea but we need to fix conflicts first :)
Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dongnuo123 commented on code in PR #16145: URL: https://github.com/apache/kafka/pull/16145#discussion_r1621405276 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -325,24 +325,12 @@ private Group validateOffsetCommit( } Review Comment: I kind of forget why we wanted to check `GroupIdNotFoundException`. I feel the current implementation does support classic protocol members.
Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dongnuo123 commented on code in PR #16145: URL: https://github.com/apache/kafka/pull/16145#discussion_r1621403173 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java: ## @@ -857,6 +857,7 @@ public void validateOffsetCommit( throw Errors.UNKNOWN_MEMBER_ID.exception(); } +// TODO: A temp marker. Will remove it when the pr is open. if (!isTransactional && isInState(COMPLETING_REBALANCE)) { Review Comment: Not sure why we only check `COMPLETING_REBALANCE` but not `PREPARING_REBALANCE`?
Re: [PR] Adds a test case to test that an exception is thrown in invalid ports [kafka]
chia7712 merged PR #16112: URL: https://github.com/apache/kafka/pull/16112
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout URL: https://github.com/apache/kafka/pull/16031
[PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dongnuo123 opened a new pull request, #16145: URL: https://github.com/apache/kafka/pull/16145 During online migration, there could be a ConsumerGroup whose members use the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown in ConsumerGroup offset fetch/commit validation, but it is not supported by the classic protocol. This patch therefore changes `ConsumerGroup#validateOffsetCommit` to ensure compatibility. There is no need to change `ConsumerGroup#validateOffsetFetch` because the member id and member epoch are always empty and -1 in the classic protocol, so the offset fetch request is always valid. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types [kafka]
gharris1727 commented on code in PR #15513: URL: https://github.com/apache/kafka/pull/15513#discussion_r1621400369 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -737,12 +739,12 @@ public void runOuterJoin(final StreamJoined streamJoine inputTopic1.pipeInput(expectedKey, "C" + expectedKey); } processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "C0+a0", 0L), -new KeyValueTimestamp<>(0, "C0+b0", 0L), -new KeyValueTimestamp<>(1, "C1+a1", 0L), -new KeyValueTimestamp<>(1, "C1+b1", 0L), -new KeyValueTimestamp<>(2, "C2+b2", 0L), -new KeyValueTimestamp<>(3, "C3+b3", 0L) +new KeyValueTimestamp<>(0, "C0+0", 0L), +new KeyValueTimestamp<>(0, "C0+0", 0L), +new KeyValueTimestamp<>(1, "C1+1", 0L), +new KeyValueTimestamp<>(1, "C1+1", 0L), Review Comment: You're right, I didn't notice this. I did a search-and-replace renaming, and reverted the stuff which didn't make sense. I did have to manually renumber stuff like "a0-0", and some places where capital letters "A0" were used on the inputStream2 to fit the pattern better. PTAL, thanks!
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621395136 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -327,8 +339,11 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } +// Look into this one Review Comment: My mistake leaving that there; it was a comment for myself that I forgot to remove.
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621386306 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -149,20 +157,28 @@ public void testStartupAndTearDown() throws InterruptedException { "The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms"); } +@Test +void testRequestManagersArePolledOnce() { +consumerNetworkThread.runOnce(); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong()))); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong()))); +verify(networkClientDelegate).poll(anyLong(), anyLong()); +} + @Test public void testApplicationEvent() { ApplicationEvent e = new PollEvent(100); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +verify(applicationEventProcessor).process(e); Review Comment: I checked the Mockito documentation and adding times(1) is redundant, so it is not a big deal either way whether we keep it or remove it. Do let me know, though, if there is a stylistic preference.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
jolshan commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140804438 Can we look at testCopyQuotaManagerConfig() – kafka.log.remote.RemoteLogManagerTest? It seems like it is failing pretty consistently.
Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]
brenden20 commented on PR #16124: URL: https://github.com/apache/kafka/pull/16124#issuecomment-2140800759 @kirktrue thank you for the suggestions, I have implemented and pushed your suggestions. Let me know if everything looks good!
Re: [PR] Adds a test case to test that an exception is thrown in invalid ports [kafka]
chia7712 commented on PR #16112: URL: https://github.com/apache/kafka/pull/16112#issuecomment-2140786920 @ahmedryasser Thanks for your contribution. Could you please add "MINOR: " to your title?
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
cmccabe commented on code in PR #15945: URL: https://github.com/apache/kafka/pull/15945#discussion_r1621372420 ## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. 
+ * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. + * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. + */ +public class BrokerRegistrationTracker implements MetadataPublisher { +private final Logger log; +private final int id; +private final Runnable refreshRegistrationCallback; + +/** + * Create the tracker. + * + * @param id The ID of this broker. + * @param targetDirectories The directories managed by this broker. + * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. + */ +public BrokerRegistrationTracker( +int id, +List targetDirectories, +Runnable refreshRegistrationCallback +) { +this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). +logger(BrokerRegistrationTracker.class); +this.id = id; +this.refreshRegistrationCallback = refreshRegistrationCallback; +} + +@Override +public String name() { +return "BrokerRegistrationTracker(id=" + id + ")"; +} + +@Override +public void onMetadataUpdate( +MetadataDelta delta, +MetadataImage newImage, +LoaderManifest manifest +) { +boolean checkBrokerRegistration = false; +if (delta.featuresDelta() != null) { +if (delta.metadataVersionChanged().isPresent()) { +if (log.isTraceEnabled()) { +log.trace("Metadata version change is present: {}", +delta.metadataVersionChanged()); +} +checkBrokerRegistration = true; +} +} +if (delta.clusterDelta() != null) { +if (delta.clusterDelta().changedBrokers().get(id) != null) { +if (log.isTraceEnabled()) { +log.trace("Broker change is present: {}", +delta.clusterDelta().changedBrokers().get(id)); +} +checkBrokerRegistration = true; +} +} +if (checkBrokerRegistration) { +if (brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(), +delta.clusterDelta().broker(id))) { +refreshRegistrationCallback.run(); 
+} +} +} + +/** + * Check if the current broker registration needs to be refreshed. + * + * @param registration The current broker registration, or null if there is none. + * @return True only if we should refresh. + */ +boolean brokerRegistrationNeedsRefresh( +MetadataVersion metadataVersion, +BrokerRegistration registration +) { +// If there is no existing registration, the BrokerLifecycleManager must still be sending it. +// So we don't
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
chia7712 commented on PR #16116: URL: https://github.com/apache/kafka/pull/16116#issuecomment-2140785638 @OmniaGM Sorry, but please fix the conflicts again.
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
chia7712 commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621368550 ## build.gradle: ## @@ -787,6 +800,12 @@ subprojects { skipProjects = [ ":jmh-benchmarks", ":trogdor" ] skipConfigurations = [ "zinc" ] } + + afterEvaluate { Review Comment: Maybe we can set spotless directly. For example:
```
if (project.name in spotlessApplyModules) {
  apply plugin: 'com.diffplug.spotless'
  spotless {
    java {
      importOrder('kafka', 'org.apache.kafka', 'com', 'net', 'org', 'java', 'javax', '', '\\#')
      removeUnusedImports()
    }
  }
}
```
Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]
gharris1727 commented on code in PR #16095: URL: https://github.com/apache/kafka/pull/16095#discussion_r1621349710 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ## @@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) { if (this.state == State.FAILED) return; +// Call stop() on the connector to release its resources. Connector +// could fail in the start() method, which is why we call stop() on +// INIT state as well. +if (this.state == State.STARTED || this.state == State.INIT) +connector.stop(); Review Comment: This is a potentially blocking call to the connector, and I don't think that's a good fit for this onFailure handler. This call would delay the statusListener call, which delays notifying the REST API of the FAILED status and updating the metrics. If it blocks indefinitely, the status and metrics are never updated. There is a connector.stop() call in doShutdown that could be changed to execute for the INIT and FAILED states. That would leave the resources allocated while the connector is waiting in the FAILED state, but would at least ensure they don't leak long-term. We may also change the control flow to make the transition to the FAILED state trigger doShutdown early, rather than having it wait() with all the resources still allocated.
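The alternative described in the review comment above (keep the failure handler free of blocking calls and release resources during shutdown, for every state that may still hold them) could look roughly like the following sketch. It is a simplified, hypothetical model and not the actual `WorkerConnector` code: the class `FailureHandlingSketch`, the `Resource` interface, the `STOPPED` state and the `statusEvents` list are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the suggested control flow; not Kafka Connect code.
public class FailureHandlingSketch {

    public enum State { INIT, STARTED, FAILED, STOPPED }

    /** Hypothetical stand-in for a connector whose stop() may block. */
    public interface Resource {
        void stop();
    }

    private final Resource resource;
    private final List<String> statusEvents = new ArrayList<>();
    private State state = State.INIT;

    public FailureHandlingSketch(Resource resource) {
        this.resource = resource;
    }

    /** Failure handler: only records the state change, never calls stop(). */
    public synchronized void onFailure(Throwable t) {
        if (state == State.FAILED) {
            return;
        }
        state = State.FAILED;
        // The status update is not delayed by any potentially blocking call.
        statusEvents.add("FAILED: " + t.getMessage());
    }

    /** Shutdown: releases resources for INIT, STARTED and FAILED alike, exactly once. */
    public synchronized void doShutdown() {
        if (state == State.STOPPED) {
            return;
        }
        resource.stop();
        state = State.STOPPED;
        statusEvents.add("STOPPED");
    }

    public synchronized State state() {
        return state;
    }

    public synchronized List<String> statusEvents() {
        return new ArrayList<>(statusEvents);
    }
}
```

In this model a failure during startup is reported immediately, and the resources are held until `doShutdown()` runs, which matches the trade-off the comment names: no long-term leak, at the cost of keeping resources allocated while the connector sits in the FAILED state.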
Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
chia7712 commented on code in PR #16097: URL: https://github.com/apache/kafka/pull/16097#discussion_r1621358471 ## checkstyle/suppressions.xml: ## @@ -361,4 +361,7 @@ + + Review Comment: Why do we need this change? ## build.gradle: ## @@ -1007,7 +1026,7 @@ project(':core') { testImplementation libs.junitJupiter testImplementation libs.slf4jlog4j testImplementation libs.caffeine - Review Comment: Please revert this change. ## build.gradle: ## @@ -47,7 +47,7 @@ plugins { // Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed // artifacts - see https://github.com/johnrengelman/shadow/issues/901 id 'com.github.johnrengelman.shadow' version '8.1.0' apply false - id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0 + id 'com.diffplug.spotless' version "${spotlessVersion}" apply false Review Comment: Do we need this variable? Also, why not use the latest version, `6.25.0`?
Re: [PR] MINOR: Refactor DynamicConfig [kafka]
chia7712 commented on code in PR #16133: URL: https://github.com/apache/kafka/pull/16133#discussion_r1621350440 ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -675,7 +677,7 @@ object DynamicLogConfig { // Exclude message.format.version for now since we need to check that the version // is supported on all brokers in the cluster. @nowarn("cat=deprecation") - val ExcludedConfigs = Set(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG) Review Comment: Do we need this variable? Maybe we can remove it with the following change.
```scala
// Exclude message.format.version for now since we need to check that the version
// is supported on all brokers in the cluster.
@nowarn("cat=deprecation")
val ReconfigurableConfigs = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
```
## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -319,7 +321,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) { -val nonDynamic = configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains) +val nonDynamic = configNames.filter(nonDynamicProps.contains) Review Comment: How about `configNames.intersect(nonDynamicProps)`?
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on PR #16120: URL: https://github.com/apache/kafka/pull/16120#issuecomment-2140752725 The build does not seem to start… I am not sure why.
Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]
kirktrue commented on PR #16115: URL: https://github.com/apache/kafka/pull/16115#issuecomment-2140747685 @brenden20, as mentioned on another one of your PRs, there's a checkstyle violation here. You can run this command locally to avoid waiting for the CI infrastructure to catch it:
```
./gradlew check -x test
```
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1621344907 ## server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum GroupVersion implements FeatureVersion { + +// Version 1 enables the classic rebalance protocol. This is the default +// behavior even if the feature flag is not set. +GV_1(1, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()), Review Comment: Updated to use the version 0 approach.
Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]
kirktrue commented on PR #16124: URL: https://github.com/apache/kafka/pull/16124#issuecomment-2140745523 The test failures are unrelated, FWIW.
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
kirktrue commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621342452 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -53,89 +39,111 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.*; Review Comment: ```suggestion import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -53,89 +39,111 @@ import 
org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; Review Comment: Just bring these explicit imports back to make checkstyle stop complaining ```suggestion import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import 
static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; ```
Re: [PR] MINOR: Enable transaction verification with new group coordinator in TransactionsTest [kafka]
dajac merged PR #16139: URL: https://github.com/apache/kafka/pull/16139
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
kirktrue commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2140741298 Looks like there are some checkstyle failures due to the use of wildcard imports.
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
kirktrue commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621334851 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -1290,4 +1290,4 @@ static class MemberInfo { this.memberEpoch = Optional.empty(); } } -} +} Review Comment: We should revert/fix this change as it's whitespace only. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -53,89 +39,111 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static 
org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +LogContext logContext = new LogContext(); +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; 
-private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new
Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]
chia7712 commented on code in PR #15989: URL: https://github.com/apache/kafka/pull/15989#discussion_r1621334504 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java: ## @@ -1184,6 +1185,141 @@ public void testRestoreRestartRequestInconsistentState() { verify(configLog).stop(); } +@Test +public void testPutTaskConfigsZeroTasks() throws Exception { +when(configLog.partitionCount()).thenReturn(1); + +configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); +verifyConfigure(); +configStorage.start(); + +// Bootstrap as if we had already added the connector, but no tasks had been added yet +whiteBoxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList()); + +// Null before writing +ClusterConfigState configState = configStorage.snapshot(); +assertEquals(-1, configState.offset()); + +// Task configs should read to end, write to the log, read to end, write root. + doAnswer(expectReadToEnd(Collections.emptyMap())).when(configLog).readToEnd(); + +expectConvertWriteRead( +COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0), +"tasks", 0); // We have 0 tasks + +configStorage.putTaskConfigs("connector1", Collections.emptyList()); + +// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks +configUpdateListener.onTaskConfigUpdate(Collections.emptyList()); Review Comment: We need to use `verify` to make sure this method is called as expected.
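For readers unfamiliar with the distinction in the comment above: calling the listener method from the test body asserts nothing, whereas verifying checks that the code under test made the call (with Mockito, a `verify(...)` on the mocked listener). The following is only a rough sketch of that idea in plain Java, without Mockito; `UpdateListener`, `RecordingListener` and `StoreSketch` are hypothetical stand-ins, not the Connect classes from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the listener interface; not the Connect API.
interface UpdateListener {
    void onTaskConfigUpdate(List<String> tasks);
}

// Records every call so that a test can assert on the calls afterwards.
class RecordingListener implements UpdateListener {
    final List<List<String>> calls = new ArrayList<>();

    @Override
    public void onTaskConfigUpdate(List<String> tasks) {
        calls.add(tasks);
    }
}

// Hypothetical code under test: it is responsible for notifying the listener.
class StoreSketch {
    private final UpdateListener listener;

    StoreSketch(UpdateListener listener) {
        this.listener = listener;
    }

    void putTaskConfigs(List<String> tasks) {
        listener.onTaskConfigUpdate(tasks);
    }
}
```

A test would then call `putTaskConfigs(...)` and check `calls` on the recording listener, instead of invoking `onTaskConfigUpdate(...)` itself.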
Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]
kirktrue commented on code in PR #16124: URL: https://github.com/apache/kafka/pull/16124#discussion_r1621332051 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -478,6 +478,23 @@ public void resetTimer() { this.heartbeatTimer.reset(heartbeatIntervalMs); } +@Override +public String toStringBase() { +return super.toStringBase() + +", heartbeatTimer=" + heartbeatTimer + +", heartbeatIntervalMs=" + heartbeatIntervalMs; +} + +// Visible for testing +protected Timer heartbeatTimer() { +return this.heartbeatTimer; +} + +// Visible for testing +protected long heartbeatIntervalMs() { +return this.heartbeatIntervalMs; Review Comment: ```suggestion return heartbeatIntervalMs; ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -152,6 +152,34 @@ public void cleanup() { } } +@Test +public void testHeartBeatRequestStateToStringBase() { +final long retryBackoffMs = 100; +final long retryBackoffMaxMs = 1000; +LogContext logContext = new LogContext(); +HeartbeatRequestState heartbeatRequestState1 = new HeartbeatRequestState( Review Comment: Super nit: we can drop the `1` at the end of the variable name, right? 
```suggestion HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState( ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -478,6 +478,23 @@ public void resetTimer() { this.heartbeatTimer.reset(heartbeatIntervalMs); } +@Override +public String toStringBase() { +return super.toStringBase() + +", heartbeatTimer=" + heartbeatTimer + +", heartbeatIntervalMs=" + heartbeatIntervalMs; +} + +// Visible for testing +protected Timer heartbeatTimer() { +return this.heartbeatTimer; Review Comment: ```suggestion return heartbeatTimer; ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -152,6 +152,34 @@ public void cleanup() { } } +@Test +public void testHeartBeatRequestStateToStringBase() { +final long retryBackoffMs = 100; +final long retryBackoffMaxMs = 1000; +LogContext logContext = new LogContext(); +HeartbeatRequestState heartbeatRequestState1 = new HeartbeatRequestState( +logContext, +time, +10, +retryBackoffMs, +retryBackoffMaxMs, +.2 +); + +RequestState requestState = new RequestState( +logContext, + "org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager$HeartbeatRequestState", Review Comment: Perhaps we could make `HeartbeatRequestState` package-protected, then we could do this: ```suggestion HeartbeatRequestManager.HeartbeatRequestState.class.getName(), ```
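The last suggestion works because `Class.getName()` on a nested class returns its binary name, with `$` separating the outer and inner class names, which is the same form as the hard-coded string in the test. A minimal sketch with hypothetical stand-in classes (`Outer` and `Inner` are illustrative names, not Kafka classes):

```java
// Hypothetical stand-ins; not the Kafka classes discussed above.
class Outer {
    // Package-private nested class, so code in the same package can reference it.
    static class Inner { }
}

class NestedNameSketch {
    // Returns the binary name of the nested class, which ends in "Outer$Inner"
    // (prefixed by the package name when the classes live in a named package).
    static String innerName() {
        return Outer.Inner.class.getName();
    }

    public static void main(String[] args) {
        System.out.println(innerName());
    }
}
```

Referencing the class literal keeps the test in step with any later rename, which a string constant would not.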
Re: [PR] KAFKA-16802 : Moving java versions inside java block [kafka]
gharris1727 merged PR #16135: URL: https://github.com/apache/kafka/pull/16135
Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]
chia7712 commented on PR #16042: URL: https://github.com/apache/kafka/pull/16042#issuecomment-2140664027

> The test names are testDescribeLogDirsWithoutAnyPartitionTopic and testDescribeLogDirs.

It seems to me the method needs to come with the guarantee: `DescribeLogDirsTopic` should not have empty `partitions`. Hence, could you please add the following asserts to `testDescribeLogDirs`:

```scala
responses.foreach { response =>
  assertEquals(Errors.NONE.code, response.errorCode)
  assertTrue(response.totalBytes > 0)
  assertTrue(response.usableBytes >= 0)
  assertFalse(response.topics().isEmpty)
  response.topics().forEach(t => assertFalse(t.partitions().isEmpty))
}
```
Re: [PR] MINOR: make public the consumer group migration policy config [kafka]
dajac merged PR #16128: URL: https://github.com/apache/kafka/pull/16128
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558404 I'm going to close and reopen to force another build.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout URL: https://github.com/apache/kafka/pull/16031
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558063 @lianetm: relevant test failures have been addressed. There are three unrelated test failures from flaky tests.
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621207749 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java: ## @@ -18,105 +18,63 @@ import org.apache.kafka.common.Uuid; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; /** - * The assignment specification for a consumer group member. + * Implementation of the {@link MemberSubscriptionSpec} interface. */ -public class AssignmentMemberSpec { -/** - * The instance ID if provided. - */ -private final Optional instanceId; - -/** - * The rack ID if provided. - */ +public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec { private final Optional rackId; - -/** - * Topics Ids that the member is subscribed to. - */ private final Set subscribedTopicIds; /** - * Partitions assigned keyed by topicId. - */ -private final Map> assignedPartitions; - -/** - * @return The instance ID as an Optional. + * Constructs a new {@code MemberSubscriptionSpecImpl}. + * + * @param rackId The rack Id. + * @param subscribedTopicIds The set of subscribed topic Ids. */ -public Optional instanceId() { -return instanceId; +public MemberSubscriptionSpecImpl( +Optional rackId, +Set subscribedTopicIds +) { +Objects.requireNonNull(rackId); +Objects.requireNonNull(subscribedTopicIds); +this.rackId = rackId; +this.subscribedTopicIds = subscribedTopicIds; Review Comment: I was just following the format I saw in TargetAssignmentResult and a few other places; I wasn't sure what to use.
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621196989 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java: ## @@ -39,4 +41,20 @@ public interface GroupSpec { * False, otherwise. */ boolean isPartitionAssigned(Uuid topicId, int partitionId); + +/** + * Gets the member subscription specification for a member. + * + * @param memberId The member Id. + * @return The member's subscription metadata. + */ +MemberSubscriptionSpec memberSubscriptionSpec(String memberId); Review Comment: I actually wanted to ask whether we want to return null or an empty object. I returned a new `MemberSubscriptionSpecImpl` object in the impl.
Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]
rreddy-22 commented on code in PR #16068: URL: https://github.com/apache/kafka/pull/16068#discussion_r1621186636 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java: ## @@ -39,4 +41,20 @@ public interface GroupSpec { * False, otherwise. */ boolean isPartitionAssigned(Uuid topicId, int partitionId); + +/** + * Gets the member subscription specification for a member. + * + * @param memberId The member Id. + * @return The member's subscription metadata. + */ +MemberSubscriptionSpec memberSubscriptionSpec(String memberId); Review Comment: Thanks for the comment! I wanted to say that during our design discussions, we had agreed to keep it as memberSubscriptionSpec to maintain consistency and streamline the review process so we don't go back and forth. If there are new considerations or changes that we might not have anticipated, I would love to understand them better so we can make this more efficient going forward.
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan commented on code in PR #16130: URL: https://github.com/apache/kafka/pull/16130#discussion_r1621089819 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -354,6 +354,7 @@ Found problem: MetadataVersion.LATEST_PRODUCTION, Map(TestFeatureVersion.FEATURE_NAME -> featureLevel), allFeatures, +false, Review Comment: `if (featureLevel <= Features.TEST_VERSION.defaultValue(MetadataVersion.LATEST_PRODUCTION))` this means we skip version 2
Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]
omkreddy merged PR #16046: URL: https://github.com/apache/kafka/pull/16046
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
yashmayya commented on PR #13375: URL: https://github.com/apache/kafka/pull/13375#issuecomment-2140247839 @mdedetrich apologies for the late response, I didn't get notified for your comment oddly enough. Please feel free to take over, thanks!
Re: [PR] KAFKA-14919 sync topic configs test [kafka]
gharris1727 commented on PR #16143: URL: https://github.com/apache/kafka/pull/16143#issuecomment-2140209270 Hi @anton-liauchuk Thanks for the PR! Is the KAFKA ticket linked the right one? I don't think this implementation matches the description of that ticket. I expected changes to the FakeLocalMetadataStore to be necessary.
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
junrao commented on code in PR #16130: URL: https://github.com/apache/kafka/pull/16130#discussion_r1621034615 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -76,16 +76,21 @@ public short latestProduction() { return defaultValue(MetadataVersion.LATEST_PRODUCTION); } +public short latestTesting() { +return featureVersions[featureVersions.length - 1].featureLevel(); +} + /** * Creates a FeatureVersion from a level. * * @param level the level of the feature * @return the FeatureVersionUtils.FeatureVersion for the feature the enum is based on. * @throws IllegalArgumentException if the feature is not known. */ -public FeatureVersion fromFeatureLevel(short level) { +public FeatureVersion fromFeatureLevel(short level, + boolean allowUnstableFeatureVersions) { Review Comment: Could we add the new param to javadoc? ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -354,6 +354,7 @@ Found problem: MetadataVersion.LATEST_PRODUCTION, Map(TestFeatureVersion.FEATURE_NAME -> featureLevel), allFeatures, +false, Review Comment: Hmm, TEST_VERSION level 2 is not in production and should show an exception when calling `StorageTool.generateFeatureRecord`?
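To make the behaviour being asked about concrete, here is a minimal sketch of a level lookup that rejects levels above the latest production level unless unstable versions are allowed. It is an assumption about how such a check could look, not the actual `Features` code; `FeatureLevelSketch` and its fields are hypothetical, and only the method names `latestTesting` and `fromFeatureLevel` and the parameter `allowUnstableFeatureVersions` come from the diff above.

```java
// Hypothetical sketch of the check under discussion; not the actual Features code.
class FeatureLevelSketch {
    private final short[] levels;          // all known levels, in ascending order
    private final short latestProduction;  // highest level treated as stable

    FeatureLevelSketch(short[] levels, short latestProduction) {
        this.levels = levels.clone();
        this.latestProduction = latestProduction;
    }

    // Highest known level, stable or not.
    short latestTesting() {
        return levels[levels.length - 1];
    }

    // Returns the level if it is known and permitted; otherwise throws.
    short fromFeatureLevel(short level, boolean allowUnstableFeatureVersions) {
        boolean known = false;
        for (short candidate : levels) {
            if (candidate == level) {
                known = true;
                break;
            }
        }
        if (!known) {
            throw new IllegalArgumentException("Unknown feature level " + level);
        }
        if (level > latestProduction && !allowUnstableFeatureVersions) {
            throw new IllegalArgumentException("Feature level " + level + " is not yet stable");
        }
        return level;
    }
}
```

Under these assumptions, with levels 0 to 2 and a latest production level of 1, requesting level 2 with the flag set to `false` throws, which matches the expectation stated in the second review comment.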
[PR] Re-add EventAccumulator.poll(long, TimeUnit) [kafka]
jeffkbkim opened a new pull request, #16144: URL: https://github.com/apache/kafka/pull/16144 We have revamped the thread idle ratio metric in https://github.com/apache/kafka/pull/15835. https://github.com/apache/kafka/pull/15835#discussion_r1588068337 describes a case where the metric loses accuracy. To set a lower bound on the accuracy, this patch re-adds a poll with a timeout that was removed as part of https://github.com/apache/kafka/pull/15430.
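One way to read the motivation, stated here as an assumption rather than a summary of the linked discussion: a thread that blocks indefinitely on an empty queue can only account for its idle time once an event arrives, while a timed poll returns at least once per timeout and lets the accounting be refreshed. The sketch below uses `java.util.concurrent.BlockingQueue` instead of Kafka's `EventAccumulator`; `IdlePollSketch` and its fields are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the EventAccumulator or the coordinator's metric code.
class IdlePollSketch {
    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    long idleNanos = 0;   // time spent waiting for events
    long totalNanos = 0;  // waiting time plus processing time

    // One loop iteration: wait at most timeoutMs for an event, then record the time spent.
    void pollOnce(long timeoutMs) {
        long start = System.nanoTime();
        Runnable event = null;
        try {
            // Returns null if no event arrives within the timeout.
            event = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        idleNanos += System.nanoTime() - start;
        if (event != null) {
            event.run();
        }
        totalNanos += System.nanoTime() - start;
    }

    // Fraction of the measured time that was spent idle.
    double idleRatio() {
        return totalNanos == 0 ? 0.0 : (double) idleNanos / totalNanos;
    }
}
```

With a timeout, the counters are updated at least once per timeout interval even when no events arrive, so a reader of the ratio sees data that is at most one interval old.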
Re: [PR] KAFKA-14919 sync topic configs test [kafka]
anton-liauchuk commented on PR #16143: URL: https://github.com/apache/kafka/pull/16143#issuecomment-2140109909 @gharris1727 Please take a look.
[PR] KAFKA-14919 sync topic configs test [kafka]
anton-liauchuk opened a new pull request, #16143: URL: https://github.com/apache/kafka/pull/16143 KAFKA-14919 sync topic configs test
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
junrao merged PR #15625: URL: https://github.com/apache/kafka/pull/15625
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1621009962 ## server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum GroupVersion implements FeatureVersion { + +// Version 1 enables the classic rebalance protocol. This is the default +// behavior even if the feature flag is not set. +GV_1(1, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()), Review Comment: When we bootstrap we technically will not get this version. Does that make sense? I'm wondering if the metadata version should be the bootstrap metadata version since it's a little odd to say the classic protocol is not set by default. Alternatively we take the version 0 approach.
Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]
jolshan commented on code in PR #16130: URL: https://github.com/apache/kafka/pull/16130#discussion_r1621005757 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -449,8 +449,8 @@ object KafkaConfig { /** Internal Configurations **/ // This indicates whether unreleased APIs should be advertised by this node. .defineInternal(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) - // This indicates whether unreleased MetadataVersions should be enabled on this node. - .defineInternal(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) + // This indicates whether unreleased MetadataVersions or other feature versions should be enabled on this node. + .defineInternal(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) Review Comment: Yeah, it's a fair question. I think the original intent was to keep it internal and only use for testing.
[PR] Revert "KAFKA-16448: Add ProcessingExceptionHandler interface and implementations (#16090)" [kafka]
cadonna opened a new pull request, #16142: URL: https://github.com/apache/kafka/pull/16142 This reverts commit 8d11d9579503426edfaeae791ec4bb212da37ad2. We decided not to release KIP-1033 with AK 3.8.
Re: [PR] Revert "KAFKA-16448: Add ProcessingExceptionHandler in Streams configuration (#16092)" [kafka]
cadonna merged PR #16141: URL: https://github.com/apache/kafka/pull/16141
[PR] Revert "KAFKA-16448: Add ProcessingExceptionHandler in Streams configuration (#16092)" [kafka]
cadonna opened a new pull request, #16141: URL: https://github.com/apache/kafka/pull/16141 We decided not to release KIP-1033 with AK 3.8. This reverts commit 3f70c46874e1dd9591443392f51ff1efc9fdc40e.